00001
00037 #include "details/channel.hh"
00038
00039 #include "details/wire/AckMessage.h"
00040
00041 #include "details/wire/VersionResponseMessage.h"
00042 #include "details/wire/StatusResponseMessage.h"
00043
00044 #include "details/wire/CamConfigMessage.h"
00045 #include "details/wire/ImageMessage.h"
00046 #include "details/wire/JpegMessage.h"
00047 #include "details/wire/ImageMetaMessage.h"
00048 #include "details/wire/DisparityMessage.h"
00049
00050 #include "details/wire/CamHistoryMessage.h"
00051
00052 #include "details/wire/LidarDataMessage.h"
00053
00054 #include "details/wire/LedStatusMessage.h"
00055
00056 #include "details/wire/SysMtuMessage.h"
00057 #include "details/wire/SysNetworkMessage.h"
00058 #include "details/wire/SysFlashResponseMessage.h"
00059 #include "details/wire/SysDeviceInfoMessage.h"
00060 #include "details/wire/SysCameraCalibrationMessage.h"
00061 #include "details/wire/SysLidarCalibrationMessage.h"
00062 #include "details/wire/SysDeviceModesMessage.h"
00063
00064 #include "details/wire/SysPpsMessage.h"
00065
00066 #include "details/wire/ImuDataMessage.h"
00067 #include "details/wire/ImuConfigMessage.h"
00068 #include "details/wire/ImuInfoMessage.h"
00069
00070 #include "details/wire/SysTestMtuResponseMessage.h"
00071 #include "details/wire/SysDirectedStreamsMessage.h"
00072
00073 #include <limits>
00074
00075 namespace crl {
00076 namespace multisense {
00077 namespace details {
00078 namespace {
00079
00080
00081
00082
00083 void defaultUdpAssembler(utility::BufferStreamWriter& stream,
00084 const uint8_t *dataP,
00085 uint32_t offset,
00086 uint32_t length)
00087 {
00088 stream.seek(offset);
00089 stream.write(dataP, length);
00090 }
00091
00092 };
00093
00094
00095
00096
00097 void impl::dispatchImage(utility::BufferStream& buffer,
00098 image::Header& header)
00099 {
00100 utility::ScopedLock lock(m_dispatchLock);
00101
00102 std::list<ImageListener*>::const_iterator it;
00103
00104 for(it = m_imageListeners.begin();
00105 it != m_imageListeners.end();
00106 it ++)
00107 (*it)->dispatch(buffer, header);
00108 }
00109
00110
00111
00112
00113 void impl::dispatchLidar(utility::BufferStream& buffer,
00114 lidar::Header& header)
00115 {
00116 utility::ScopedLock lock(m_dispatchLock);
00117
00118 std::list<LidarListener*>::const_iterator it;
00119
00120 for(it = m_lidarListeners.begin();
00121 it != m_lidarListeners.end();
00122 it ++)
00123 (*it)->dispatch(buffer, header);
00124 }
00125
00126
00127
00128
00129 void impl::dispatchPps(pps::Header& header)
00130 {
00131 utility::ScopedLock lock(m_dispatchLock);
00132
00133 std::list<PpsListener*>::const_iterator it;
00134
00135 for(it = m_ppsListeners.begin();
00136 it != m_ppsListeners.end();
00137 it ++)
00138 (*it)->dispatch(header);
00139 }
00140
00141
00142
00143
00144 void impl::dispatchImu(imu::Header& header)
00145 {
00146 utility::ScopedLock lock(m_dispatchLock);
00147
00148 std::list<ImuListener*>::const_iterator it;
00149
00150 for(it = m_imuListeners.begin();
00151 it != m_imuListeners.end();
00152 it ++)
00153 (*it)->dispatch(header);
00154 }
00155
00156
00157
00158
00159 void impl::dispatch(utility::BufferStreamWriter& buffer)
00160 {
00161 utility::BufferStreamReader stream(buffer);
00162
00163
00164
00165
00166
00167 wire::IdType id;
00168 wire::VersionType version;
00169
00170 stream & id;
00171 stream & version;
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183 switch(id) {
00184 case MSG_ID(wire::LidarData::ID):
00185 {
00186 wire::LidarData scan(stream, version);
00187 lidar::Header header;
00188
00189 const int32_t scanArc = static_cast<int32_t> (utility::degreesToRadians(270.0) * 1e6);
00190 const uint32_t maxRange = static_cast<uint32_t> (30.0 * 1e3);
00191
00192 if (false == m_networkTimeSyncEnabled) {
00193
00194 header.timeStartSeconds = scan.timeStartSeconds;
00195 header.timeStartMicroSeconds = scan.timeStartMicroSeconds;
00196 header.timeEndSeconds = scan.timeEndSeconds;
00197 header.timeEndMicroSeconds = scan.timeEndMicroSeconds;
00198
00199 } else {
00200
00201 sensorToLocalTime(static_cast<double>(scan.timeStartSeconds) +
00202 1e-6 * static_cast<double>(scan.timeStartMicroSeconds),
00203 header.timeStartSeconds, header.timeStartMicroSeconds);
00204
00205 sensorToLocalTime(static_cast<double>(scan.timeEndSeconds) +
00206 1e-6 * static_cast<double>(scan.timeEndMicroSeconds),
00207 header.timeEndSeconds, header.timeEndMicroSeconds);
00208 }
00209
00210 header.scanId = scan.scanCount;
00211 header.spindleAngleStart = scan.angleStart;
00212 header.spindleAngleEnd = scan.angleEnd;
00213 header.scanArc = scanArc;
00214 header.maxRange = maxRange;
00215 header.pointCount = scan.points;
00216 header.rangesP = scan.distanceP;
00217 header.intensitiesP = scan.intensityP;
00218
00219 dispatchLidar(buffer, header);
00220
00221 break;
00222 }
00223 case MSG_ID(wire::ImageMeta::ID):
00224 {
00225 wire::ImageMeta *metaP = new (std::nothrow) wire::ImageMeta(stream, version);
00226
00227 if (NULL == metaP)
00228 CRL_EXCEPTION("unable to allocate metadata");
00229
00230 m_imageMetaCache.insert(metaP->frameId, metaP);
00231
00232 break;
00233 }
00234 case MSG_ID(wire::JpegImage::ID):
00235 {
00236 wire::JpegImage image(stream, version);
00237
00238 const wire::ImageMeta *metaP = m_imageMetaCache.find(image.frameId);
00239 if (NULL == metaP)
00240 break;
00241
00242
00243 image::Header header;
00244
00245 if (false == m_networkTimeSyncEnabled) {
00246
00247 header.timeSeconds = metaP->timeSeconds;
00248 header.timeMicroSeconds = metaP->timeMicroSeconds;
00249
00250 } else
00251 sensorToLocalTime(static_cast<double>(metaP->timeSeconds) +
00252 1e-6 * static_cast<double>(metaP->timeMicroSeconds),
00253 header.timeSeconds, header.timeMicroSeconds);
00254
00255 header.source = sourceWireToApi(image.source);
00256 header.bitsPerPixel = 0;
00257 header.width = image.width;
00258 header.height = image.height;
00259 header.frameId = image.frameId;
00260 header.exposure = metaP->exposureTime;
00261 header.gain = metaP->gain;
00262 header.framesPerSecond = metaP->framesPerSecond;
00263 header.imageDataP = image.dataP;
00264 header.imageLength = image.length;
00265
00266 dispatchImage(buffer, header);
00267
00268 break;
00269 }
00270 case MSG_ID(wire::Image::ID):
00271 {
00272 wire::Image image(stream, version);
00273
00274 const wire::ImageMeta *metaP = m_imageMetaCache.find(image.frameId);
00275 if (NULL == metaP)
00276 break;
00277
00278
00279 image::Header header;
00280
00281 if (false == m_networkTimeSyncEnabled) {
00282
00283 header.timeSeconds = metaP->timeSeconds;
00284 header.timeMicroSeconds = metaP->timeMicroSeconds;
00285
00286 } else
00287 sensorToLocalTime(static_cast<double>(metaP->timeSeconds) +
00288 1e-6 * static_cast<double>(metaP->timeMicroSeconds),
00289 header.timeSeconds, header.timeMicroSeconds);
00290
00291 header.source = sourceWireToApi(image.source);
00292 header.bitsPerPixel = image.bitsPerPixel;
00293 header.width = image.width;
00294 header.height = image.height;
00295 header.frameId = image.frameId;
00296 header.exposure = metaP->exposureTime;
00297 header.gain = metaP->gain;
00298 header.framesPerSecond = metaP->framesPerSecond;
00299 header.imageDataP = image.dataP;
00300 header.imageLength = static_cast<uint32_t>(std::ceil(((double) image.bitsPerPixel / 8.0) * image.width * image.height));
00301
00302 dispatchImage(buffer, header);
00303
00304 break;
00305 }
00306 case MSG_ID(wire::Disparity::ID):
00307 {
00308 wire::Disparity image(stream, version);
00309
00310 const wire::ImageMeta *metaP = m_imageMetaCache.find(image.frameId);
00311 if (NULL == metaP)
00312 break;
00313
00314
00315 image::Header header;
00316
00317 if (false == m_networkTimeSyncEnabled) {
00318
00319 header.timeSeconds = metaP->timeSeconds;
00320 header.timeMicroSeconds = metaP->timeMicroSeconds;
00321
00322 } else
00323 sensorToLocalTime(static_cast<double>(metaP->timeSeconds) +
00324 1e-6 * static_cast<double>(metaP->timeMicroSeconds),
00325 header.timeSeconds, header.timeMicroSeconds);
00326
00327 header.source = Source_Disparity;
00328 header.bitsPerPixel = wire::Disparity::API_BITS_PER_PIXEL;
00329 header.width = image.width;
00330 header.height = image.height;
00331 header.frameId = image.frameId;
00332 header.exposure = metaP->exposureTime;
00333 header.gain = metaP->gain;
00334 header.framesPerSecond = metaP->framesPerSecond;
00335 header.imageDataP = image.dataP;
00336
00337 dispatchImage(buffer, header);
00338
00339 break;
00340 }
00341 case MSG_ID(wire::SysPps::ID):
00342 {
00343 wire::SysPps pps(stream, version);
00344
00345 pps::Header header;
00346
00347 header.sensorTime = pps.ppsNanoSeconds;
00348
00349 sensorToLocalTime(static_cast<double>(pps.ppsNanoSeconds) / 1e9,
00350 header.timeSeconds, header.timeMicroSeconds);
00351
00352 dispatchPps(header);
00353
00354 break;
00355 }
00356 case MSG_ID(wire::ImuData::ID):
00357 {
00358 wire::ImuData imu(stream, version);
00359
00360 imu::Header header;
00361
00362 header.sequence = imu.sequence;
00363 header.samples.resize(imu.samples.size());
00364
00365 for(uint32_t i=0; i<imu.samples.size(); i++) {
00366
00367 const wire::ImuSample& w = imu.samples[i];
00368 imu::Sample& a = header.samples[i];
00369
00370 if (false == m_networkTimeSyncEnabled) {
00371
00372 const int64_t oneBillion = static_cast<int64_t>(1e9);
00373
00374 a.timeSeconds = static_cast<uint32_t>(w.timeNanoSeconds / oneBillion);
00375 a.timeMicroSeconds = static_cast<uint32_t>((w.timeNanoSeconds % oneBillion) /
00376 static_cast<int64_t>(1000));
00377
00378 } else
00379 sensorToLocalTime(static_cast<double>(w.timeNanoSeconds) / 1e9,
00380 a.timeSeconds, a.timeMicroSeconds);
00381
00382 switch(w.type) {
00383 case wire::ImuSample::TYPE_ACCEL: a.type = imu::Sample::Type_Accelerometer; break;
00384 case wire::ImuSample::TYPE_GYRO : a.type = imu::Sample::Type_Gyroscope; break;
00385 case wire::ImuSample::TYPE_MAG : a.type = imu::Sample::Type_Magnetometer; break;
00386 default: CRL_EXCEPTION("unknown wire IMU type: %d", w.type);
00387 }
00388
00389 a.x = w.x; a.y = w.y; a.z = w.z;
00390 }
00391
00392 dispatchImu(header);
00393
00394 break;
00395 }
00396 case MSG_ID(wire::Ack::ID):
00397 break;
00398 case MSG_ID(wire::CamConfig::ID):
00399 m_messages.store(wire::CamConfig(stream, version));
00400 break;
00401 case MSG_ID(wire::CamHistory::ID):
00402 m_messages.store(wire::CamHistory(stream, version));
00403 break;
00404 case MSG_ID(wire::LedStatus::ID):
00405 m_messages.store(wire::LedStatus(stream, version));
00406 break;
00407 case MSG_ID(wire::SysFlashResponse::ID):
00408 m_messages.store(wire::SysFlashResponse(stream, version));
00409 break;
00410 case MSG_ID(wire::SysDeviceInfo::ID):
00411 m_messages.store(wire::SysDeviceInfo(stream, version));
00412 break;
00413 case MSG_ID(wire::SysCameraCalibration::ID):
00414 m_messages.store(wire::SysCameraCalibration(stream, version));
00415 break;
00416 case MSG_ID(wire::SysLidarCalibration::ID):
00417 m_messages.store(wire::SysLidarCalibration(stream, version));
00418 break;
00419 case MSG_ID(wire::SysMtu::ID):
00420 m_messages.store(wire::SysMtu(stream, version));
00421 break;
00422 case MSG_ID(wire::SysNetwork::ID):
00423 m_messages.store(wire::SysNetwork(stream, version));
00424 break;
00425 case MSG_ID(wire::SysDeviceModes::ID):
00426 m_messages.store(wire::SysDeviceModes(stream, version));
00427 break;
00428 case MSG_ID(wire::VersionResponse::ID):
00429 m_messages.store(wire::VersionResponse(stream, version));
00430 break;
00431 case MSG_ID(wire::StatusResponse::ID):
00432 m_messages.store(wire::StatusResponse(stream, version));
00433 break;
00434 case MSG_ID(wire::ImuConfig::ID):
00435 m_messages.store(wire::ImuConfig(stream, version));
00436 break;
00437 case MSG_ID(wire::ImuInfo::ID):
00438 m_messages.store(wire::ImuInfo(stream, version));
00439 break;
00440 case MSG_ID(wire::SysTestMtuResponse::ID):
00441 m_messages.store(wire::SysTestMtuResponse(stream, version));
00442 break;
00443 case MSG_ID(wire::SysDirectedStreams::ID):
00444 m_messages.store(wire::SysDirectedStreams(stream, version));
00445 break;
00446 default:
00447
00448 CRL_DEBUG("unknown message received: id=%d, version=%d\n",
00449 id, version);
00450 return;
00451 }
00452
00453
00454
00455
00456
00457
00458
00459
00460 switch(id) {
00461 case MSG_ID(wire::Ack::ID):
00462 m_watch.signal(wire::Ack(stream, version));
00463 break;
00464 default:
00465 m_watch.signal(id);
00466 break;
00467 }
00468 }
00469
00470
00471
00472
00473
00474 impl::UdpAssembler impl::getUdpAssembler(const uint8_t *firstDatagramP,
00475 uint32_t length)
00476 {
00477
00478
00479
00480 utility::BufferStreamReader stream(firstDatagramP, length);
00481 stream.seek(sizeof(wire::Header));
00482
00483 wire::IdType messageType;
00484 stream & messageType;
00485
00486
00487
00488
00489 UdpAssemblerMap::const_iterator it = m_udpAssemblerMap.find(messageType);
00490
00491 if (m_udpAssemblerMap.end() != it)
00492 return it->second;
00493 else
00494 return defaultUdpAssembler;
00495 }
00496
00497
00498
00499
00500 utility::BufferStreamWriter& impl::findFreeBuffer(uint32_t messageLength)
00501 {
00502 BufferPool *bP = NULL;
00503
00504 if (messageLength <= RX_POOL_SMALL_BUFFER_SIZE)
00505 bP = &(m_rxSmallBufferPool);
00506 else if (messageLength <= RX_POOL_LARGE_BUFFER_SIZE)
00507 bP = &(m_rxLargeBufferPool);
00508 else
00509 CRL_EXCEPTION("message too large: %d bytes", messageLength);
00510
00511
00512
00513
00514
00515 BufferPool::const_iterator it = bP->begin();
00516 for(; it != bP->end(); it++)
00517 if (false == (*it)->shared())
00518 return *(*it);
00519
00520 CRL_EXCEPTION("no free RX buffers (%d in use by consumers)\n", bP->size());
00521 }
00522
00523
00524
00525
00526 const int64_t& impl::unwrapSequenceId(uint16_t wireId)
00527 {
00528
00529
00530
00531 if (wireId != m_lastRxSeqId) {
00532
00533 const uint16_t ID_MAX = std::numeric_limits<uint16_t>::max();
00534 const uint16_t ID_CENTER = ID_MAX / 2;
00535
00536
00537
00538
00539 if (-1 == m_lastRxSeqId)
00540 m_unWrappedRxSeqId = m_lastRxSeqId = wireId;
00541
00542
00543
00544
00545 else if (wireId < ID_CENTER &&
00546 m_lastRxSeqId > ID_CENTER) {
00547
00548 m_unWrappedRxSeqId += 1 + (ID_MAX - m_lastRxSeqId) + wireId;
00549
00550
00551
00552
00553 } else
00554 m_unWrappedRxSeqId += wireId - m_lastRxSeqId;
00555
00556
00557
00558
00559 m_lastRxSeqId = wireId;
00560 }
00561
00562 return m_unWrappedRxSeqId;
00563 }
00564
00565
00566
00567
00568 void impl::handle()
00569 {
00570 utility::ScopedLock lock(m_rxLock);
00571
00572 for(;;) {
00573
00574
00575
00576
00577 const int bytesRead = recvfrom(m_serverSocket,
00578 (char*)m_incomingBuffer.data(),
00579 m_incomingBuffer.size(),
00580 0, NULL, NULL);
00581
00582
00583
00584 if (bytesRead < 0)
00585 break;
00586
00587
00588
00589
00590 else if (bytesRead < (int)sizeof(wire::Header))
00591 CRL_EXCEPTION("undersized packet: %d/%d bytes\n",
00592 bytesRead, sizeof(wire::Header));
00593
00594
00595
00596
00597 const uint8_t *inP = reinterpret_cast<const uint8_t*>(m_incomingBuffer.data());
00598
00599
00600
00601
00602 const wire::Header& header = *(reinterpret_cast<const wire::Header*>(inP));
00603
00604 if (wire::HEADER_MAGIC != header.magic)
00605 CRL_EXCEPTION("bad protocol magic: 0x%x, expecting 0x%x",
00606 header.magic, wire::HEADER_MAGIC);
00607 else if (wire::HEADER_VERSION != header.version)
00608 CRL_EXCEPTION("bad protocol version: 0x%x, expecting 0x%x",
00609 header.version, wire::HEADER_VERSION);
00610 else if (wire::HEADER_GROUP != header.group)
00611 CRL_EXCEPTION("bad protocol group: 0x%x, expecting 0x%x",
00612 header.group, wire::HEADER_GROUP);
00613
00614
00615
00616
00617 const int64_t& sequence = unwrapSequenceId(header.sequenceIdentifier);
00618
00619
00620
00621
00622 UdpTracker *trP = m_udpTrackerCache.find(sequence);
00623 if (NULL == trP) {
00624
00625
00626
00627
00628
00629
00630 if (0 != header.byteOffset)
00631 continue;
00632 else {
00633
00634
00635
00636
00637 trP = new UdpTracker(header.messageLength,
00638 getUdpAssembler(inP, bytesRead),
00639 findFreeBuffer(header.messageLength));
00640 }
00641 }
00642
00643
00644
00645
00646
00647 if (true == trP->assemble(bytesRead - sizeof(wire::Header),
00648 header.byteOffset,
00649 &(inP[sizeof(wire::Header)]))) {
00650
00651
00652
00653
00654 dispatch(trP->stream());
00655
00656
00657
00658
00659 if (1 == trP->packets())
00660 delete trP;
00661 else
00662 m_udpTrackerCache.remove(sequence);
00663
00664 } else if (1 == trP->packets()) {
00665
00666
00667
00668
00669
00670 m_udpTrackerCache.insert(sequence, trP);
00671 }
00672 }
00673 }
00674
00675
00676
00677
00678 #if WIN32
00679 DWORD impl::rxThread(void *userDataP)
00680 #else
00681 void *impl::rxThread(void *userDataP)
00682 #endif
00683 {
00684 impl *selfP = reinterpret_cast<impl*>(userDataP);
00685 const int server = selfP->m_serverSocket;
00686 fd_set readSet;
00687
00688
00689
00690
00691 while(selfP->m_threadsRunning) {
00692
00693
00694
00695
00696 FD_ZERO(&readSet);
00697 FD_SET(server, &readSet);
00698
00699
00700
00701
00702 struct timeval tv = {0, 200000};
00703 const int result = select(server+1, &readSet, NULL, NULL, &tv);
00704 if (result <= 0)
00705 continue;
00706
00707
00708
00709
00710 try {
00711
00712 selfP->handle();
00713
00714 } catch (const std::exception& e) {
00715
00716 CRL_DEBUG("exception while decoding packet: %s\n", e.what());
00717
00718 } catch ( ... ) {
00719
00720 CRL_DEBUG("unknown exception while decoding packet\n");
00721 }
00722 }
00723
00724 return NULL;
00725 }
00726
00727 }}};