00001
00037 #include "details/channel.hh"
00038
00039 #include "details/wire/AckMessage.h"
00040
00041 #include "details/wire/VersionResponseMessage.h"
00042 #include "details/wire/StatusResponseMessage.h"
00043
00044 #include "details/wire/CamConfigMessage.h"
00045 #include "details/wire/ImageMessage.h"
00046 #include "details/wire/JpegMessage.h"
00047 #include "details/wire/ImageMetaMessage.h"
00048 #include "details/wire/DisparityMessage.h"
00049
00050 #include "details/wire/CamHistoryMessage.h"
00051
00052 #include "details/wire/LidarDataMessage.h"
00053
00054 #include "details/wire/LedStatusMessage.h"
00055
00056 #include "details/wire/LedSensorStatusMessage.h"
00057
00058 #include "details/wire/SysMtuMessage.h"
00059 #include "details/wire/SysNetworkMessage.h"
00060 #include "details/wire/SysFlashResponseMessage.h"
00061 #include "details/wire/SysDeviceInfoMessage.h"
00062 #include "details/wire/SysCameraCalibrationMessage.h"
00063 #include "details/wire/SysSensorCalibrationMessage.h"
00064 #include "details/wire/SysTransmitDelayMessage.h"
00065 #include "details/wire/SysLidarCalibrationMessage.h"
00066 #include "details/wire/SysDeviceModesMessage.h"
00067 #include "details/wire/SysExternalCalibrationMessage.h"
00068
00069 #include "details/wire/SysPpsMessage.h"
00070
00071 #include "details/wire/ImuDataMessage.h"
00072 #include "details/wire/ImuConfigMessage.h"
00073 #include "details/wire/ImuInfoMessage.h"
00074
00075 #include "details/wire/SysTestMtuResponseMessage.h"
00076 #include "details/wire/SysDirectedStreamsMessage.h"
00077
00078 #include <limits>
00079
00080 namespace crl {
00081 namespace multisense {
00082 namespace details {
00083 namespace {
00084
00085
00086
00087
00088 void defaultUdpAssembler(utility::BufferStreamWriter& stream,
00089 const uint8_t *dataP,
00090 uint32_t offset,
00091 uint32_t length)
00092 {
00093 stream.seek(offset);
00094 stream.write(dataP, length);
00095 }
00096
00097 };
00098
00099
00100
00101
00102 void impl::dispatchImage(utility::BufferStream& buffer,
00103 image::Header& header)
00104 {
00105 utility::ScopedLock lock(m_dispatchLock);
00106
00107 std::list<ImageListener*>::const_iterator it;
00108
00109 for(it = m_imageListeners.begin();
00110 it != m_imageListeners.end();
00111 it ++)
00112 (*it)->dispatch(buffer, header);
00113 }
00114
00115
00116
00117
00118 void impl::dispatchLidar(utility::BufferStream& buffer,
00119 lidar::Header& header)
00120 {
00121 utility::ScopedLock lock(m_dispatchLock);
00122
00123 std::list<LidarListener*>::const_iterator it;
00124
00125 for(it = m_lidarListeners.begin();
00126 it != m_lidarListeners.end();
00127 it ++)
00128 (*it)->dispatch(buffer, header);
00129 }
00130
00131
00132
00133 void impl::dispatchPps(pps::Header& header)
00134 {
00135 utility::ScopedLock lock(m_dispatchLock);
00136
00137 std::list<PpsListener*>::const_iterator it;
00138
00139 for(it = m_ppsListeners.begin();
00140 it != m_ppsListeners.end();
00141 it ++)
00142 (*it)->dispatch(header);
00143 }
00144
00145
00146
00147
00148 void impl::dispatchImu(imu::Header& header)
00149 {
00150 utility::ScopedLock lock(m_dispatchLock);
00151
00152 std::list<ImuListener*>::const_iterator it;
00153
00154 for(it = m_imuListeners.begin();
00155 it != m_imuListeners.end();
00156 it ++)
00157 (*it)->dispatch(header);
00158 }
00159
00160
00161
00162
00163 void impl::dispatch(utility::BufferStreamWriter& buffer)
00164 {
00165 utility::BufferStreamReader stream(buffer);
00166
00167
00168
00169
00170
00171 wire::IdType id;
00172 wire::VersionType version;
00173
00174 stream & id;
00175 stream & version;
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187 switch(id) {
00188 case MSG_ID(wire::LidarData::ID):
00189 {
00190 wire::LidarData scan(stream, version);
00191 lidar::Header header;
00192
00193 const int32_t scanArc = static_cast<int32_t> (utility::degreesToRadians(270.0) * 1e6);
00194 const uint32_t maxRange = static_cast<uint32_t> (30.0 * 1e3);
00195
00196 if (false == m_networkTimeSyncEnabled) {
00197
00198 header.timeStartSeconds = scan.timeStartSeconds;
00199 header.timeStartMicroSeconds = scan.timeStartMicroSeconds;
00200 header.timeEndSeconds = scan.timeEndSeconds;
00201 header.timeEndMicroSeconds = scan.timeEndMicroSeconds;
00202
00203 } else {
00204
00205 sensorToLocalTime(static_cast<double>(scan.timeStartSeconds) +
00206 1e-6 * static_cast<double>(scan.timeStartMicroSeconds),
00207 header.timeStartSeconds, header.timeStartMicroSeconds);
00208
00209 sensorToLocalTime(static_cast<double>(scan.timeEndSeconds) +
00210 1e-6 * static_cast<double>(scan.timeEndMicroSeconds),
00211 header.timeEndSeconds, header.timeEndMicroSeconds);
00212 }
00213
00214 header.scanId = scan.scanCount;
00215 header.spindleAngleStart = scan.angleStart;
00216 header.spindleAngleEnd = scan.angleEnd;
00217 header.scanArc = scanArc;
00218 header.maxRange = maxRange;
00219 header.pointCount = scan.points;
00220 header.rangesP = scan.distanceP;
00221 header.intensitiesP = scan.intensityP;
00222
00223 dispatchLidar(buffer, header);
00224
00225 break;
00226 }
00227 case MSG_ID(wire::ImageMeta::ID):
00228 {
00229 wire::ImageMeta *metaP = new (std::nothrow) wire::ImageMeta(stream, version);
00230
00231 if (NULL == metaP)
00232 CRL_EXCEPTION("unable to allocate metadata");
00233
00234 m_imageMetaCache.insert(metaP->frameId, metaP);
00235
00236 break;
00237 }
00238 case MSG_ID(wire::JpegImage::ID):
00239 {
00240 wire::JpegImage image(stream, version);
00241
00242 const wire::ImageMeta *metaP = m_imageMetaCache.find(image.frameId);
00243 if (NULL == metaP)
00244 break;
00245
00246
00247 image::Header header;
00248
00249 if (false == m_networkTimeSyncEnabled) {
00250
00251 header.timeSeconds = metaP->timeSeconds;
00252 header.timeMicroSeconds = metaP->timeMicroSeconds;
00253
00254 } else
00255 sensorToLocalTime(static_cast<double>(metaP->timeSeconds) +
00256 1e-6 * static_cast<double>(metaP->timeMicroSeconds),
00257 header.timeSeconds, header.timeMicroSeconds);
00258
00259 header.source = sourceWireToApi(image.source);
00260 header.bitsPerPixel = 0;
00261 header.width = image.width;
00262 header.height = image.height;
00263 header.frameId = image.frameId;
00264 header.exposure = metaP->exposureTime;
00265 header.gain = metaP->gain;
00266 header.framesPerSecond = metaP->framesPerSecond;
00267 header.imageDataP = image.dataP;
00268 header.imageLength = image.length;
00269
00270 dispatchImage(buffer, header);
00271
00272 break;
00273 }
00274 case MSG_ID(wire::Image::ID):
00275 {
00276 wire::Image image(stream, version);
00277
00278 const wire::ImageMeta *metaP = m_imageMetaCache.find(image.frameId);
00279 if (NULL == metaP)
00280 break;
00281
00282
00283 image::Header header;
00284
00285 if (false == m_networkTimeSyncEnabled) {
00286
00287 header.timeSeconds = metaP->timeSeconds;
00288 header.timeMicroSeconds = metaP->timeMicroSeconds;
00289
00290 } else
00291 sensorToLocalTime(static_cast<double>(metaP->timeSeconds) +
00292 1e-6 * static_cast<double>(metaP->timeMicroSeconds),
00293 header.timeSeconds, header.timeMicroSeconds);
00294
00295 header.source = sourceWireToApi(image.source);
00296 header.bitsPerPixel = image.bitsPerPixel;
00297 header.width = image.width;
00298 header.height = image.height;
00299 header.frameId = image.frameId;
00300 header.exposure = metaP->exposureTime;
00301 header.gain = metaP->gain;
00302 header.framesPerSecond = metaP->framesPerSecond;
00303 header.imageDataP = image.dataP;
00304 header.imageLength = static_cast<uint32_t>(std::ceil(((double) image.bitsPerPixel / 8.0) * image.width * image.height));
00305
00306 dispatchImage(buffer, header);
00307
00308 break;
00309 }
00310 case MSG_ID(wire::Disparity::ID):
00311 {
00312 wire::Disparity image(stream, version);
00313
00314 const wire::ImageMeta *metaP = m_imageMetaCache.find(image.frameId);
00315 if (NULL == metaP)
00316 break;
00317
00318
00319 image::Header header;
00320
00321 if (false == m_networkTimeSyncEnabled) {
00322
00323 header.timeSeconds = metaP->timeSeconds;
00324 header.timeMicroSeconds = metaP->timeMicroSeconds;
00325
00326 } else
00327 sensorToLocalTime(static_cast<double>(metaP->timeSeconds) +
00328 1e-6 * static_cast<double>(metaP->timeMicroSeconds),
00329 header.timeSeconds, header.timeMicroSeconds);
00330
00331 header.source = Source_Disparity;
00332 header.bitsPerPixel = wire::Disparity::API_BITS_PER_PIXEL;
00333 header.width = image.width;
00334 header.height = image.height;
00335 header.frameId = image.frameId;
00336 header.exposure = metaP->exposureTime;
00337 header.gain = metaP->gain;
00338 header.framesPerSecond = metaP->framesPerSecond;
00339 header.imageDataP = image.dataP;
00340 header.imageLength = static_cast<uint32_t>(std::ceil(((double) wire::Disparity::API_BITS_PER_PIXEL / 8.0) * image.width * image.height));
00341
00342 dispatchImage(buffer, header);
00343
00344 break;
00345 }
00346 case MSG_ID(wire::SysPps::ID):
00347 {
00348 wire::SysPps pps(stream, version);
00349
00350 pps::Header header;
00351
00352 header.sensorTime = pps.ppsNanoSeconds;
00353
00354 sensorToLocalTime(static_cast<double>(pps.ppsNanoSeconds) / 1e9,
00355 header.timeSeconds, header.timeMicroSeconds);
00356
00357 dispatchPps(header);
00358
00359 break;
00360 }
00361 case MSG_ID(wire::ImuData::ID):
00362 {
00363 wire::ImuData imu(stream, version);
00364
00365 imu::Header header;
00366
00367 header.sequence = imu.sequence;
00368 header.samples.resize(imu.samples.size());
00369
00370 for(uint32_t i=0; i<imu.samples.size(); i++) {
00371
00372 const wire::ImuSample& w = imu.samples[i];
00373 imu::Sample& a = header.samples[i];
00374
00375 if (false == m_networkTimeSyncEnabled) {
00376
00377 const int64_t oneBillion = static_cast<int64_t>(1e9);
00378
00379 a.timeSeconds = static_cast<uint32_t>(w.timeNanoSeconds / oneBillion);
00380 a.timeMicroSeconds = static_cast<uint32_t>((w.timeNanoSeconds % oneBillion) /
00381 static_cast<int64_t>(1000));
00382
00383 } else
00384 sensorToLocalTime(static_cast<double>(w.timeNanoSeconds) / 1e9,
00385 a.timeSeconds, a.timeMicroSeconds);
00386
00387 switch(w.type) {
00388 case wire::ImuSample::TYPE_ACCEL: a.type = imu::Sample::Type_Accelerometer; break;
00389 case wire::ImuSample::TYPE_GYRO : a.type = imu::Sample::Type_Gyroscope; break;
00390 case wire::ImuSample::TYPE_MAG : a.type = imu::Sample::Type_Magnetometer; break;
00391 default: CRL_EXCEPTION("unknown wire IMU type: %d", w.type);
00392 }
00393
00394 a.x = w.x; a.y = w.y; a.z = w.z;
00395 }
00396
00397 dispatchImu(header);
00398
00399 break;
00400 }
00401 case MSG_ID(wire::Ack::ID):
00402 break;
00403 case MSG_ID(wire::CamConfig::ID):
00404 m_messages.store(wire::CamConfig(stream, version));
00405 break;
00406 case MSG_ID(wire::CamHistory::ID):
00407 m_messages.store(wire::CamHistory(stream, version));
00408 break;
00409 case MSG_ID(wire::LedStatus::ID):
00410 m_messages.store(wire::LedStatus(stream, version));
00411 break;
00412 case MSG_ID(wire::LedSensorStatus::ID):
00413 m_messages.store(wire::LedSensorStatus(stream, version));
00414 break;
00415 case MSG_ID(wire::SysFlashResponse::ID):
00416 m_messages.store(wire::SysFlashResponse(stream, version));
00417 break;
00418 case MSG_ID(wire::SysDeviceInfo::ID):
00419 m_messages.store(wire::SysDeviceInfo(stream, version));
00420 break;
00421 case MSG_ID(wire::SysCameraCalibration::ID):
00422 m_messages.store(wire::SysCameraCalibration(stream, version));
00423 break;
00424 case MSG_ID(wire::SysSensorCalibration::ID):
00425 m_messages.store(wire::SysSensorCalibration(stream, version));
00426 break;
00427 case MSG_ID(wire::SysTransmitDelay::ID):
00428 m_messages.store(wire::SysTransmitDelay(stream, version));
00429 break;
00430 case MSG_ID(wire::SysLidarCalibration::ID):
00431 m_messages.store(wire::SysLidarCalibration(stream, version));
00432 break;
00433 case MSG_ID(wire::SysMtu::ID):
00434 m_messages.store(wire::SysMtu(stream, version));
00435 break;
00436 case MSG_ID(wire::SysNetwork::ID):
00437 m_messages.store(wire::SysNetwork(stream, version));
00438 break;
00439 case MSG_ID(wire::SysDeviceModes::ID):
00440 m_messages.store(wire::SysDeviceModes(stream, version));
00441 break;
00442 case MSG_ID(wire::VersionResponse::ID):
00443 m_messages.store(wire::VersionResponse(stream, version));
00444 break;
00445 case MSG_ID(wire::StatusResponse::ID):
00446 m_messages.store(wire::StatusResponse(stream, version));
00447 break;
00448 case MSG_ID(wire::ImuConfig::ID):
00449 m_messages.store(wire::ImuConfig(stream, version));
00450 break;
00451 case MSG_ID(wire::ImuInfo::ID):
00452 m_messages.store(wire::ImuInfo(stream, version));
00453 break;
00454 case MSG_ID(wire::SysTestMtuResponse::ID):
00455 m_messages.store(wire::SysTestMtuResponse(stream, version));
00456 break;
00457 case MSG_ID(wire::SysDirectedStreams::ID):
00458 m_messages.store(wire::SysDirectedStreams(stream, version));
00459 break;
00460 case MSG_ID(wire::SysExternalCalibration::ID):
00461 m_messages.store(wire::SysExternalCalibration(stream, version));
00462 break;
00463 default:
00464
00465 CRL_DEBUG("unknown message received: id=%d, version=%d\n",
00466 id, version);
00467 return;
00468 }
00469
00470
00471
00472
00473
00474
00475
00476
00477 switch(id) {
00478 case MSG_ID(wire::Ack::ID):
00479 m_watch.signal(wire::Ack(stream, version));
00480 break;
00481 default:
00482 m_watch.signal(id);
00483 break;
00484 }
00485 }
00486
00487
00488
00489
00490
00491 impl::UdpAssembler impl::getUdpAssembler(const uint8_t *firstDatagramP,
00492 uint32_t length)
00493 {
00494
00495
00496
00497 utility::BufferStreamReader stream(firstDatagramP, length);
00498 stream.seek(sizeof(wire::Header));
00499
00500 wire::IdType messageType;
00501 stream & messageType;
00502
00503
00504
00505
00506 UdpAssemblerMap::const_iterator it = m_udpAssemblerMap.find(messageType);
00507
00508 if (m_udpAssemblerMap.end() != it)
00509 return it->second;
00510 else
00511 return defaultUdpAssembler;
00512 }
00513
00514
00515
00516
00517 utility::BufferStreamWriter& impl::findFreeBuffer(uint32_t messageLength)
00518 {
00519 BufferPool *bP = NULL;
00520
00521 if (messageLength <= RX_POOL_SMALL_BUFFER_SIZE)
00522 bP = &(m_rxSmallBufferPool);
00523 else if (messageLength <= RX_POOL_LARGE_BUFFER_SIZE)
00524 bP = &(m_rxLargeBufferPool);
00525 else
00526 CRL_EXCEPTION("message too large: %d bytes", messageLength);
00527
00528
00529
00530
00531
00532 BufferPool::const_iterator it = bP->begin();
00533 for(; it != bP->end(); it++)
00534 if (false == (*it)->shared())
00535 return *(*it);
00536
00537 CRL_EXCEPTION("no free RX buffers (%d in use by consumers)\n", bP->size());
00538 }
00539
00540
00541
00542
00543 const int64_t& impl::unwrapSequenceId(uint16_t wireId)
00544 {
00545
00546
00547
00548 if (wireId != m_lastRxSeqId) {
00549
00550 const uint16_t ID_MAX = std::numeric_limits<uint16_t>::max();
00551 const uint16_t ID_CENTER = ID_MAX / 2;
00552
00553
00554
00555
00556 if (-1 == m_lastRxSeqId)
00557 m_unWrappedRxSeqId = m_lastRxSeqId = wireId;
00558
00559
00560
00561
00562 else if (wireId < ID_CENTER &&
00563 m_lastRxSeqId > ID_CENTER) {
00564
00565 m_unWrappedRxSeqId += 1 + (ID_MAX - m_lastRxSeqId) + wireId;
00566
00567
00568
00569
00570 } else
00571 m_unWrappedRxSeqId += wireId - m_lastRxSeqId;
00572
00573
00574
00575
00576 m_lastRxSeqId = wireId;
00577 }
00578
00579 return m_unWrappedRxSeqId;
00580 }
00581
00582
00583
00584
00585 void impl::handle()
00586 {
00587 utility::ScopedLock lock(m_rxLock);
00588
00589 for(;;) {
00590
00591
00592
00593
00594 const int bytesRead = recvfrom(m_serverSocket,
00595 (char*)m_incomingBuffer.data(),
00596 m_incomingBuffer.size(),
00597 0, NULL, NULL);
00598
00599
00600
00601 if (bytesRead < 0)
00602 break;
00603
00604
00605
00606
00607 else if (bytesRead < (int)sizeof(wire::Header))
00608 CRL_EXCEPTION("undersized packet: %d/%d bytes\n",
00609 bytesRead, sizeof(wire::Header));
00610
00611
00612
00613
00614 const uint8_t *inP = reinterpret_cast<const uint8_t*>(m_incomingBuffer.data());
00615
00616
00617
00618
00619 const wire::Header& header = *(reinterpret_cast<const wire::Header*>(inP));
00620
00621 if (wire::HEADER_MAGIC != header.magic)
00622 CRL_EXCEPTION("bad protocol magic: 0x%x, expecting 0x%x",
00623 header.magic, wire::HEADER_MAGIC);
00624 else if (wire::HEADER_VERSION != header.version)
00625 CRL_EXCEPTION("bad protocol version: 0x%x, expecting 0x%x",
00626 header.version, wire::HEADER_VERSION);
00627 else if (wire::HEADER_GROUP != header.group)
00628 CRL_EXCEPTION("bad protocol group: 0x%x, expecting 0x%x",
00629 header.group, wire::HEADER_GROUP);
00630
00631
00632
00633
00634 const int64_t& sequence = unwrapSequenceId(header.sequenceIdentifier);
00635
00636
00637
00638
00639 UdpTracker *trP = m_udpTrackerCache.find(sequence);
00640 if (NULL == trP) {
00641
00642
00643
00644
00645
00646
00647 if (0 != header.byteOffset)
00648 continue;
00649 else {
00650
00651
00652
00653
00654 trP = new UdpTracker(header.messageLength,
00655 getUdpAssembler(inP, bytesRead),
00656 findFreeBuffer(header.messageLength));
00657 }
00658 }
00659
00660
00661
00662
00663
00664 if (true == trP->assemble(bytesRead - sizeof(wire::Header),
00665 header.byteOffset,
00666 &(inP[sizeof(wire::Header)]))) {
00667
00668
00669
00670
00671 dispatch(trP->stream());
00672
00673
00674
00675
00676 if (1 == trP->packets())
00677 delete trP;
00678 else
00679 m_udpTrackerCache.remove(sequence);
00680
00681 } else if (1 == trP->packets()) {
00682
00683
00684
00685
00686
00687 m_udpTrackerCache.insert(sequence, trP);
00688 }
00689 }
00690 }
00691
00692
00693
00694
00695 #if WIN32
00696 DWORD impl::rxThread(void *userDataP)
00697 #else
00698 void *impl::rxThread(void *userDataP)
00699 #endif
00700 {
00701 impl *selfP = reinterpret_cast<impl*>(userDataP);
00702 const int server = selfP->m_serverSocket;
00703 fd_set readSet;
00704
00705
00706
00707
00708 while(selfP->m_threadsRunning) {
00709
00710
00711
00712
00713 FD_ZERO(&readSet);
00714 FD_SET(server, &readSet);
00715
00716
00717
00718
00719 struct timeval tv = {0, 200000};
00720 const int result = select(server+1, &readSet, NULL, NULL, &tv);
00721 if (result <= 0)
00722 continue;
00723
00724
00725
00726
00727 try {
00728
00729 selfP->handle();
00730
00731 } catch (const std::exception& e) {
00732
00733 CRL_DEBUG("exception while decoding packet: %s\n", e.what());
00734
00735 } catch ( ... ) {
00736
00737 CRL_DEBUG("unknown exception while decoding packet\n");
00738 }
00739 }
00740
00741 return NULL;
00742 }
00743
00744 }}};