dispatch.cc
Go to the documentation of this file.
1 
37 #include "details/channel.hh"
38 
40 
43 
49 
51 
53 
55 
57 
68 
70 
74 
77 
78 #include <limits>
79 
80 namespace crl {
81 namespace multisense {
82 namespace details {
83 namespace {
84 
85 //
86 // Default UDP assembler
87 
88 void defaultUdpAssembler(utility::BufferStreamWriter& stream,
89  const uint8_t *dataP,
90  uint32_t offset,
91  uint32_t length)
92 {
93  stream.seek(offset);
94  stream.write(dataP, length);
95 }
96 
97 }; // anonymous
98 
99 //
100 // Publish an image
101 
103  image::Header& header)
104 {
106 
107  std::list<ImageListener*>::const_iterator it;
108 
109  for(it = m_imageListeners.begin();
110  it != m_imageListeners.end();
111  it ++)
112  (*it)->dispatch(buffer, header);
113 }
114 
115 //
116 // Publish a laser scan
117 
119  lidar::Header& header)
120 {
122 
123  std::list<LidarListener*>::const_iterator it;
124 
125  for(it = m_lidarListeners.begin();
126  it != m_lidarListeners.end();
127  it ++)
128  (*it)->dispatch(buffer, header);
129 }
130 //
131 // Publish a PPS event
132 
134 {
136 
137  std::list<PpsListener*>::const_iterator it;
138 
139  for(it = m_ppsListeners.begin();
140  it != m_ppsListeners.end();
141  it ++)
142  (*it)->dispatch(header);
143 }
144 
145 //
146 // Publish an IMU event
147 
149 {
151 
152  std::list<ImuListener*>::const_iterator it;
153 
154  for(it = m_imuListeners.begin();
155  it != m_imuListeners.end();
156  it ++)
157  (*it)->dispatch(header);
158 }
159 
160 //
161 // Dispatch incoming messages
162 
164 {
165  utility::BufferStreamReader stream(buffer);
166 
167  //
168  // Extract the type and version fields, which are stored
169  // first in the stream
170 
171  wire::IdType id;
172  wire::VersionType version;
173 
174  stream & id;
175  stream & version;
176 
177  //
178  // Handle the message.
179  //
180  // Simple, low-rate messages are dynamically stored
181  // off for possible presentation to the user in a signal
182  // handler.
183  //
184  // Larger data types use a threaded dispatch
185  // mechanism with a reference-counted buffer back-end.
186 
187  switch(id) {
189  {
190  wire::LidarData scan(stream, version);
192 
193  const int32_t scanArc = static_cast<int32_t> (utility::degreesToRadians(270.0) * 1e6); // microradians
194  const uint32_t maxRange = static_cast<uint32_t> (30.0 * 1e3); // mm
195 
196  if (false == m_networkTimeSyncEnabled) {
197 
198  header.timeStartSeconds = scan.timeStartSeconds;
200  header.timeEndSeconds = scan.timeEndSeconds;
202 
203  } else {
204 
205  sensorToLocalTime(static_cast<double>(scan.timeStartSeconds) +
206  1e-6 * static_cast<double>(scan.timeStartMicroSeconds),
207  header.timeStartSeconds, header.timeStartMicroSeconds);
208 
209  sensorToLocalTime(static_cast<double>(scan.timeEndSeconds) +
210  1e-6 * static_cast<double>(scan.timeEndMicroSeconds),
211  header.timeEndSeconds, header.timeEndMicroSeconds);
212  }
213 
214  header.scanId = scan.scanCount;
215  header.spindleAngleStart = scan.angleStart;
216  header.spindleAngleEnd = scan.angleEnd;
217  header.scanArc = scanArc;
218  header.maxRange = maxRange;
219  header.pointCount = scan.points;
220  header.rangesP = scan.distanceP;
221  header.intensitiesP = scan.intensityP;
222 
223  dispatchLidar(buffer, header);
224 
225  break;
226  }
228  {
229  wire::ImageMeta *metaP = new (std::nothrow) wire::ImageMeta(stream, version);
230 
231  if (NULL == metaP)
232  CRL_EXCEPTION("unable to allocate metadata");
233 
234  m_imageMetaCache.insert(metaP->frameId, metaP); // destroys oldest
235 
236  break;
237  }
239  {
240  wire::JpegImage image(stream, version);
241 
242  const wire::ImageMeta *metaP = m_imageMetaCache.find(image.frameId);
243  if (NULL == metaP)
244  break;
245  //CRL_EXCEPTION("no meta cached for frameId %d", image.frameId);
246 
248 
249  if (false == m_networkTimeSyncEnabled) {
250 
251  header.timeSeconds = metaP->timeSeconds;
252  header.timeMicroSeconds = metaP->timeMicroSeconds;
253 
254  } else
255  sensorToLocalTime(static_cast<double>(metaP->timeSeconds) +
256  1e-6 * static_cast<double>(metaP->timeMicroSeconds),
257  header.timeSeconds, header.timeMicroSeconds);
258 
259  header.source = sourceWireToApi(image.source);
260  header.bitsPerPixel = 0;
261  header.width = image.width;
262  header.height = image.height;
263  header.frameId = image.frameId;
264  header.exposure = metaP->exposureTime;
265  header.gain = metaP->gain;
266  header.framesPerSecond = metaP->framesPerSecond;
267  header.imageDataP = image.dataP;
268  header.imageLength = image.length;
269 
270  dispatchImage(buffer, header);
271 
272  break;
273  }
274  case MSG_ID(wire::Image::ID):
275  {
276  wire::Image image(stream, version);
277 
278  const wire::ImageMeta *metaP = m_imageMetaCache.find(image.frameId);
279  if (NULL == metaP)
280  break;
281  //CRL_EXCEPTION("no meta cached for frameId %d", image.frameId);
282 
284 
285  if (false == m_networkTimeSyncEnabled) {
286 
287  header.timeSeconds = metaP->timeSeconds;
288  header.timeMicroSeconds = metaP->timeMicroSeconds;
289 
290  } else
291  sensorToLocalTime(static_cast<double>(metaP->timeSeconds) +
292  1e-6 * static_cast<double>(metaP->timeMicroSeconds),
293  header.timeSeconds, header.timeMicroSeconds);
294 
295  header.source = sourceWireToApi(image.source);
296  header.bitsPerPixel = image.bitsPerPixel;
297  header.width = image.width;
298  header.height = image.height;
299  header.frameId = image.frameId;
300  header.exposure = metaP->exposureTime;
301  header.gain = metaP->gain;
302  header.framesPerSecond = metaP->framesPerSecond;
303  header.imageDataP = image.dataP;
304  header.imageLength = static_cast<uint32_t>(std::ceil(((double) image.bitsPerPixel / 8.0) * image.width * image.height));
305 
306  dispatchImage(buffer, header);
307 
308  break;
309  }
311  {
312  wire::Disparity image(stream, version);
313 
314  const wire::ImageMeta *metaP = m_imageMetaCache.find(image.frameId);
315  if (NULL == metaP)
316  break;
317  //CRL_EXCEPTION("no meta cached for frameId %d", image.frameId);
318 
320 
321  if (false == m_networkTimeSyncEnabled) {
322 
323  header.timeSeconds = metaP->timeSeconds;
324  header.timeMicroSeconds = metaP->timeMicroSeconds;
325 
326  } else
327  sensorToLocalTime(static_cast<double>(metaP->timeSeconds) +
328  1e-6 * static_cast<double>(metaP->timeMicroSeconds),
329  header.timeSeconds, header.timeMicroSeconds);
330 
331  header.source = Source_Disparity;
333  header.width = image.width;
334  header.height = image.height;
335  header.frameId = image.frameId;
336  header.exposure = metaP->exposureTime;
337  header.gain = metaP->gain;
338  header.framesPerSecond = metaP->framesPerSecond;
339  header.imageDataP = image.dataP;
340  header.imageLength = static_cast<uint32_t>(std::ceil(((double) wire::Disparity::API_BITS_PER_PIXEL / 8.0) * image.width * image.height));
341 
342  dispatchImage(buffer, header);
343 
344  break;
345  }
346  case MSG_ID(wire::SysPps::ID):
347  {
348  wire::SysPps pps(stream, version);
349 
351 
352  header.sensorTime = pps.ppsNanoSeconds;
353 
354  sensorToLocalTime(static_cast<double>(pps.ppsNanoSeconds) / 1e9,
355  header.timeSeconds, header.timeMicroSeconds);
356 
357  dispatchPps(header);
358 
359  break;
360  }
362  {
363  wire::ImuData imu(stream, version);
364 
366 
367  header.sequence = imu.sequence;
368  header.samples.resize(imu.samples.size());
369 
370  for(uint32_t i=0; i<imu.samples.size(); i++) {
371 
372  const wire::ImuSample& w = imu.samples[i];
373  imu::Sample& a = header.samples[i];
374 
375  if (false == m_networkTimeSyncEnabled) {
376 
377  const int64_t oneBillion = static_cast<int64_t>(1e9);
378 
379  a.timeSeconds = static_cast<uint32_t>(w.timeNanoSeconds / oneBillion);
380  a.timeMicroSeconds = static_cast<uint32_t>((w.timeNanoSeconds % oneBillion) /
381  static_cast<int64_t>(1000));
382 
383  } else
384  sensorToLocalTime(static_cast<double>(w.timeNanoSeconds) / 1e9,
386 
387  switch(w.type) {
391  default: CRL_EXCEPTION("unknown wire IMU type: %d", w.type);
392  }
393 
394  a.x = w.x; a.y = w.y; a.z = w.z;
395  }
396 
397  dispatchImu(header);
398 
399  break;
400  }
401  case MSG_ID(wire::Ack::ID):
402  break; // handle below
404  m_messages.store(wire::CamConfig(stream, version));
405  break;
407  m_messages.store(wire::CamHistory(stream, version));
408  break;
410  m_messages.store(wire::LedStatus(stream, version));
411  break;
413  m_messages.store(wire::LedSensorStatus(stream, version));
414  break;
416  m_messages.store(wire::SysFlashResponse(stream, version));
417  break;
419  m_messages.store(wire::SysDeviceInfo(stream, version));
420  break;
422  m_messages.store(wire::SysCameraCalibration(stream, version));
423  break;
425  m_messages.store(wire::SysSensorCalibration(stream, version));
426  break;
428  m_messages.store(wire::SysTransmitDelay(stream, version));
429  break;
431  m_messages.store(wire::SysLidarCalibration(stream, version));
432  break;
433  case MSG_ID(wire::SysMtu::ID):
434  m_messages.store(wire::SysMtu(stream, version));
435  break;
437  m_messages.store(wire::SysNetwork(stream, version));
438  break;
440  m_messages.store(wire::SysDeviceModes(stream, version));
441  break;
443  m_messages.store(wire::VersionResponse(stream, version));
444  break;
446  m_messages.store(wire::StatusResponse(stream, version));
447  break;
449  m_messages.store(wire::ImuConfig(stream, version));
450  break;
452  m_messages.store(wire::ImuInfo(stream, version));
453  break;
455  m_messages.store(wire::SysTestMtuResponse(stream, version));
456  break;
458  m_messages.store(wire::SysDirectedStreams(stream, version));
459  break;
462  break;
463  default:
464 
465  CRL_DEBUG("unknown message received: id=%d, version=%d\n",
466  id, version);
467  return;
468  }
469 
470  //
471  // Signal any listeners (_after_ the message is stored/dispatched)
472  //
473  // A [n]ack is a special case where we signal
474  // the returned status code of the ack'd command,
475  // otherwise we signal valid reception of this message.
476 
477  switch(id) {
478  case MSG_ID(wire::Ack::ID):
479  m_watch.signal(wire::Ack(stream, version));
480  break;
481  default:
482  m_watch.signal(id);
483  break;
484  }
485 }
486 
487 //
488 // Get a UDP assembler for this message type. We are given
489 // the first UDP packet in the stream
490 
491 impl::UdpAssembler impl::getUdpAssembler(const uint8_t *firstDatagramP,
492  uint32_t length)
493 {
494  //
495  // Get the message type, it is stored after wire::Header
496 
497  utility::BufferStreamReader stream(firstDatagramP, length);
498  stream.seek(sizeof(wire::Header));
499 
500  wire::IdType messageType;
501  stream & messageType;
502 
503  //
504  // See if a custom handler has been registered
505 
506  UdpAssemblerMap::const_iterator it = m_udpAssemblerMap.find(messageType);
507 
508  if (m_udpAssemblerMap.end() != it)
509  return it->second;
510  else
511  return defaultUdpAssembler;
512 }
513 
514 //
515 // Find a suitably sized buffer for the incoming message
516 
518 {
519  BufferPool *bP = NULL;
520 
521  if (messageLength <= RX_POOL_SMALL_BUFFER_SIZE)
522  bP = &(m_rxSmallBufferPool);
523  else if (messageLength <= RX_POOL_LARGE_BUFFER_SIZE)
524  bP = &(m_rxLargeBufferPool);
525  else
526  CRL_EXCEPTION("message too large: %d bytes", messageLength);
527 
528  //
529  // TODO: re-think the shared() mechanism of the buffers, so we do not
530  // have to walk this vector.
531 
532  BufferPool::const_iterator it = bP->begin();
533  for(; it != bP->end(); it++)
534  if (false == (*it)->shared())
535  return *(*it);
536 
537  CRL_EXCEPTION("no free RX buffers (%d in use by consumers)\n", bP->size());
538 }
539 
540 //
541 // Unwrap a 16-bit wire sequence ID into a unique 64-bit local ID
542 
543 const int64_t& impl::unwrapSequenceId(uint16_t wireId)
544 {
545  //
546  // Look for a sequence change
547 
548  if (wireId != m_lastRxSeqId) {
549 
550  const uint16_t ID_MAX = std::numeric_limits<uint16_t>::max();
551  const uint16_t ID_CENTER = ID_MAX / 2;
552 
553  //
554  // Seed
555 
556  if (-1 == m_lastRxSeqId)
558 
559  //
560  // Detect forward 16-bit wrap
561 
562  else if (wireId < ID_CENTER &&
563  m_lastRxSeqId > ID_CENTER) {
564 
565  m_unWrappedRxSeqId += 1 + (ID_MAX - m_lastRxSeqId) + wireId;
566 
567  //
568  // Normal case
569 
570  } else
571  m_unWrappedRxSeqId += wireId - m_lastRxSeqId;
572 
573  //
574  // Remember change
575 
576  m_lastRxSeqId = wireId;
577  }
578 
579  return m_unWrappedRxSeqId;
580 }
581 
582 //
583 // Handles any incoming packets
584 
586 {
588 
589  for(;;) {
590 
591  //
592  // Receive the packet
593 
594  const int bytesRead = recvfrom(m_serverSocket,
595  (char*)m_incomingBuffer.data(),
596  m_incomingBuffer.size(),
597  0, NULL, NULL);
598  //
599  // Nothing left to read
600 
601  if (bytesRead < 0)
602  break;
603 
604  //
605  // Check for undersized packets
606 
607  else if (bytesRead < (int)sizeof(wire::Header))
608  CRL_EXCEPTION("undersized packet: %d/%d bytes\n",
609  bytesRead, sizeof(wire::Header));
610 
611  //
612  // For convenience below
613 
614  const uint8_t *inP = reinterpret_cast<const uint8_t*>(m_incomingBuffer.data());
615 
616  //
617  // Validate the header
618 
619  const wire::Header& header = *(reinterpret_cast<const wire::Header*>(inP));
620 
621  if (wire::HEADER_MAGIC != header.magic)
622  CRL_EXCEPTION("bad protocol magic: 0x%x, expecting 0x%x",
623  header.magic, wire::HEADER_MAGIC);
624  else if (wire::HEADER_VERSION != header.version)
625  CRL_EXCEPTION("bad protocol version: 0x%x, expecting 0x%x",
626  header.version, wire::HEADER_VERSION);
627  else if (wire::HEADER_GROUP != header.group)
628  CRL_EXCEPTION("bad protocol group: 0x%x, expecting 0x%x",
629  header.group, wire::HEADER_GROUP);
630 
631  //
632  // Unwrap the sequence identifier
633 
634  const int64_t& sequence = unwrapSequenceId(header.sequenceIdentifier);
635 
636  //
637  // See if we are already tracking this messge ID
638 
639  UdpTracker *trP = m_udpTrackerCache.find(sequence);
640  if (NULL == trP) {
641 
642  //
643  // If we drop first packet, we will drop entire message. Currently we
644  // require the first datagram in order to assign an assembler.
645  // TODO: re-think this.
646 
647  if (0 != header.byteOffset)
648  continue;
649  else {
650 
651  //
652  // Create a new tracker for this sequence id.
653 
654  trP = new UdpTracker(header.messageLength,
655  getUdpAssembler(inP, bytesRead),
656  findFreeBuffer(header.messageLength));
657  }
658  }
659 
660  //
661  // Assemble the datagram into the message stream, returns true if the
662  // assembly is complete.
663 
664  if (true == trP->assemble(bytesRead - sizeof(wire::Header),
665  header.byteOffset,
666  &(inP[sizeof(wire::Header)]))) {
667 
668  //
669  // Dispatch to any listeners
670 
671  dispatch(trP->stream());
672 
673  //
674  // Release the tracker
675 
676  if (1 == trP->packets())
677  delete trP; // has not yet been cached
678  else
679  m_udpTrackerCache.remove(sequence);
680 
681  } else if (1 == trP->packets()) {
682 
683  //
684  // Cache the tracker, as more UDP packets are
685  // forthcoming for this message.
686 
687  m_udpTrackerCache.insert(sequence, trP);
688  }
689  }
690 }
691 
692 //
693 // This thread waits for UDP packets
694 
695 #if WIN32
696 DWORD impl::rxThread(void *userDataP)
697 #else
698 void *impl::rxThread(void *userDataP)
699 #endif
700 {
701  impl *selfP = reinterpret_cast<impl*>(userDataP);
702  const int server = selfP->m_serverSocket;
703  fd_set readSet;
704 
705  //
706  // Loop until shutdown
707 
708  while(selfP->m_threadsRunning) {
709 
710  //
711  // Add the server socket to the read set.
712 
713  FD_ZERO(&readSet);
714  FD_SET(server, &readSet);
715 
716  //
717  // Wait for a new packet to arrive, timing out every once in awhile
718 
719  struct timeval tv = {0, 200000}; // 5Hz
720  const int result = select(server+1, &readSet, NULL, NULL, &tv);
721  if (result <= 0)
722  continue;
723 
724  //
725  // Let the comm object handle decoding
726 
727  try {
728 
729  selfP->handle();
730 
731  } catch (const std::exception& e) {
732 
733  CRL_DEBUG("exception while decoding packet: %s\n", e.what());
734 
735  } catch ( ... ) {
736 
737  CRL_DEBUG("unknown exception while decoding packet\n");
738  }
739  }
740 
741  return NULL;
742 }
743 
744 }}}; // namespaces
std::list< ImageListener * > m_imageListeners
Definition: channel.hh:363
#define CRL_EXCEPTION(fmt,...)
Definition: Exception.hh:71
static CRL_CONSTEXPR Type Type_Accelerometer
static CRL_CONSTEXPR IdType ID
Definition: SysMtuMessage.h:50
static CRL_CONSTEXPR IdType ID
Definition: AckMessage.h:49
double sensorToLocalTime(const double &sensorTime)
Definition: channel.cc:481
static CRL_CONSTEXPR DataSource Source_Disparity
bool assemble(uint32_t bytes, uint32_t offset, const uint8_t *dataP)
Definition: channel.hh:253
std::vector< Sample > samples
UdpAssemblerMap m_udpAssemblerMap
Definition: channel.hh:332
void insert(KEY key, DATA *data)
Definition: storage.hh:165
std::list< PpsListener * > m_ppsListeners
Definition: channel.hh:365
std_msgs::Header * header(M &m)
DepthCache< int64_t, wire::ImageMeta > m_imageMetaCache
Definition: channel.hh:325
static CRL_CONSTEXPR Type Type_Magnetometer
void dispatch(utility::BufferStreamWriter &buffer)
Definition: dispatch.cc:163
DepthCache< int64_t, UdpTracker > m_udpTrackerCache
Definition: channel.hh:312
static CRL_CONSTEXPR IdType ID
UdpAssembler getUdpAssembler(const uint8_t *udpDatagramP, uint32_t length)
Definition: dispatch.cc:491
static CRL_CONSTEXPR uint16_t HEADER_VERSION
Definition: Protocol.h:67
std::vector< uint8_t > m_incomingBuffer
Definition: channel.hh:300
#define MSG_ID(x)
Definition: Protocol.h:248
static CRL_CONSTEXPR uint32_t RX_POOL_SMALL_BUFFER_SIZE
Definition: channel.hh:203
void signal(wire::IdType id, Status status=Status_Ok)
Definition: signal.hh:57
static CRL_CONSTEXPR uint8_t API_BITS_PER_PIXEL
void(* UdpAssembler)(utility::BufferStreamWriter &stream, const uint8_t *dataP, uint32_t offset, uint32_t length)
Definition: channel.hh:186
utility::Mutex m_dispatchLock
Definition: channel.hh:337
const IntensityType * intensitiesP
static CRL_CONSTEXPR IdType ID
static CRL_CONSTEXPR uint16_t TYPE_GYRO
Definition: channel.cc:56
static CRL_CONSTEXPR uint16_t TYPE_ACCEL
utility::BufferStreamWriter & stream()
Definition: channel.hh:250
const int64_t & unwrapSequenceId(uint16_t id)
Definition: dispatch.cc:543
utility::BufferStreamWriter & findFreeBuffer(uint32_t messageLength)
Definition: dispatch.cc:517
static CRL_CONSTEXPR uint16_t HEADER_MAGIC
Definition: Protocol.h:66
void dispatchImu(imu::Header &header)
Definition: dispatch.cc:148
void dispatchPps(pps::Header &header)
Definition: dispatch.cc:133
Type degreesToRadians(Type const &value)
Definition: Units.hh:55
#define CRL_DEBUG(fmt,...)
Definition: Exception.hh:77
static CRL_CONSTEXPR uint32_t RX_POOL_LARGE_BUFFER_SIZE
Definition: channel.hh:201
std::list< LidarListener * > m_lidarListeners
Definition: channel.hh:364
static CRL_CONSTEXPR uint16_t HEADER_GROUP
Definition: Protocol.h:72
static void * rxThread(void *userDataP)
Definition: dispatch.cc:698
void dispatchLidar(utility::BufferStream &buffer, lidar::Header &header)
Definition: dispatch.cc:118
std::list< ImuListener * > m_imuListeners
Definition: channel.hh:366
std::vector< utility::BufferStreamWriter * > BufferPool
Definition: channel.hh:317
static CRL_CONSTEXPR uint16_t TYPE_MAG
static CRL_CONSTEXPR IdType ID
Definition: SysPpsMessage.h:49
static CRL_CONSTEXPR Type Type_Gyroscope
static DataSource sourceWireToApi(wire::SourceType mask)
Definition: channel.cc:379
void dispatchImage(utility::BufferStream &buffer, image::Header &header)
Definition: dispatch.cc:102


multisense_lib
Author(s):
autogenerated on Sat Apr 6 2019 02:16:46