dispatch.cc
Go to the documentation of this file.
00001 
00037 #include "details/channel.hh"
00038 
00039 #include "details/wire/AckMessage.h"
00040 
00041 #include "details/wire/VersionResponseMessage.h"
00042 #include "details/wire/StatusResponseMessage.h"
00043 
00044 #include "details/wire/CamConfigMessage.h"
00045 #include "details/wire/ImageMessage.h"
00046 #include "details/wire/JpegMessage.h"
00047 #include "details/wire/ImageMetaMessage.h"
00048 #include "details/wire/DisparityMessage.h"
00049 
00050 #include "details/wire/CamHistoryMessage.h"
00051 
00052 #include "details/wire/LidarDataMessage.h"
00053 
00054 #include "details/wire/LedStatusMessage.h"
00055 
00056 #include "details/wire/SysMtuMessage.h"
00057 #include "details/wire/SysNetworkMessage.h"
00058 #include "details/wire/SysFlashResponseMessage.h"
00059 #include "details/wire/SysDeviceInfoMessage.h"
00060 #include "details/wire/SysCameraCalibrationMessage.h"
00061 #include "details/wire/SysLidarCalibrationMessage.h"
00062 #include "details/wire/SysDeviceModesMessage.h"
00063 
00064 #include "details/wire/SysPpsMessage.h"
00065 
00066 #include "details/wire/ImuDataMessage.h"
00067 #include "details/wire/ImuConfigMessage.h"
00068 #include "details/wire/ImuInfoMessage.h"
00069 
00070 #include "details/wire/SysTestMtuResponseMessage.h"
00071 #include "details/wire/SysDirectedStreamsMessage.h"
00072 
00073 #include <limits>
00074 
00075 namespace crl {
00076 namespace multisense {
00077 namespace details {
00078 namespace {
00079 
00080 //
00081 // Default UDP assembler
00082 
00083 void defaultUdpAssembler(utility::BufferStreamWriter& stream,
00084                          const uint8_t               *dataP,
00085                          uint32_t                     offset,
00086                          uint32_t                     length)
00087 {
00088     stream.seek(offset);
00089     stream.write(dataP, length);
00090 }
00091 
00092 }; // anonymous
00093 
00094 //
00095 // Publish an image 
00096 
00097 void impl::dispatchImage(utility::BufferStream& buffer,
00098                          image::Header&         header)
00099 {
00100     utility::ScopedLock lock(m_dispatchLock);
00101 
00102     std::list<ImageListener*>::const_iterator it;
00103 
00104     for(it  = m_imageListeners.begin();
00105         it != m_imageListeners.end();
00106         it ++)
00107         (*it)->dispatch(buffer, header);
00108 }
00109 
00110 //
00111 // Publish a laser scan
00112 
00113 void impl::dispatchLidar(utility::BufferStream& buffer,
00114                          lidar::Header&         header)
00115 {
00116     utility::ScopedLock lock(m_dispatchLock);
00117 
00118     std::list<LidarListener*>::const_iterator it;
00119 
00120     for(it  = m_lidarListeners.begin();
00121         it != m_lidarListeners.end();
00122         it ++)
00123         (*it)->dispatch(buffer, header);
00124 }
00125 
00126 //
00127 // Publish a PPS event
00128 
00129 void impl::dispatchPps(pps::Header& header)
00130 {
00131     utility::ScopedLock lock(m_dispatchLock);
00132 
00133     std::list<PpsListener*>::const_iterator it;
00134 
00135     for(it  = m_ppsListeners.begin();
00136         it != m_ppsListeners.end();
00137         it ++)
00138         (*it)->dispatch(header);
00139 }
00140 
00141 //
00142 // Publish an IMU event
00143 
00144 void impl::dispatchImu(imu::Header& header)
00145 {
00146     utility::ScopedLock lock(m_dispatchLock);
00147 
00148     std::list<ImuListener*>::const_iterator it;
00149 
00150     for(it  = m_imuListeners.begin();
00151         it != m_imuListeners.end();
00152         it ++)
00153         (*it)->dispatch(header);
00154 }
00155 
00156 //
00157 // Dispatch incoming messages
00158 
00159 void impl::dispatch(utility::BufferStreamWriter& buffer)
00160 {
00161     utility::BufferStreamReader stream(buffer);
00162 
00163     //
00164     // Extract the type and version fields, which are stored
00165     // first in the stream
00166 
00167     wire::IdType      id;
00168     wire::VersionType version;
00169 
00170     stream & id;
00171     stream & version;
00172     
00173     //
00174     // Handle the message. 
00175     //
00176     // Simple, low-rate messages are dynamically stored 
00177     // off for possible presentation to the user in a signal 
00178     // handler. 
00179     // 
00180     // Larger data types use a threaded dispatch 
00181     // mechanism with a reference-counted buffer back-end.
00182 
00183     switch(id) {
00184     case MSG_ID(wire::LidarData::ID):
00185     {
00186         wire::LidarData scan(stream, version);
00187         lidar::Header   header;
00188 
00189         const int32_t  scanArc  = static_cast<int32_t> (utility::degreesToRadians(270.0) * 1e6); // microradians
00190         const uint32_t maxRange = static_cast<uint32_t> (30.0 * 1e3); // mm
00191 
00192         if (false == m_networkTimeSyncEnabled) {
00193 
00194             header.timeStartSeconds      = scan.timeStartSeconds;
00195             header.timeStartMicroSeconds = scan.timeStartMicroSeconds;
00196             header.timeEndSeconds        = scan.timeEndSeconds;
00197             header.timeEndMicroSeconds   = scan.timeEndMicroSeconds;
00198 
00199         } else {
00200 
00201             sensorToLocalTime(static_cast<double>(scan.timeStartSeconds) + 
00202                               1e-6 * static_cast<double>(scan.timeStartMicroSeconds),
00203                               header.timeStartSeconds, header.timeStartMicroSeconds);
00204 
00205             sensorToLocalTime(static_cast<double>(scan.timeEndSeconds) + 
00206                               1e-6 * static_cast<double>(scan.timeEndMicroSeconds),
00207                               header.timeEndSeconds, header.timeEndMicroSeconds);
00208         }
00209 
00210         header.scanId            = scan.scanCount;
00211         header.spindleAngleStart = scan.angleStart;
00212         header.spindleAngleEnd   = scan.angleEnd;
00213         header.scanArc           = scanArc;
00214         header.maxRange          = maxRange;
00215         header.pointCount        = scan.points;
00216         header.rangesP           = scan.distanceP;
00217         header.intensitiesP      = scan.intensityP;
00218 
00219         dispatchLidar(buffer, header);
00220 
00221         break;
00222     }
00223     case MSG_ID(wire::ImageMeta::ID):
00224     {
00225         wire::ImageMeta *metaP = new (std::nothrow) wire::ImageMeta(stream, version);
00226 
00227         if (NULL == metaP)
00228             CRL_EXCEPTION("unable to allocate metadata");
00229 
00230         m_imageMetaCache.insert(metaP->frameId, metaP); // destroys oldest
00231 
00232         break;
00233     }
00234     case MSG_ID(wire::JpegImage::ID):
00235     {
00236         wire::JpegImage image(stream, version);
00237 
00238         const wire::ImageMeta *metaP = m_imageMetaCache.find(image.frameId);
00239         if (NULL == metaP)
00240             break;
00241             //CRL_EXCEPTION("no meta cached for frameId %d", image.frameId);
00242 
00243         image::Header header;
00244 
00245         if (false == m_networkTimeSyncEnabled) {
00246 
00247             header.timeSeconds      = metaP->timeSeconds;
00248             header.timeMicroSeconds = metaP->timeMicroSeconds;
00249 
00250         } else
00251             sensorToLocalTime(static_cast<double>(metaP->timeSeconds) + 
00252                               1e-6 * static_cast<double>(metaP->timeMicroSeconds),
00253                               header.timeSeconds, header.timeMicroSeconds);
00254 
00255         header.source           = sourceWireToApi(image.source);
00256         header.bitsPerPixel     = 0;
00257         header.width            = image.width;
00258         header.height           = image.height;
00259         header.frameId          = image.frameId;
00260         header.exposure         = metaP->exposureTime;
00261         header.gain             = metaP->gain;
00262         header.framesPerSecond  = metaP->framesPerSecond;
00263         header.imageDataP       = image.dataP;
00264         header.imageLength      = image.length;
00265         
00266         dispatchImage(buffer, header);
00267 
00268         break;
00269     }
00270     case MSG_ID(wire::Image::ID):
00271     {
00272         wire::Image image(stream, version);
00273 
00274         const wire::ImageMeta *metaP = m_imageMetaCache.find(image.frameId);
00275         if (NULL == metaP)
00276             break;
00277             //CRL_EXCEPTION("no meta cached for frameId %d", image.frameId);
00278 
00279         image::Header header;
00280 
00281         if (false == m_networkTimeSyncEnabled) {
00282 
00283             header.timeSeconds      = metaP->timeSeconds;
00284             header.timeMicroSeconds = metaP->timeMicroSeconds;
00285 
00286         } else
00287             sensorToLocalTime(static_cast<double>(metaP->timeSeconds) + 
00288                               1e-6 * static_cast<double>(metaP->timeMicroSeconds),
00289                               header.timeSeconds, header.timeMicroSeconds);
00290 
00291         header.source           = sourceWireToApi(image.source);
00292         header.bitsPerPixel     = image.bitsPerPixel;
00293         header.width            = image.width;
00294         header.height           = image.height;
00295         header.frameId          = image.frameId;
00296         header.exposure         = metaP->exposureTime;
00297         header.gain             = metaP->gain;
00298         header.framesPerSecond  = metaP->framesPerSecond;
00299         header.imageDataP       = image.dataP;
00300         header.imageLength      = static_cast<uint32_t>(std::ceil(((double) image.bitsPerPixel / 8.0) * image.width * image.height));
00301 
00302         dispatchImage(buffer, header);
00303 
00304         break;
00305     }
00306     case MSG_ID(wire::Disparity::ID):
00307     {
00308         wire::Disparity image(stream, version);
00309 
00310         const wire::ImageMeta *metaP = m_imageMetaCache.find(image.frameId);
00311         if (NULL == metaP)
00312             break;
00313             //CRL_EXCEPTION("no meta cached for frameId %d", image.frameId);
00314         
00315         image::Header header;
00316 
00317         if (false == m_networkTimeSyncEnabled) {
00318 
00319             header.timeSeconds      = metaP->timeSeconds;
00320             header.timeMicroSeconds = metaP->timeMicroSeconds;
00321 
00322         } else
00323             sensorToLocalTime(static_cast<double>(metaP->timeSeconds) + 
00324                               1e-6 * static_cast<double>(metaP->timeMicroSeconds),
00325                               header.timeSeconds, header.timeMicroSeconds);
00326         
00327         header.source           = Source_Disparity;
00328         header.bitsPerPixel     = wire::Disparity::API_BITS_PER_PIXEL;
00329         header.width            = image.width;
00330         header.height           = image.height;
00331         header.frameId          = image.frameId;
00332         header.exposure         = metaP->exposureTime;
00333         header.gain             = metaP->gain;
00334         header.framesPerSecond  = metaP->framesPerSecond;
00335         header.imageDataP       = image.dataP;
00336 
00337         dispatchImage(buffer, header);
00338 
00339         break;
00340     }
00341     case MSG_ID(wire::SysPps::ID):
00342     {
00343         wire::SysPps pps(stream, version);
00344 
00345         pps::Header header;
00346 
00347         header.sensorTime = pps.ppsNanoSeconds;
00348 
00349         sensorToLocalTime(static_cast<double>(pps.ppsNanoSeconds) / 1e9,
00350                           header.timeSeconds, header.timeMicroSeconds);
00351 
00352         dispatchPps(header);
00353 
00354         break;
00355     }
00356     case MSG_ID(wire::ImuData::ID):
00357     {
00358         wire::ImuData imu(stream, version);
00359 
00360         imu::Header header;
00361 
00362         header.sequence = imu.sequence;
00363         header.samples.resize(imu.samples.size());
00364         
00365         for(uint32_t i=0; i<imu.samples.size(); i++) {
00366 
00367             const wire::ImuSample& w = imu.samples[i];
00368             imu::Sample&           a = header.samples[i];
00369 
00370             if (false == m_networkTimeSyncEnabled) {
00371 
00372                 const int64_t oneBillion = static_cast<int64_t>(1e9);
00373 
00374                 a.timeSeconds      = static_cast<uint32_t>(w.timeNanoSeconds / oneBillion);
00375                 a.timeMicroSeconds = static_cast<uint32_t>((w.timeNanoSeconds % oneBillion) / 
00376                                                            static_cast<int64_t>(1000));
00377 
00378             } else
00379                 sensorToLocalTime(static_cast<double>(w.timeNanoSeconds) / 1e9,
00380                                   a.timeSeconds, a.timeMicroSeconds);
00381 
00382             switch(w.type) {
00383             case wire::ImuSample::TYPE_ACCEL: a.type = imu::Sample::Type_Accelerometer; break;
00384             case wire::ImuSample::TYPE_GYRO : a.type = imu::Sample::Type_Gyroscope;     break;
00385             case wire::ImuSample::TYPE_MAG  : a.type = imu::Sample::Type_Magnetometer;  break;
00386             default: CRL_EXCEPTION("unknown wire IMU type: %d", w.type);
00387             }
00388 
00389             a.x = w.x; a.y = w.y; a.z = w.z;
00390         }
00391 
00392         dispatchImu(header);
00393 
00394         break;
00395     }
00396     case MSG_ID(wire::Ack::ID):
00397         break; // handle below
00398     case MSG_ID(wire::CamConfig::ID):
00399         m_messages.store(wire::CamConfig(stream, version));
00400         break;
00401     case MSG_ID(wire::CamHistory::ID):
00402         m_messages.store(wire::CamHistory(stream, version));
00403         break;
00404     case MSG_ID(wire::LedStatus::ID):
00405         m_messages.store(wire::LedStatus(stream, version));
00406         break;
00407     case MSG_ID(wire::SysFlashResponse::ID):
00408         m_messages.store(wire::SysFlashResponse(stream, version));
00409         break;
00410     case MSG_ID(wire::SysDeviceInfo::ID):
00411         m_messages.store(wire::SysDeviceInfo(stream, version));
00412         break;
00413     case MSG_ID(wire::SysCameraCalibration::ID):
00414         m_messages.store(wire::SysCameraCalibration(stream, version));
00415         break;
00416     case MSG_ID(wire::SysLidarCalibration::ID):
00417         m_messages.store(wire::SysLidarCalibration(stream, version));
00418         break;
00419     case MSG_ID(wire::SysMtu::ID):
00420         m_messages.store(wire::SysMtu(stream, version));
00421         break;
00422     case MSG_ID(wire::SysNetwork::ID):
00423         m_messages.store(wire::SysNetwork(stream, version));
00424         break;
00425     case MSG_ID(wire::SysDeviceModes::ID):
00426         m_messages.store(wire::SysDeviceModes(stream, version));
00427         break;
00428     case MSG_ID(wire::VersionResponse::ID):
00429         m_messages.store(wire::VersionResponse(stream, version));
00430         break;
00431     case MSG_ID(wire::StatusResponse::ID):
00432         m_messages.store(wire::StatusResponse(stream, version));   
00433         break;
00434     case MSG_ID(wire::ImuConfig::ID):
00435         m_messages.store(wire::ImuConfig(stream, version));
00436         break;
00437     case MSG_ID(wire::ImuInfo::ID):
00438         m_messages.store(wire::ImuInfo(stream, version));
00439         break;
00440     case MSG_ID(wire::SysTestMtuResponse::ID):
00441         m_messages.store(wire::SysTestMtuResponse(stream, version));
00442         break;
00443     case MSG_ID(wire::SysDirectedStreams::ID):
00444         m_messages.store(wire::SysDirectedStreams(stream, version));
00445         break;
00446     default:
00447 
00448         CRL_DEBUG("unknown message received: id=%d, version=%d\n",
00449                   id, version);
00450         return;
00451     }
00452 
00453     //
00454     // Signal any listeners (_after_ the message is stored/dispatched)
00455     //
00456     // A [n]ack is a special case where we signal
00457     // the returned status code of the ack'd command, 
00458     // otherwise we signal valid reception of this message.
00459 
00460     switch(id) {
00461     case MSG_ID(wire::Ack::ID):
00462         m_watch.signal(wire::Ack(stream, version));
00463         break;
00464     default:    
00465         m_watch.signal(id);
00466         break;
00467     }
00468 }
00469 
00470 //
00471 // Get a UDP assembler for this message type. We are given
00472 // the first UDP packet in the stream
00473 
00474 impl::UdpAssembler impl::getUdpAssembler(const uint8_t *firstDatagramP,
00475                                          uint32_t       length)
00476 {
00477     //
00478     // Get the message type, it is stored after wire::Header
00479 
00480     utility::BufferStreamReader stream(firstDatagramP, length);
00481     stream.seek(sizeof(wire::Header));
00482 
00483     wire::IdType messageType; 
00484     stream & messageType;
00485 
00486     //
00487     // See if a custom handler has been registered
00488 
00489     UdpAssemblerMap::const_iterator it = m_udpAssemblerMap.find(messageType);
00490 
00491     if (m_udpAssemblerMap.end() != it)
00492         return it->second;
00493     else
00494         return defaultUdpAssembler;
00495 }
00496 
00497 //
00498 // Find a suitably sized buffer for the incoming message
00499 
00500 utility::BufferStreamWriter& impl::findFreeBuffer(uint32_t messageLength)
00501 {    
00502     BufferPool *bP = NULL;
00503     
00504     if (messageLength <= RX_POOL_SMALL_BUFFER_SIZE)
00505         bP = &(m_rxSmallBufferPool);
00506     else if (messageLength <= RX_POOL_LARGE_BUFFER_SIZE)
00507         bP = &(m_rxLargeBufferPool);
00508     else
00509         CRL_EXCEPTION("message too large: %d bytes", messageLength);
00510 
00511     //
00512     // TODO: re-think the shared() mechanism of the buffers, so we do not
00513     //       have to walk this vector.
00514 
00515     BufferPool::const_iterator it = bP->begin();
00516     for(; it != bP->end(); it++)
00517         if (false == (*it)->shared())
00518             return *(*it);
00519 
00520     CRL_EXCEPTION("no free RX buffers (%d in use by consumers)\n", bP->size());
00521 }
00522 
00523 //
00524 // Unwrap a 16-bit wire sequence ID into a unique 64-bit local ID
00525 
00526 const int64_t& impl::unwrapSequenceId(uint16_t wireId)
00527 {
00528     //
00529     // Look for a sequence change
00530 
00531     if (wireId != m_lastRxSeqId) {
00532 
00533         const uint16_t ID_MAX    = std::numeric_limits<uint16_t>::max();
00534         const uint16_t ID_CENTER = ID_MAX / 2;
00535 
00536         //
00537         // Seed
00538 
00539         if (-1 == m_lastRxSeqId)
00540                         m_unWrappedRxSeqId = m_lastRxSeqId = wireId;
00541 
00542         //
00543         // Detect forward 16-bit wrap
00544 
00545         else if (wireId        < ID_CENTER   &&
00546                  m_lastRxSeqId > ID_CENTER) {
00547 
00548             m_unWrappedRxSeqId += 1 + (ID_MAX - m_lastRxSeqId) + wireId;
00549 
00550         //
00551         // Normal case
00552 
00553         } else
00554             m_unWrappedRxSeqId += wireId - m_lastRxSeqId;
00555 
00556         //
00557         // Remember change
00558 
00559         m_lastRxSeqId = wireId;
00560     }
00561 
00562     return m_unWrappedRxSeqId;
00563 }
00564 
00565 //
00566 // Handles any incoming packets
00567 
00568 void impl::handle()
00569 {
00570     utility::ScopedLock lock(m_rxLock);
00571 
00572     for(;;) {
00573  
00574         //
00575         // Receive the packet
00576         
00577         const int bytesRead = recvfrom(m_serverSocket,
00578                                            (char*)m_incomingBuffer.data(),
00579                                            m_incomingBuffer.size(),
00580                                            0, NULL, NULL);
00581         //
00582         // Nothing left to read
00583         
00584         if (bytesRead < 0)
00585             break;
00586 
00587         //
00588         // Check for undersized packets
00589 
00590         else if (bytesRead < (int)sizeof(wire::Header))
00591             CRL_EXCEPTION("undersized packet: %d/%d bytes\n",
00592                           bytesRead, sizeof(wire::Header));
00593 
00594         //
00595         // For convenience below
00596 
00597         const uint8_t *inP = reinterpret_cast<const uint8_t*>(m_incomingBuffer.data());
00598 
00599         //
00600         // Validate the header
00601 
00602         const wire::Header& header = *(reinterpret_cast<const wire::Header*>(inP));
00603 
00604         if (wire::HEADER_MAGIC != header.magic)
00605             CRL_EXCEPTION("bad protocol magic: 0x%x, expecting 0x%x",
00606                           header.magic, wire::HEADER_MAGIC);
00607         else if (wire::HEADER_VERSION != header.version)
00608             CRL_EXCEPTION("bad protocol version: 0x%x, expecting 0x%x",
00609                           header.version, wire::HEADER_VERSION);
00610         else if (wire::HEADER_GROUP != header.group)
00611             CRL_EXCEPTION("bad protocol group: 0x%x, expecting 0x%x",
00612                           header.group, wire::HEADER_GROUP);
00613 
00614         //
00615         // Unwrap the sequence identifier
00616 
00617         const int64_t& sequence = unwrapSequenceId(header.sequenceIdentifier);
00618 
00619         //
00620         // See if we are already tracking this messge ID
00621 
00622         UdpTracker *trP = m_udpTrackerCache.find(sequence);
00623         if (NULL == trP) {
00624 
00625             //
00626             // If we drop first packet, we will drop entire message. Currently we 
00627             // require the first datagram in order to assign an assembler.
00628             // TODO: re-think this.
00629 
00630             if (0 != header.byteOffset)
00631                 continue;
00632             else {
00633 
00634                 //
00635                 // Create a new tracker for this sequence id.
00636 
00637                 trP = new UdpTracker(header.messageLength,
00638                                      getUdpAssembler(inP, bytesRead),
00639                                      findFreeBuffer(header.messageLength));
00640             }
00641         }
00642      
00643         //
00644         // Assemble the datagram into the message stream, returns true if the
00645         // assembly is complete.
00646 
00647         if (true == trP->assemble(bytesRead - sizeof(wire::Header),
00648                                   header.byteOffset,
00649                                   &(inP[sizeof(wire::Header)]))) {
00650 
00651             //
00652             // Dispatch to any listeners
00653 
00654             dispatch(trP->stream());
00655 
00656             //
00657             // Release the tracker
00658 
00659             if (1 == trP->packets())
00660                 delete trP; // has not yet been cached
00661             else
00662                 m_udpTrackerCache.remove(sequence);
00663 
00664         } else if (1 == trP->packets()) {
00665 
00666             //
00667             // Cache the tracker, as more UDP packets are
00668             // forthcoming for this message.
00669 
00670             m_udpTrackerCache.insert(sequence, trP);
00671         }
00672     }
00673 }
00674 
00675 //
00676 // This thread waits for UDP packets
00677 
00678 #if WIN32
00679 DWORD impl::rxThread(void *userDataP)
00680 #else
00681 void *impl::rxThread(void *userDataP)
00682 #endif
00683 {
00684     impl     *selfP  = reinterpret_cast<impl*>(userDataP);
00685     const int server = selfP->m_serverSocket;
00686     fd_set    readSet;
00687 
00688     //
00689     // Loop until shutdown
00690 
00691     while(selfP->m_threadsRunning) {
00692 
00693         //
00694         // Add the server socket to the read set.
00695 
00696         FD_ZERO(&readSet);
00697         FD_SET(server, &readSet);
00698 
00699         //
00700         // Wait for a new packet to arrive, timing out every once in awhile
00701 
00702         struct timeval tv = {0, 200000}; // 5Hz
00703         const int result  = select(server+1, &readSet, NULL, NULL, &tv);
00704         if (result <= 0)
00705             continue;
00706 
00707         //
00708         // Let the comm object handle decoding
00709 
00710         try {
00711 
00712             selfP->handle();
00713 
00714         } catch (const std::exception& e) {
00715                     
00716             CRL_DEBUG("exception while decoding packet: %s\n", e.what());
00717 
00718         } catch ( ... ) {
00719             
00720             CRL_DEBUG("unknown exception while decoding packet\n");
00721         }
00722     }
00723 
00724     return NULL;
00725 }
00726 
00727 }}}; // namespaces


multisense_lib
Author(s):
autogenerated on Thu Aug 27 2015 14:01:11