input.cc
Go to the documentation of this file.
1 /*
2  * License: Modified BSD Software License Agreement
3  *
4  * $Id$
5  */
6 
21 #include <unistd.h>
22 #include <sys/socket.h>
23 #include <arpa/inet.h>
24 #include <poll.h>
25 #include <errno.h>
26 #include <fcntl.h>
27 #include <sys/file.h>
28 #include <o3m151_driver/input.h>
29 
30 #include <algorithm>
31 #include <iostream>
32 #include <numeric>
33 #include <vector>
34 
35 #define RESULT_OK (0)
36 #define RESULT_ERROR (-1)
37 
38 namespace o3m151_driver
39 {
40  static const size_t packet_size = 1024; //sizeof(o3m151_msgs::O3M151Packet().data);
41  // The data is in the channel 8
42  static const uint32_t customerDataChannel = 8;
43 
44  typedef struct PacketHeader
45  {
46  uint16_t Version;
47  uint16_t Device;
48  uint32_t PacketCounter;
49  uint32_t CycleCounter;
54  uint32_t ChannelID;
56  uint32_t LengthPayload;
57  } PacketHeader;
58 
59 
60  typedef struct ChannelHeader
61  {
62  uint32_t StartDelimiter;
63  uint8_t reserved[24];
64  } ChannelHeader;
65 
66  typedef struct ChannelEnd
67  {
68  uint32_t EndDelimiter;
69  } ChannelEnd;
70 
72  channel_buf_size_(0),
73  previous_packet_counter_(0),
74  previous_packet_counter_valid_(false),
75  startOfChannelFound_(false),
76  pos_in_channel_(0)
77  {
78  channelBuf = NULL;
79  }
80 
81  // Extracts the data in the payload of the udp packet und puts it into the channel buffer
82  int Input::processPacket( int8_t* currentPacketData, // payload of the udp packet (without ethernet/IP/UDP header)
83  uint32_t currentPacketSize, // size of the udp packet payload
84  int8_t* channelBuffer, // buffer for a complete channel
85  uint32_t channelBufferSize, // size of the buffer for the complete channel
86  uint32_t* pos) // the current pos in the channel buffer
87  {
88 
89  // There is always a PacketHeader structure at the beginning
90  PacketHeader* ph = (PacketHeader*)currentPacketData;
91  int Start = sizeof(PacketHeader);
92  int Length = currentPacketSize - sizeof(PacketHeader);
93 
94  // Only the first packet of a channel contains a ChannelHeader
95  if (ph->IndexOfPacketInChannel == 0)
96  {
97  Start += sizeof(ChannelHeader);
98  }
99 
100  // Only the last packet of a channel contains an EndDelimiter (at the end, after the data)
102  {
103  Length -= sizeof(ChannelEnd);
104  }
105 
106  // Is the buffer big enough?
107  if ((*pos) + Length > channelBufferSize)
108  {
109  // Too small means either an error in the program logic or a corrupt packet
110  ROS_DEBUG("Channel buffer is too small.\n");
111  return RESULT_ERROR;
112  }
113  else
114  {
115  memcpy(channelBuffer+(*pos), currentPacketData+Start, Length);
116  }
117 
118  (*pos) += Length;
119 
120  return RESULT_OK;
121  }
122 
123 
124  double Input::slope(const std::vector<double>& x, const std::vector<double>& y) {
125  const double n = x.size();
126  const double s_x = std::accumulate(x.begin(), x.end(), 0.0);
127  const double s_y = std::accumulate(y.begin(), y.end(), 0.0);
128  const double s_xx = std::inner_product(x.begin(), x.end(), x.begin(), 0.0);
129  const double s_xy = std::inner_product(x.begin(), x.end(), y.begin(), 0.0);
130  const double a = (n * s_xy - s_x * s_y) / (n * s_xx - s_x * s_x);
131  return a;
132  }
133 
134  // Gets the data from the channel and prints them
135  void Input::processChannel8(int8_t* buf, uint32_t size, pcl::PointCloud<pcl::PointXYZI> & pc)
136  {
137  // you may find the offset of the data in the documentation
138  const uint32_t distanceX_offset = 2052;
139  const uint32_t distanceY_offset = 6148;
140  const uint32_t distanceZ_offset = 10244;
141  const uint32_t confidence_offset = 14340;
142  const uint32_t amplitude_offset = 16388;
143  const uint32_t amplitude_normalization_offset = 18436;
144  const uint32_t camera_calibration_offset = 18464;
145 
146  // As we are working on raw buffers we have to check if the buffer is as big as we think
147  if (size < 18488)
148  {
149  ROS_DEBUG("processChannel8: buf too small\n");
150  return;
151  }
152 
153  // These are arrays with 16*64=1024 elements
154  float* distanceX = (float*)(&buf[distanceX_offset]);
155  float* distanceY = (float*)(&buf[distanceY_offset]);
156  float* distanceZ = (float*)(&buf[distanceZ_offset]);
157  uint16_t* confidence = (uint16_t*)(&buf[confidence_offset]);
158  uint16_t* amplitude = (uint16_t*)(&buf[amplitude_offset]);
159  float* amplitude_normalization = (float*)(&buf[amplitude_normalization_offset]);
160  float* camera_calibration = (float*)(&buf[camera_calibration_offset]);
161 
162  // line 8 column 32 is in the middle of the sensor
163  //uint32_t i = 64*7 + 32;
164 
165  std::vector<double> vx, vz;
166  for( uint32_t m =0; m<16; ++m)
167  {
168  for( uint32_t j =0; j<64; ++j)
169  {
170  uint32_t i = 64*m +j;
171  // The lowest bit of the confidence contains the information if the pixel is valid
172  uint32_t pixelValid = confidence[i] & 1;
173 
174  // 0=valid, 1=invalid
175  if (distanceX[i] > 0.2)// && j > 20 && j < 50 && m >0 && m <12)
176  {
177  // \r is carriage return without newline. This way the output is always written in the same line
178  // ROS_DEBUG("X: %5.2f Y: %5.2f Z:%5.2f Amplitude:%5d \r",
179  // distanceX[i],
180  // distanceY[i],
181  // distanceZ[i],
182  // amplitude[i]);
183  pcl::PointXYZI point;
184  point.x = distanceX[i] ;//- camera_calibration[0];
185  point.y = distanceY[i] ;//- camera_calibration[1];
186  point.z = distanceZ[i] - 1;// camera_calibration[2];
187 // bool exposure = (confidence[i] >> 1) & 1;
188 // bool gain = (confidence[i] >> 2) & 1;
189 // int norm_index;
190 // if(gain == false)
191 // if(exposure == false)
192 // norm_index = 0;
193 // else
194 // norm_index = 1;
195 // else
196 // if(exposure == false)
197 // norm_index = 2;
198 // else
199 // norm_index = 3;
200  float normalization_factor = 1;
201 // float normalization_factor = amplitude_normalization[norm_index];
202 // ROS_INFO_STREAM("gain "<<gain<<" exposure "<<exposure<<
203 // " amplitude[i] "<<amplitude[i]<<" normalization_factor "<<normalization_factor);
204  point.intensity = amplitude[i]*normalization_factor;
205  pc.points.push_back(point);
206 // if(j == 40)
207 // {
208 // vx.push_back(point.x);
209 // vz.push_back(point.z);
210 // }
211  }
212  }
213  }
214  for(int n = 0; n <4; n++)
215  ROS_DEBUG_STREAM("amplitude_normalization "<<n<<" "<<amplitude_normalization[n]);
216 // static int k = 0;
217 // static int l = 0;
218 // l++;
219 // static double s=0, s_mean=0;
220 // s += slope(vz, vx);
221 // if(l%10 == 0)
222 // {
223 // k++;
224 // if(k > 63)
225 // k =0;
226 
227 // s_mean = s/10;
228 // s=0;
229 // }
230 // ROS_INFO_STREAM("vx "<<vx.size()<<" vz "<<vz.size());
231 // ROS_INFO_STREAM("s "<<s_mean);
232  }
233 
234  int Input::process(int8_t* udpPacketBuf, const ssize_t rc, pcl::PointCloud<pcl::PointXYZI> & pc)
235  {
236  // As the alignment was forced to 1 we can work with the struct on the buffer.
237  // This assumes the byte order is little endian which it is on a PC.
238  PacketHeader* ph = (PacketHeader*)udpPacketBuf;
239 
240  // Check the packet counter for missing packets
242  {
243  // if the type of the variables is ui32, it will also work when the wrap around happens.
244  if ((ph->PacketCounter - previous_packet_counter_) != 1)
245  {
246  ROS_ERROR("Packet Counter jumped from %ul to %ul", previous_packet_counter_, ph->PacketCounter);
247 
248  // With this it will ignore the already received parts and resynchronize at
249  // the beginning of the next cycle.
250  startOfChannelFound_ = false;
251  }
252  }
253 
256 
257  // is this the channel with our data?
258  if (ph->ChannelID == customerDataChannel)
259  {
260  // are we at the beginning of the channel?
261  if (ph->IndexOfPacketInChannel == 0)
262  {
263  startOfChannelFound_ = true;
264 
265  // If we haven't allocated memory for channel do it now.
266  if (channel_buf_size_ == 0)
267  {
269  channelBuf = new int8_t[channel_buf_size_];
270  }
271 
272  // as we reuse the buffer we clear it at the beginning of a transmission
273  memset(channelBuf, 0, channel_buf_size_);
274  pos_in_channel_ = 0;
275  }
276 
277  // if we have found the start of the channel at least once, we are ready to process the packet
279  {
281 
282  // Have we found the last packet in this channel? Then we are able to process it
283  // The index is zero based so a channel with n parts will have indices from 0 to n-1
285  {
287  return RESULT_OK;
288  }
289  }
290  }
291  }
293  // InputSocket class implementation
295 
301  InputSocket::InputSocket(ros::NodeHandle private_nh, uint16_t udp_port):
302  Input()
303  {
304  sockfd_ = -1;
305 
306  // connect to O3M151 UDP port
307  ROS_INFO_STREAM("Opening UDP socket: port " << udp_port);
308  sockfd_ = socket(PF_INET, SOCK_DGRAM, 0);
309  if (sockfd_ == -1)
310  {
311  perror("socket"); // TODO: ROS_ERROR errno
312  return;
313  }
314 
315  sockaddr_in my_addr; // my address information
316  memset(&my_addr, 0, sizeof(my_addr)); // initialize to zeros
317  my_addr.sin_family = AF_INET; // host byte order
318  my_addr.sin_port = htons(udp_port); // short, in network byte order
319  my_addr.sin_addr.s_addr = INADDR_ANY; // automatically fill in my IP
320 
321  if (bind(sockfd_, (sockaddr *)&my_addr, sizeof(sockaddr)) == -1)
322  {
323  perror("bind"); // TODO: ROS_ERROR errno
324  return;
325  }
326 
327  if (fcntl(sockfd_,F_SETFL, O_NONBLOCK|FASYNC) < 0)
328  {
329  perror("non-block");
330  return;
331  }
332 
333  ROS_DEBUG("O3M151 socket fd is %d\n", sockfd_);
334  }
335 
338  {
339  (void) close(sockfd_);
340  }
341 
342  int InputSocket::getPacket(pcl::PointCloud<pcl::PointXYZI> &pc)
343  {
344 
345  // buffer for a single UDP packet
346  const uint32_t udpPacketBufLen = 2000;
347  int8_t udpPacketBuf[udpPacketBufLen];
348 
349  struct pollfd fds[1];
350  fds[0].fd = sockfd_;
351  fds[0].events = POLLIN;
352  static const int POLL_TIMEOUT = 1000; // one second (in msec)
353 
354  // run for all eternity as long as no error occurs
355  while (true)
356  {
357  // Unfortunately, the Linux kernel recvfrom() implementation
358  // uses a non-interruptible sleep() when waiting for data,
359  // which would cause this method to hang if the device is not
360  // providing data. We poll() the device first to make sure
361  // the recvfrom() will not block.
362  //
363  // Note, however, that there is a known Linux kernel bug:
364  //
365  // Under Linux, select() may report a socket file descriptor
366  // as "ready for reading", while nevertheless a subsequent
367  // read blocks. This could for example happen when data has
368  // arrived but upon examination has wrong checksum and is
369  // discarded. There may be other circumstances in which a
370  // file descriptor is spuriously reported as ready. Thus it
371  // may be safer to use O_NONBLOCK on sockets that should not
372  // block.
373 
374  // poll() until input available
375  do
376  {
377  int retval = poll(fds, 1, POLL_TIMEOUT);
378  if (retval < 0) // poll() error?
379  {
380  if (errno != EINTR)
381  ROS_ERROR("poll() error: %s", strerror(errno));
382  return 1;
383  }
384  if (retval == 0) // poll() timeout?
385  {
386  ROS_WARN("O3M151 poll() timeout");
387  return 1;
388  }
389  if ((fds[0].revents & POLLERR)
390  || (fds[0].revents & POLLHUP)
391  || (fds[0].revents & POLLNVAL)) // device error?
392  {
393  ROS_ERROR("poll() reports O3M151 error");
394  return 1;
395  }
396  } while ((fds[0].revents & POLLIN) == 0);
397  // receive the data. rc contains the number of received bytes and also the error code
398  // IMPORTANT: This is a blocking call. If it doesn't receive anything it will wait forever
399  ssize_t rc=recvfrom(sockfd_,(char*)udpPacketBuf,udpPacketBufLen,0,NULL,NULL);
400  if(rc < 0)
401  return RESULT_ERROR;
402  else
403  {
404  int result = process(udpPacketBuf, rc, pc);
405  ROS_DEBUG("result process %d", result);
406  if(result == RESULT_OK)
407  return result;
408  }
409  }
410  }
411 
413  // InputPCAP class implementation
415 
426  double packet_rate,
427  std::string filename,
428  bool read_once,
429  bool read_fast,
430  double repeat_delay):
431  Input(),
432  packet_rate_(packet_rate)
433  {
434  filename_ = filename;
435  fp_ = NULL;
436  pcap_ = NULL;
437  empty_ = true;
438 
439  // get parameters using private node handle
440  private_nh.param("read_once", read_once_, read_once);
441  private_nh.param("read_fast", read_fast_, read_fast);
442  private_nh.param("repeat_delay", repeat_delay_, repeat_delay);
443 
444  if (read_once_)
445  ROS_INFO("Read input file only once.");
446  if (read_fast_)
447  ROS_INFO("Read input file as quickly as possible.");
448  if (repeat_delay_ > 0.0)
449  ROS_INFO("Delay %.3f seconds before repeating input file.",
450  repeat_delay_);
451 
452  // Open the PCAP dump file
453  ROS_INFO("Opening PCAP file \"%s\"", filename_.c_str());
454  if ((pcap_ = pcap_open_offline(filename_.c_str(), errbuf_) ) == NULL)
455  {
456  ROS_FATAL("Error opening O3M151 socket dump file.");
457  return;
458  }
459  }
460 
461 
464  {
465  pcap_close(pcap_);
466  }
467 
468 
470  int InputPCAP::getPacket(pcl::PointCloud<pcl::PointXYZI> &pc)
471  {
472  struct pcap_pkthdr *header;
473  const u_char *pkt_data;
474  // buffer for a single UDP packet
475  const uint32_t udpPacketBufLen = 2000;
476  int8_t udpPacketBuf[udpPacketBufLen];
477 
478  while (true)
479  {
480  bool keep_reading = false;
481  int res;
482  if ((res = pcap_next_ex(pcap_, &header, &pkt_data)) >= 0)
483  {
484  // Keep the reader from blowing through the file.
485  if (read_fast_ == false)
487 
488  memcpy(udpPacketBuf, pkt_data+42, header->len);
489  int result = RESULT_ERROR;
490  result = process(udpPacketBuf, header->len-42, pc);
491  ROS_DEBUG("result process %d", header->len);
492  empty_ = false;
493  if(result == RESULT_OK)
494  return RESULT_OK; // success
495  else
496  keep_reading = true;
497  }
498 
499  if (empty_) // no data in file?
500  {
501  ROS_WARN("Error %d reading O3M151 packet: %s",
502  res, pcap_geterr(pcap_));
503  return -1;
504  }
505 
506  if (read_once_)
507  {
508  ROS_INFO("end of file reached -- done reading.");
509  return -1;
510  }
511 
512  if (repeat_delay_ > 0.0)
513  {
514  ROS_INFO("end of file reached -- delaying %.3f seconds.",
515  repeat_delay_);
516  usleep(rint(repeat_delay_ * 1000000.0));
517  }
518 
519  if(keep_reading == false)
520  {
521  ROS_DEBUG("replaying O3M151 dump file");
522  // I can't figure out how to rewind the file, because it
523  // starts with some kind of header. So, close the file
524  // and reopen it with pcap.
525  pcap_close(pcap_);
526  pcap_ = pcap_open_offline(filename_.c_str(), errbuf_);
527  empty_ = true; // maybe the file disappeared?
528  }
529  } // loop back and try again
530  }
531 
532 } // o3m151 namespace
#define ROS_FATAL(...)
uint16_t IndexOfPacketInChannel
Definition: input.cc:53
int8_t * channelBuf
Definition: input.h:72
int processPacket(int8_t *currentPacketData, uint32_t currentPacketSize, int8_t *channelBuffer, uint32_t channelBufferSize, uint32_t *pos)
Definition: input.cc:82
virtual int getPacket(pcl::PointCloud< pcl::PointXYZI > &pc)
Read one O3M151 packet.
Definition: input.cc:342
uint16_t NumberOfPacketsInChannel
Definition: input.cc:52
char errbuf_[PCAP_ERRBUF_SIZE]
Definition: input.h:122
uint32_t TotalLengthOfChannel
Definition: input.cc:55
double slope(const std::vector< double > &x, const std::vector< double > &y)
Definition: input.cc:124
struct o3m151_driver::PacketHeader PacketHeader
#define ROS_WARN(...)
void processChannel8(int8_t *buf, uint32_t size, pcl::PointCloud< pcl::PointXYZI > &pc)
Definition: input.cc:135
std::string filename_
Definition: input.h:119
uint32_t previous_packet_counter_
Definition: input.h:78
InputPCAP(ros::NodeHandle private_nh, double packet_rate, std::string filename="", bool read_once=false, bool read_fast=false, double repeat_delay=0.0)
constructor
Definition: input.cc:425
#define RESULT_OK
Definition: input.cc:35
#define ROS_INFO(...)
bool startOfChannelFound_
Definition: input.h:81
bool param(const std::string &param_name, T &param_val, const T &default_val) const
struct o3m151_driver::ChannelHeader ChannelHeader
uint32_t pos_in_channel_
Definition: input.h:75
uint16_t NumberOfPacketsInCycle
Definition: input.cc:50
~InputSocket()
destructor
Definition: input.cc:337
#define ROS_DEBUG_STREAM(args)
bool sleep()
static const uint32_t customerDataChannel
Definition: input.cc:42
int process(int8_t *udpPacketBuf, const ssize_t rc, pcl::PointCloud< pcl::PointXYZI > &pc)
Definition: input.cc:234
uint16_t IndexOfPacketInCycle
Definition: input.cc:51
uint32_t channel_buf_size_
Definition: input.h:71
virtual int getPacket(pcl::PointCloud< pcl::PointXYZI > &pc)
Get one o3m151 packet.
Definition: input.cc:470
#define ROS_INFO_STREAM(args)
InputSocket(ros::NodeHandle private_nh, uint16_t udp_port=UDP_PORT_NUMBER)
constructor
Definition: input.cc:301
ros::Rate packet_rate_
Definition: input.h:127
#define RESULT_ERROR
Definition: input.cc:36
static const size_t packet_size
Definition: input.cc:40
Length
#define ROS_ERROR(...)
bool previous_packet_counter_valid_
Definition: input.h:79
Pure virtual O3M151 input base class.
Definition: input.h:44
struct o3m151_driver::ChannelEnd ChannelEnd
#define ROS_DEBUG(...)


o3m151_driver
Author(s): Vincent Rousseau
autogenerated on Mon Jun 10 2019 14:07:55