dispatch.cc
Go to the documentation of this file.
1 
37 #include "details/channel.hh"
38 
40 
43 
49 
51 
53 
55 
57 
59 
70 
72 
76 
79 
80 #include <limits>
81 
82 namespace crl {
83 namespace multisense {
84 namespace details {
85 namespace {
86 
87 //
88 // Default UDP assembler
89 
90 void defaultUdpAssembler(utility::BufferStreamWriter& stream,
91  const uint8_t *dataP,
92  uint32_t offset,
93  uint32_t length)
94 {
95  stream.seek(offset);
96  stream.write(dataP, length);
97 }
98 
99 } // anonymous
100 
101 //
102 // Publish an image
103 
105  image::Header& header)
106 {
108 
109  std::list<ImageListener*>::const_iterator it;
110 
111  for(it = m_imageListeners.begin();
112  it != m_imageListeners.end();
113  it ++)
114  (*it)->dispatch(buffer, header);
115 }
116 
117 //
118 // Publish a laser scan
119 
121  lidar::Header& header)
122 {
124 
125  std::list<LidarListener*>::const_iterator it;
126 
127  for(it = m_lidarListeners.begin();
128  it != m_lidarListeners.end();
129  it ++)
130  (*it)->dispatch(buffer, header);
131 }
132 //
133 // Publish a PPS event
134 
136 {
138 
139  std::list<PpsListener*>::const_iterator it;
140 
141  for(it = m_ppsListeners.begin();
142  it != m_ppsListeners.end();
143  it ++)
144  (*it)->dispatch(header);
145 }
146 
147 //
148 // Publish an IMU event
149 
151 {
153 
154  std::list<ImuListener*>::const_iterator it;
155 
156  for(it = m_imuListeners.begin();
157  it != m_imuListeners.end();
158  it ++)
159  (*it)->dispatch(header);
160 }
161 
162 //
163 // Dispatch incoming messages
164 
166 {
167  utility::BufferStreamReader stream(buffer);
168 
169  //
170  // Extract the type and version fields, which are stored
171  // first in the stream
172 
173  wire::IdType id;
174  wire::VersionType version;
175 
176  stream & id;
177  stream & version;
178 
179  //
180  // Handle the message.
181  //
182  // Simple, low-rate messages are dynamically stored
183  // off for possible presentation to the user in a signal
184  // handler.
185  //
186  // Larger data types use a threaded dispatch
187  // mechanism with a reference-counted buffer back-end.
188 
189  switch(id) {
191  {
192  wire::LidarData scan(stream, version);
194 
195  const int32_t scanArc = static_cast<int32_t> (utility::degreesToRadians(270.0) * 1e6); // microradians
196  const uint32_t maxRange = static_cast<uint32_t> (30.0 * 1e3); // mm
197 
198  if (false == m_networkTimeSyncEnabled) {
199 
200  header.timeStartSeconds = scan.timeStartSeconds;
202  header.timeEndSeconds = scan.timeEndSeconds;
204 
205  } else {
206 
207  sensorToLocalTime(static_cast<double>(scan.timeStartSeconds) +
208  1e-6 * static_cast<double>(scan.timeStartMicroSeconds),
209  header.timeStartSeconds, header.timeStartMicroSeconds);
210 
211  sensorToLocalTime(static_cast<double>(scan.timeEndSeconds) +
212  1e-6 * static_cast<double>(scan.timeEndMicroSeconds),
213  header.timeEndSeconds, header.timeEndMicroSeconds);
214  }
215 
216  header.scanId = scan.scanCount;
217  header.spindleAngleStart = scan.angleStart;
218  header.spindleAngleEnd = scan.angleEnd;
219  header.scanArc = scanArc;
220  header.maxRange = maxRange;
221  header.pointCount = scan.points;
222  header.rangesP = scan.distanceP;
223  header.intensitiesP = scan.intensityP;
224 
225  dispatchLidar(buffer, header);
226 
227  break;
228  }
230  {
231  wire::ImageMeta *metaP = new (std::nothrow) wire::ImageMeta(stream, version);
232 
233  if (NULL == metaP)
234  CRL_EXCEPTION_RAW("unable to allocate metadata");
235 
236  m_imageMetaCache.insert(metaP->frameId, metaP); // destroys oldest
237 
238  break;
239  }
241  {
242  wire::JpegImage image(stream, version);
243 
244  const wire::ImageMeta *metaP = m_imageMetaCache.find(image.frameId);
245  if (NULL == metaP)
246  break;
247  //CRL_EXCEPTION("no meta cached for frameId %d", image.frameId);
248 
250 
251  getImageTime(metaP, header.timeSeconds, header.timeMicroSeconds);
252 
253  header.source = sourceWireToApi(image.source);
254  header.bitsPerPixel = 0;
255  header.width = image.width;
256  header.height = image.height;
257  header.frameId = image.frameId;
258  header.exposure = metaP->exposureTime;
259  header.gain = metaP->gain;
260  header.framesPerSecond = metaP->framesPerSecond;
261  header.imageDataP = image.dataP;
262  header.imageLength = image.length;
263 
264  dispatchImage(buffer, header);
265 
266  break;
267  }
268  case MSG_ID(wire::Image::ID):
269  {
270  wire::Image image(stream, version);
271 
272  const wire::ImageMeta *metaP = m_imageMetaCache.find(image.frameId);
273  if (NULL == metaP)
274  break;
275  //CRL_EXCEPTION("no meta cached for frameId %d", image.frameId);
276 
278 
279  getImageTime(metaP, header.timeSeconds, header.timeMicroSeconds);
280 
281  header.source = sourceWireToApi(image.source);
282  header.bitsPerPixel = image.bitsPerPixel;
283  header.width = image.width;
284  header.height = image.height;
285  header.frameId = image.frameId;
286  header.exposure = metaP->exposureTime;
287  header.gain = metaP->gain;
288  header.framesPerSecond = metaP->framesPerSecond;
289  header.imageDataP = image.dataP;
290  header.imageLength = static_cast<uint32_t>(std::ceil(((double) image.bitsPerPixel / 8.0) * image.width * image.height));
291 
292  dispatchImage(buffer, header);
293 
294  break;
295  }
297  {
298  wire::Disparity image(stream, version);
299 
300  const wire::ImageMeta *metaP = m_imageMetaCache.find(image.frameId);
301  if (NULL == metaP)
302  break;
303  //CRL_EXCEPTION("no meta cached for frameId %d", image.frameId);
304 
306 
307  getImageTime(metaP, header.timeSeconds, header.timeMicroSeconds);
308 
309  header.source = Source_Disparity;
311  header.width = image.width;
312  header.height = image.height;
313  header.frameId = image.frameId;
314  header.exposure = metaP->exposureTime;
315  header.gain = metaP->gain;
316  header.framesPerSecond = metaP->framesPerSecond;
317  header.imageDataP = image.dataP;
318  header.imageLength = static_cast<uint32_t>(std::ceil(((double) wire::Disparity::API_BITS_PER_PIXEL / 8.0) * image.width * image.height));
319 
320  dispatchImage(buffer, header);
321 
322  break;
323  }
324  case MSG_ID(wire::SysPps::ID):
325  {
326  wire::SysPps pps(stream, version);
327 
329 
330  header.sensorTime = pps.ppsNanoSeconds;
331 
332  sensorToLocalTime(static_cast<double>(pps.ppsNanoSeconds) / 1e9,
333  header.timeSeconds, header.timeMicroSeconds);
334 
335  dispatchPps(header);
336 
337  break;
338  }
340  {
341  wire::ImuData imu(stream, version);
342 
344 
345  header.sequence = imu.sequence;
346  header.samples.resize(imu.samples.size());
347 
348  for(uint32_t i=0; i<imu.samples.size(); i++) {
349 
350  const wire::ImuSample& w = imu.samples[i];
351  imu::Sample& a = header.samples[i];
352 
354  {
356  }
357  else {
358  if (false == m_networkTimeSyncEnabled) {
359 
361 
362  } else
363  sensorToLocalTime(static_cast<double>(w.timeNanoSeconds) / 1e9,
365  }
366 
367  switch(w.type) {
371  default: CRL_EXCEPTION("unknown wire IMU type: %d", w.type);
372  }
373 
374  a.x = w.x; a.y = w.y; a.z = w.z;
375  }
376 
377  dispatchImu(header);
378 
379  break;
380  }
381  case MSG_ID(wire::Ack::ID):
382  break; // handle below
384  m_messages.store(wire::CamConfig(stream, version));
385  break;
387  m_messages.store(wire::CamHistory(stream, version));
388  break;
390  m_messages.store(wire::LedStatus(stream, version));
391  break;
393  m_messages.store(wire::LedSensorStatus(stream, version));
394  break;
396  m_messages.store(wire::MotorPoll(stream, version));
397  break;
399  m_messages.store(wire::SysFlashResponse(stream, version));
400  break;
402  m_messages.store(wire::SysDeviceInfo(stream, version));
403  break;
405  m_messages.store(wire::SysCameraCalibration(stream, version));
406  break;
408  m_messages.store(wire::SysSensorCalibration(stream, version));
409  break;
411  m_messages.store(wire::SysTransmitDelay(stream, version));
412  break;
414  m_messages.store(wire::SysLidarCalibration(stream, version));
415  break;
416  case MSG_ID(wire::SysMtu::ID):
417  m_messages.store(wire::SysMtu(stream, version));
418  break;
420  m_messages.store(wire::SysNetwork(stream, version));
421  break;
423  m_messages.store(wire::SysDeviceModes(stream, version));
424  break;
426  m_messages.store(wire::VersionResponse(stream, version));
427  break;
429  m_messages.store(wire::StatusResponse(stream, version));
430  break;
432  m_messages.store(wire::ImuConfig(stream, version));
433  break;
435  m_messages.store(wire::ImuInfo(stream, version));
436  break;
438  m_messages.store(wire::SysTestMtuResponse(stream, version));
439  break;
441  m_messages.store(wire::SysDirectedStreams(stream, version));
442  break;
445  break;
446  default:
447 
448  CRL_DEBUG("unknown message received: id=%d, version=%d\n",
449  id, version);
450  return;
451  }
452 
453  //
454  // Signal any listeners (_after_ the message is stored/dispatched)
455  //
456  // A [n]ack is a special case where we signal
457  // the returned status code of the ack'd command,
458  // otherwise we signal valid reception of this message.
459 
460  switch(id) {
461  case MSG_ID(wire::Ack::ID):
462  m_watch.signal(wire::Ack(stream, version));
463  break;
464  default:
465  m_watch.signal(id);
466  break;
467  }
468 }
469 
470 //
471 // Get a UDP assembler for this message type. We are given
472 // the first UDP packet in the stream
473 
474 impl::UdpAssembler impl::getUdpAssembler(const uint8_t *firstDatagramP,
475  uint32_t length)
476 {
477  //
478  // Get the message type, it is stored after wire::Header
479 
480  utility::BufferStreamReader stream(firstDatagramP, length);
481  stream.seek(sizeof(wire::Header));
482 
483  wire::IdType messageType;
484  stream & messageType;
485 
486  //
487  // See if a custom handler has been registered
488 
489  UdpAssemblerMap::const_iterator it = m_udpAssemblerMap.find(messageType);
490 
491  if (m_udpAssemblerMap.end() != it)
492  return it->second;
493  else
494  return defaultUdpAssembler;
495 }
496 
497 //
498 // Find a suitably sized buffer for the incoming message
499 
501 {
502  BufferPool *bP = NULL;
503 
504  if (messageLength <= RX_POOL_SMALL_BUFFER_SIZE)
505  bP = &(m_rxSmallBufferPool);
506  else if (messageLength <= RX_POOL_LARGE_BUFFER_SIZE)
507  bP = &(m_rxLargeBufferPool);
508  else
509  CRL_EXCEPTION("message too large: %d bytes", messageLength);
510 
511  //
512  // TODO: re-think the shared() mechanism of the buffers, so we do not
513  // have to walk this vector.
514 
515  BufferPool::const_iterator it = bP->begin();
516  for(; it != bP->end(); it++)
517  if (false == (*it)->shared())
518  return *(*it);
519 
520  CRL_EXCEPTION("no free RX buffers (%d in use by consumers)\n", bP->size());
521 }
522 
523 //
524 // Unwrap a 16-bit wire sequence ID into a unique 64-bit local ID
525 
526 const int64_t& impl::unwrapSequenceId(uint16_t wireId)
527 {
528  //
529  // Look for a sequence change
530 
531  if (wireId != m_lastRxSeqId) {
532 
533  const uint16_t ID_MAX = std::numeric_limits<uint16_t>::max();
534  const uint16_t ID_CENTER = ID_MAX / 2;
535 
536  //
537  // Seed
538 
539  if (-1 == m_lastRxSeqId)
541 
542  //
543  // Detect forward 16-bit wrap
544 
545  else if (wireId < ID_CENTER &&
546  m_lastRxSeqId > ID_CENTER) {
547 
548  m_unWrappedRxSeqId += 1 + (ID_MAX - m_lastRxSeqId) + wireId;
549 
550  //
551  // Normal case
552 
553  } else
554  m_unWrappedRxSeqId += wireId - m_lastRxSeqId;
555 
556  //
557  // Remember change
558 
559  m_lastRxSeqId = wireId;
560  }
561 
562  return m_unWrappedRxSeqId;
563 }
564 
565 //
566 // Handles any incoming packets
567 
569 {
571 
572  for(;;) {
573 
574  //
575  // Receive the packet
576 
577 // disable MSVC warning for narrowing conversion.
578 #ifdef WIN32
579 #pragma warning (push)
580 #pragma warning (disable : 4267)
581 #endif
582  const int bytesRead = recvfrom(m_serverSocket,
583  (char*)m_incomingBuffer.data(),
584  m_incomingBuffer.size(),
585  0, NULL, NULL);
586 #ifdef WIN32
587 #pragma warning (pop)
588 #endif
589 
590  //
591  // Nothing left to read
592 
593  if (bytesRead < 0)
594  break;
595 
596  //
597  // Check for undersized packets
598 
599  else if (bytesRead < (int)sizeof(wire::Header))
600  CRL_EXCEPTION("undersized packet: %d/%d bytes\n",
601  bytesRead, sizeof(wire::Header));
602 
603  //
604  // For convenience below
605 
606  const uint8_t *inP = reinterpret_cast<const uint8_t*>(m_incomingBuffer.data());
607 
608  //
609  // Validate the header
610 
611  const wire::Header& header = *(reinterpret_cast<const wire::Header*>(inP));
612 
613  if (wire::HEADER_MAGIC != header.magic)
614  CRL_EXCEPTION("bad protocol magic: 0x%x, expecting 0x%x",
615  header.magic, wire::HEADER_MAGIC);
616  else if (wire::HEADER_VERSION != header.version)
617  CRL_EXCEPTION("bad protocol version: 0x%x, expecting 0x%x",
618  header.version, wire::HEADER_VERSION);
619  else if (wire::HEADER_GROUP != header.group)
620  CRL_EXCEPTION("bad protocol group: 0x%x, expecting 0x%x",
621  header.group, wire::HEADER_GROUP);
622 
623  //
624  // Unwrap the sequence identifier
625 
626  const int64_t& sequence = unwrapSequenceId(header.sequenceIdentifier);
627 
628  //
629  // See if we are already tracking this messge ID
630 
631  UdpTracker *trP = m_udpTrackerCache.find(sequence);
632  if (NULL == trP) {
633 
634  //
635  // If we drop first packet, we will drop entire message. Currently we
636  // require the first datagram in order to assign an assembler.
637  // TODO: re-think this.
638 
639  if (0 != header.byteOffset)
640  continue;
641  else {
642 
643  //
644  // Create a new tracker for this sequence id.
645 
646  trP = new UdpTracker(header.messageLength,
647  getUdpAssembler(inP, bytesRead),
648  findFreeBuffer(header.messageLength));
649  }
650  }
651 
652  //
653  // Assemble the datagram into the message stream, returns true if the
654  // assembly is complete.
655 
656  if (true == trP->assemble(bytesRead - sizeof(wire::Header),
657  header.byteOffset,
658  &(inP[sizeof(wire::Header)]))) {
659 
660  //
661  // Dispatch to any listeners
662 
663  dispatch(trP->stream());
664 
665  //
666  // Release the tracker
667 
668  if (1 == trP->packets())
669  delete trP; // has not yet been cached
670  else
671  m_udpTrackerCache.remove(sequence);
672 
673  } else if (1 == trP->packets()) {
674 
675  //
676  // Cache the tracker, as more UDP packets are
677  // forthcoming for this message.
678 
679  m_udpTrackerCache.insert(sequence, trP);
680  }
681  }
682 }
683 
684 //
685 // This thread waits for UDP packets
686 
687 #if WIN32
688 DWORD impl::rxThread(void *userDataP)
689 #else
690 void *impl::rxThread(void *userDataP)
691 #endif
692 {
693  impl *selfP = reinterpret_cast<impl*>(userDataP);
694  const impl::socket_t server = selfP->m_serverSocket;
695  fd_set readSet;
696 
697  //
698  // Loop until shutdown
699 
700  while(selfP->m_threadsRunning) {
701 
702  //
703  // Add the server socket to the read set.
704 
705  FD_ZERO(&readSet);
706  FD_SET(server, &readSet);
707 
708  //
709  // Wait for a new packet to arrive, timing out every once in awhile
710 
711  struct timeval tv = {0, 200000}; // 5Hz
712 #ifdef WIN32
713  // Windows is "special" and doesn't have the same call semantics as posix
714  const int result = select (1, &readSet, NULL, NULL, &tv);
715 #else
716  const int result = select (server + 1, &readSet, NULL, NULL, &tv);
717 #endif
718  if (result <= 0)
719  continue;
720 
721  //
722  // Let the comm object handle decoding
723 
724  try {
725 
726  selfP->handle();
727 
728  } catch (const std::exception& e) {
729 
730  CRL_DEBUG("exception while decoding packet: %s\n", e.what());
731 
732  } catch ( ... ) {
733 
734  CRL_DEBUG_RAW("unknown exception while decoding packet\n");
735  }
736  }
737 
738  return NULL;
739 }
740 
741 }}} // namespaces
std::list< ImageListener * > m_imageListeners
Definition: channel.hh:427
#define CRL_EXCEPTION(fmt,...)
Definition: Exception.hh:71
static CRL_CONSTEXPR Type Type_Accelerometer
#define CRL_DEBUG_RAW(fmt)
Definition: Exception.hh:90
static CRL_CONSTEXPR IdType ID
Definition: SysMtuMessage.h:50
static CRL_CONSTEXPR IdType ID
Definition: AckMessage.h:49
double sensorToLocalTime(const double &sensorTime)
Definition: channel.cc:521
static CRL_CONSTEXPR DataSource Source_Disparity
bool assemble(uint32_t bytes, uint32_t offset, const uint8_t *dataP)
Definition: channel.hh:317
std::vector< Sample > samples
UdpAssemblerMap m_udpAssemblerMap
Definition: channel.hh:396
void insert(KEY key, DATA *data)
Definition: storage.hh:165
std::list< PpsListener * > m_ppsListeners
Definition: channel.hh:429
std_msgs::Header * header(M &m)
DepthCache< int64_t, wire::ImageMeta > m_imageMetaCache
Definition: channel.hh:389
static CRL_CONSTEXPR Type Type_Magnetometer
void dispatch(utility::BufferStreamWriter &buffer)
Definition: dispatch.cc:165
DepthCache< int64_t, UdpTracker > m_udpTrackerCache
Definition: channel.hh:376
static CRL_CONSTEXPR IdType ID
UdpAssembler getUdpAssembler(const uint8_t *udpDatagramP, uint32_t length)
Definition: dispatch.cc:474
static CRL_CONSTEXPR uint16_t HEADER_VERSION
Definition: Protocol.h:76
void getImageTime(const WireT *wire, uint32_t &seconds, uint32_t &microSeconds)
Definition: channel.hh:227
std::vector< uint8_t > m_incomingBuffer
Definition: channel.hh:364
#define MSG_ID(x)
Definition: Protocol.h:273
static CRL_CONSTEXPR uint32_t RX_POOL_SMALL_BUFFER_SIZE
Definition: channel.hh:267
void signal(wire::IdType id, Status status=Status_Ok)
Definition: signal.hh:57
static CRL_CONSTEXPR uint8_t API_BITS_PER_PIXEL
void(* UdpAssembler)(utility::BufferStreamWriter &stream, const uint8_t *dataP, uint32_t offset, uint32_t length)
Definition: channel.hh:209
utility::Mutex m_dispatchLock
Definition: channel.hh:401
const IntensityType * intensitiesP
#define CRL_EXCEPTION_RAW(fmt)
Definition: Exception.hh:77
static CRL_CONSTEXPR IdType ID
static CRL_CONSTEXPR uint16_t TYPE_GYRO
Definition: channel.cc:56
static CRL_CONSTEXPR uint16_t TYPE_ACCEL
utility::BufferStreamWriter & stream()
Definition: channel.hh:314
const int64_t & unwrapSequenceId(uint16_t id)
Definition: dispatch.cc:526
utility::BufferStreamWriter & findFreeBuffer(uint32_t messageLength)
Definition: dispatch.cc:500
static CRL_CONSTEXPR uint16_t HEADER_MAGIC
Definition: Protocol.h:75
void dispatchImu(imu::Header &header)
Definition: dispatch.cc:150
void dispatchPps(pps::Header &header)
Definition: dispatch.cc:135
Type degreesToRadians(Type const &value)
Definition: Units.hh:55
#define CRL_DEBUG(fmt,...)
Definition: Exception.hh:83
static CRL_CONSTEXPR uint32_t RX_POOL_LARGE_BUFFER_SIZE
Definition: channel.hh:265
std::list< LidarListener * > m_lidarListeners
Definition: channel.hh:428
void toHeaderTime(T nanoSeconds, uint32_t &seconds, uint32_t &microSeconds) const
Definition: channel.hh:215
static CRL_CONSTEXPR uint16_t HEADER_GROUP
Definition: Protocol.h:81
static void * rxThread(void *userDataP)
Definition: dispatch.cc:690
void dispatchLidar(utility::BufferStream &buffer, lidar::Header &header)
Definition: dispatch.cc:120
std::list< ImuListener * > m_imuListeners
Definition: channel.hh:430
std::vector< utility::BufferStreamWriter * > BufferPool
Definition: channel.hh:381
static CRL_CONSTEXPR uint16_t TYPE_MAG
static CRL_CONSTEXPR IdType ID
Definition: SysPpsMessage.h:49
static CRL_CONSTEXPR Type Type_Gyroscope
static DataSource sourceWireToApi(wire::SourceType mask)
Definition: channel.cc:399
void dispatchImage(utility::BufferStream &buffer, image::Header &header)
Definition: dispatch.cc:104


multisense_lib
Author(s):
autogenerated on Sun Mar 14 2021 02:34:50