00001
00037 #include "details/channel.hh"
00038 #include "details/query.hh"
00039
00040 #include "details/wire/DisparityMessage.h"
00041 #include "details/wire/SysMtuMessage.h"
00042 #include "details/wire/SysGetMtuMessage.h"
00043 #include "details/wire/StatusRequestMessage.h"
00044 #include "details/wire/StatusResponseMessage.h"
00045 #include "details/wire/VersionRequestMessage.h"
00046 #include "details/wire/SysDeviceInfoMessage.h"
00047
00048 #include "details/utility/Functional.hh"
00049
00050 #ifndef WIN32
00051 #include <netdb.h>
00052 #endif
00053 #include <errno.h>
00054 #include <fcntl.h>
00055
00056 namespace crl {
00057 namespace multisense {
00058 namespace details {
00059
00060
00061
00062
00063 impl::impl(const std::string& address) :
00064 m_serverSocket(-1),
00065 m_serverSocketPort(0),
00066 m_sensorAddress(),
00067 m_sensorMtu(MAX_MTU_SIZE),
00068 m_incomingBuffer(MAX_MTU_SIZE),
00069 m_txSeqId(0),
00070 m_lastRxSeqId(-1),
00071 m_unWrappedRxSeqId(0),
00072 m_udpTrackerCache(UDP_TRACKER_CACHE_DEPTH, 0),
00073 m_rxLargeBufferPool(),
00074 m_rxSmallBufferPool(),
00075 m_imageMetaCache(IMAGE_META_CACHE_DEPTH, 0),
00076 m_udpAssemblerMap(),
00077 m_dispatchLock(),
00078 m_streamLock(),
00079 m_threadsRunning(false),
00080 m_rxThreadP(NULL),
00081 m_rxLock(),
00082 m_statusThreadP(NULL),
00083 m_imageListeners(),
00084 m_lidarListeners(),
00085 m_ppsListeners(),
00086 m_imuListeners(),
00087 m_watch(),
00088 m_messages(),
00089 m_streamsEnabled(0),
00090 m_timeLock(),
00091 m_timeOffsetInit(false),
00092 m_timeOffset(0),
00093 m_networkTimeSyncEnabled(true),
00094 m_sensorVersion()
00095 {
00096 #if WIN32
00097 WSADATA wsaData;
00098 int result = WSAStartup (MAKEWORD (0x02, 0x02), &wsaData);
00099 if (result != 0)
00100 CRL_EXCEPTION("WSAStartup() failed: %d", result);
00101 #endif
00102
00103
00104
00105
00106 struct hostent *hostP = gethostbyname(address.c_str());
00107 if (NULL == hostP)
00108 CRL_EXCEPTION("unable to resolve \"%s\": %s",
00109 address.c_str(), strerror(errno));
00110
00111
00112
00113
00114 in_addr addr;
00115
00116 memcpy(&(addr.s_addr), hostP->h_addr, hostP->h_length);
00117 memset(&m_sensorAddress, 0, sizeof(m_sensorAddress));
00118
00119 m_sensorAddress.sin_family = AF_INET;
00120 m_sensorAddress.sin_port = htons(DEFAULT_SENSOR_TX_PORT);
00121 m_sensorAddress.sin_addr = addr;
00122
00123
00124
00125
00126 for(uint32_t i=0; i<RX_POOL_LARGE_BUFFER_COUNT; i++)
00127 m_rxLargeBufferPool.push_back(new utility::BufferStreamWriter(RX_POOL_LARGE_BUFFER_SIZE));
00128 for(uint32_t i=0; i<RX_POOL_SMALL_BUFFER_COUNT; i++)
00129 m_rxSmallBufferPool.push_back(new utility::BufferStreamWriter(RX_POOL_SMALL_BUFFER_SIZE));
00130
00131
00132
00133
00134 bind();
00135
00136
00137
00138
00139 m_udpAssemblerMap[MSG_ID(wire::Disparity::ID)] = wire::Disparity::assembler;
00140
00141
00142
00143
00144 m_threadsRunning = true;
00145 m_rxThreadP = new utility::Thread(rxThread, this);
00146
00147
00148
00149
00150 wire::SysMtu mtu;
00151
00152 Status status = waitData(wire::SysGetMtu(), mtu);
00153 if (Status_Ok != status) {
00154 cleanup();
00155 CRL_EXCEPTION("failed to establish comms with the sensor at \"%s\"",
00156 address.c_str());
00157 } else {
00158
00159
00160
00161
00162 m_sensorMtu = mtu.mtu;
00163 }
00164
00165
00166
00167
00168 status = waitData(wire::VersionRequest(), m_sensorVersion);
00169 if (Status_Ok != status) {
00170 cleanup();
00171 CRL_EXCEPTION("failed to request version info from sensor at \"%s\"",
00172 address.c_str());
00173 }
00174
00175
00176
00177
00178 m_statusThreadP = new utility::Thread(statusThread, this);
00179 }
00180
00181
00182
00183
00184 void impl::cleanup()
00185 {
00186 m_threadsRunning = false;
00187
00188 if (m_rxThreadP)
00189 delete m_rxThreadP;
00190 if (m_statusThreadP)
00191 delete m_statusThreadP;
00192
00193 std::list<ImageListener*>::const_iterator iti;
00194 for(iti = m_imageListeners.begin();
00195 iti != m_imageListeners.end();
00196 iti ++)
00197 delete *iti;
00198 std::list<LidarListener*>::const_iterator itl;
00199 for(itl = m_lidarListeners.begin();
00200 itl != m_lidarListeners.end();
00201 itl ++)
00202 delete *itl;
00203 std::list<PpsListener*>::const_iterator itp;
00204 for(itp = m_ppsListeners.begin();
00205 itp != m_ppsListeners.end();
00206 itp ++)
00207 delete *itp;
00208 std::list<ImuListener*>::const_iterator itm;
00209 for(itm = m_imuListeners.begin();
00210 itm != m_imuListeners.end();
00211 itm ++)
00212 delete *itm;
00213
00214 BufferPool::const_iterator it;
00215 for(it = m_rxLargeBufferPool.begin();
00216 it != m_rxLargeBufferPool.end();
00217 ++it)
00218 delete *it;
00219 for(it = m_rxSmallBufferPool.begin();
00220 it != m_rxSmallBufferPool.end();
00221 ++it)
00222 delete *it;
00223
00224 if (m_serverSocket > 0)
00225 closesocket(m_serverSocket);
00226
00227 #if WIN32
00228 WSACleanup ();
00229 #endif
00230 }
00231
00232
00233
00234
00235 impl::~impl()
00236 {
00237 cleanup();
00238 }
00239
00240
00241
00242
00243
00244 void impl::bind()
00245 {
00246
00247
00248
00249 m_serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
00250 if (m_serverSocket < 0)
00251 CRL_EXCEPTION("failed to create the UDP socket: %s",
00252 strerror(errno));
00253
00254
00255
00256 #if WIN32
00257 u_long ioctl_arg = 1;
00258 if (0 != ioctlsocket(m_serverSocket, FIONBIO, &ioctl_arg))
00259 CRL_EXCEPTION("failed to make a socket non-blocking: %d",WSAGetLastError ());
00260 #else
00261 const int flags = fcntl(m_serverSocket, F_GETFL, 0);
00262
00263 if (0 != fcntl(m_serverSocket, F_SETFL, flags | O_NONBLOCK))
00264 CRL_EXCEPTION("failed to make a socket non-blocking: %s",
00265 strerror(errno));
00266 #endif
00267
00268
00269
00270
00271 int reuseSocket = 1;
00272
00273 if (0 != setsockopt(m_serverSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuseSocket,
00274 sizeof(reuseSocket)))
00275 CRL_EXCEPTION("failed to turn on socket reuse flag: %s",
00276 strerror(errno));
00277
00278
00279
00280
00281 int bufferSize = 48 * 1024 * 1024;
00282
00283 if (0 != setsockopt(m_serverSocket, SOL_SOCKET, SO_RCVBUF, (char*) &bufferSize,
00284 sizeof(bufferSize)) ||
00285 0 != setsockopt(m_serverSocket, SOL_SOCKET, SO_SNDBUF, (char*) &bufferSize,
00286 sizeof(bufferSize)))
00287 CRL_EXCEPTION("failed to adjust socket buffer sizes (%d bytes): %s",
00288 bufferSize, strerror(errno));
00289
00290
00291
00292
00293 struct sockaddr_in address;
00294
00295 address.sin_family = AF_INET;
00296 address.sin_port = htons(0);
00297 address.sin_addr.s_addr = htonl(INADDR_ANY);
00298
00299 if (0 != ::bind(m_serverSocket, (struct sockaddr*) &address, sizeof(address)))
00300 CRL_EXCEPTION("failed to bind the server socket to system-assigned port: %s",
00301 strerror(errno));
00302
00303
00304
00305 #if WIN32
00306 int len = sizeof(address);
00307 #else
00308 socklen_t len = sizeof(address);
00309 #endif
00310 if (0 != getsockname(m_serverSocket, (struct sockaddr*) &address, &len))
00311 CRL_EXCEPTION("getsockname() failed: %s", strerror(errno));
00312 m_serverSocketPort = htons(address.sin_port);
00313 }
00314
00315
00316
00317
00318 void impl::publish(const utility::BufferStreamWriter& stream)
00319 {
00320
00321
00322
00323 wire::Header& header = *(reinterpret_cast<wire::Header*>(stream.data()));
00324
00325 header.magic = wire::HEADER_MAGIC;
00326 header.version = wire::HEADER_VERSION;
00327 header.group = wire::HEADER_GROUP;
00328 header.flags = 0;
00329 #if WIN32
00330
00331 header.sequenceIdentifier = InterlockedIncrement16((short*)&m_txSeqId);
00332 #else
00333
00334 header.sequenceIdentifier = __sync_fetch_and_add(&m_txSeqId, 1);
00335 #endif
00336 header.messageLength = stream.tell() - sizeof(wire::Header);
00337 header.byteOffset = 0;
00338
00339
00340
00341
00342 const int32_t ret = sendto(m_serverSocket, (char*)stream.data(), stream.tell(), 0,
00343 (struct sockaddr *) &m_sensorAddress,
00344 sizeof(m_sensorAddress));
00345
00346 if (static_cast<size_t>(ret) != stream.tell())
00347 CRL_EXCEPTION("error sending data to sensor, %d/%d bytes written: %s",
00348 ret, stream.tell(), strerror(errno));
00349 }
00350
00351
00352
00353
00354
00355 wire::SourceType impl::sourceApiToWire(DataSource mask)
00356 {
00357 wire::SourceType wire_mask = 0;
00358
00359 if (mask & Source_Raw_Left) wire_mask |= wire::SOURCE_RAW_LEFT;
00360 if (mask & Source_Raw_Right) wire_mask |= wire::SOURCE_RAW_RIGHT;
00361 if (mask & Source_Luma_Left) wire_mask |= wire::SOURCE_LUMA_LEFT;
00362 if (mask & Source_Luma_Right) wire_mask |= wire::SOURCE_LUMA_RIGHT;
00363 if (mask & Source_Luma_Rectified_Left) wire_mask |= wire::SOURCE_LUMA_RECT_LEFT;
00364 if (mask & Source_Luma_Rectified_Right) wire_mask |= wire::SOURCE_LUMA_RECT_RIGHT;
00365 if (mask & Source_Chroma_Left) wire_mask |= wire::SOURCE_CHROMA_LEFT;
00366 if (mask & Source_Chroma_Right) wire_mask |= wire::SOURCE_CHROMA_RIGHT;
00367 if (mask & Source_Disparity) wire_mask |= wire::SOURCE_DISPARITY;
00368 if (mask & Source_Disparity_Right) wire_mask |= wire::SOURCE_DISPARITY_RIGHT;
00369 if (mask & Source_Disparity_Cost) wire_mask |= wire::SOURCE_DISPARITY_COST;
00370 if (mask & Source_Jpeg_Left) wire_mask |= wire::SOURCE_JPEG_LEFT;
00371 if (mask & Source_Rgb_Left) wire_mask |= wire::SOURCE_RGB_LEFT;
00372 if (mask & Source_Lidar_Scan) wire_mask |= wire::SOURCE_LIDAR_SCAN;
00373 if (mask & Source_Imu) wire_mask |= wire::SOURCE_IMU;
00374 if (mask & Source_Pps) wire_mask |= wire::SOURCE_PPS;
00375
00376 return wire_mask;
00377 };
00378
00379 DataSource impl::sourceWireToApi(wire::SourceType mask)
00380 {
00381 DataSource api_mask = 0;
00382
00383 if (mask & wire::SOURCE_RAW_LEFT) api_mask |= Source_Raw_Left;
00384 if (mask & wire::SOURCE_RAW_RIGHT) api_mask |= Source_Raw_Right;
00385 if (mask & wire::SOURCE_LUMA_LEFT) api_mask |= Source_Luma_Left;
00386 if (mask & wire::SOURCE_LUMA_RIGHT) api_mask |= Source_Luma_Right;
00387 if (mask & wire::SOURCE_LUMA_RECT_LEFT) api_mask |= Source_Luma_Rectified_Left;
00388 if (mask & wire::SOURCE_LUMA_RECT_RIGHT) api_mask |= Source_Luma_Rectified_Right;
00389 if (mask & wire::SOURCE_CHROMA_LEFT) api_mask |= Source_Chroma_Left;
00390 if (mask & wire::SOURCE_CHROMA_RIGHT) api_mask |= Source_Chroma_Right;
00391 if (mask & wire::SOURCE_DISPARITY) api_mask |= Source_Disparity;
00392 if (mask & wire::SOURCE_DISPARITY_RIGHT) api_mask |= Source_Disparity_Right;
00393 if (mask & wire::SOURCE_DISPARITY_COST) api_mask |= Source_Disparity_Cost;
00394 if (mask & wire::SOURCE_JPEG_LEFT) api_mask |= Source_Jpeg_Left;
00395 if (mask & wire::SOURCE_RGB_LEFT) api_mask |= Source_Rgb_Left;
00396 if (mask & wire::SOURCE_LIDAR_SCAN) api_mask |= Source_Lidar_Scan;
00397 if (mask & wire::SOURCE_IMU) api_mask |= Source_Imu;
00398 if (mask & wire::SOURCE_PPS) api_mask |= Source_Pps;
00399
00400 return api_mask;
00401 };
00402
00403 uint32_t impl::hardwareApiToWire(uint32_t a)
00404 {
00405 switch(a) {
00406 case system::DeviceInfo::HARDWARE_REV_MULTISENSE_SL: return wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_SL;
00407 case system::DeviceInfo::HARDWARE_REV_MULTISENSE_S7: return wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_S7;
00408 case system::DeviceInfo::HARDWARE_REV_MULTISENSE_M: return wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_M;
00409 case system::DeviceInfo::HARDWARE_REV_MULTISENSE_S7S: return wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_S7S;
00410 case system::DeviceInfo::HARDWARE_REV_MULTISENSE_S21: return wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_S21;
00411 case system::DeviceInfo::HARDWARE_REV_MULTISENSE_ST21: return wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_ST21;
00412 case system::DeviceInfo::HARDWARE_REV_BCAM: return wire::SysDeviceInfo::HARDWARE_REV_BCAM;
00413 default:
00414 CRL_DEBUG("unknown API hardware type \"%d\"\n", a);
00415 return a;
00416 }
00417 }
00418 uint32_t impl::hardwareWireToApi(uint32_t w)
00419 {
00420 switch(w) {
00421 case wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_SL: return system::DeviceInfo::HARDWARE_REV_MULTISENSE_SL;
00422 case wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_S7: return system::DeviceInfo::HARDWARE_REV_MULTISENSE_S7;
00423 case wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_M: return system::DeviceInfo::HARDWARE_REV_MULTISENSE_M;
00424 case wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_S7S: return system::DeviceInfo::HARDWARE_REV_MULTISENSE_S7S;
00425 case wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_S21: return system::DeviceInfo::HARDWARE_REV_MULTISENSE_S21;
00426 case wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_ST21: return system::DeviceInfo::HARDWARE_REV_MULTISENSE_ST21;
00427 case wire::SysDeviceInfo::HARDWARE_REV_BCAM: return system::DeviceInfo::HARDWARE_REV_BCAM;
00428 default:
00429 CRL_DEBUG("unknown WIRE hardware type \"%d\"\n", w);
00430 return w;
00431 }
00432 }
00433 uint32_t impl::imagerApiToWire(uint32_t a)
00434 {
00435 switch(a) {
00436 case system::DeviceInfo::IMAGER_TYPE_CMV2000_GREY: return wire::SysDeviceInfo::IMAGER_TYPE_CMV2000_GREY;
00437 case system::DeviceInfo::IMAGER_TYPE_CMV2000_COLOR: return wire::SysDeviceInfo::IMAGER_TYPE_CMV2000_COLOR;
00438 case system::DeviceInfo::IMAGER_TYPE_CMV4000_GREY: return wire::SysDeviceInfo::IMAGER_TYPE_CMV4000_GREY;
00439 case system::DeviceInfo::IMAGER_TYPE_CMV4000_COLOR: return wire::SysDeviceInfo::IMAGER_TYPE_CMV4000_COLOR;
00440 case system::DeviceInfo::IMAGER_TYPE_IMX104_COLOR: return wire::SysDeviceInfo::IMAGER_TYPE_IMX104_COLOR;
00441 default:
00442 CRL_DEBUG("unknown API imager type \"%d\"\n", a);
00443 return a;
00444 }
00445 }
00446 uint32_t impl::imagerWireToApi(uint32_t w)
00447 {
00448 switch(w) {
00449 case wire::SysDeviceInfo::IMAGER_TYPE_CMV2000_GREY: return system::DeviceInfo::IMAGER_TYPE_CMV2000_GREY;
00450 case wire::SysDeviceInfo::IMAGER_TYPE_CMV2000_COLOR: return system::DeviceInfo::IMAGER_TYPE_CMV2000_COLOR;
00451 case wire::SysDeviceInfo::IMAGER_TYPE_CMV4000_GREY: return system::DeviceInfo::IMAGER_TYPE_CMV4000_GREY;
00452 case wire::SysDeviceInfo::IMAGER_TYPE_CMV4000_COLOR: return system::DeviceInfo::IMAGER_TYPE_CMV4000_COLOR;
00453 case wire::SysDeviceInfo::IMAGER_TYPE_IMX104_COLOR: return system::DeviceInfo::IMAGER_TYPE_IMX104_COLOR;
00454 default:
00455 CRL_DEBUG("unknown WIRE imager type \"%d\"\n", w);
00456 return w;
00457 }
00458 }
00459
00460
00461
00462
00463 void impl::applySensorTimeOffset(const double& offset)
00464 {
00465 utility::ScopedLock lock(m_timeLock);
00466
00467 if (false == m_timeOffsetInit) {
00468 m_timeOffset = offset;
00469 m_timeOffsetInit = true;
00470 return;
00471 }
00472
00473 const double samples = static_cast<double>(TIME_SYNC_OFFSET_DECAY);
00474
00475 m_timeOffset = utility::decayedAverage(m_timeOffset, samples, offset);
00476 }
00477
00478
00479
00480
00481 double impl::sensorToLocalTime(const double& sensorTime)
00482 {
00483 utility::ScopedLock lock(m_timeLock);
00484 return m_timeOffset + sensorTime;
00485 }
00486
00487
00488
00489
00490 void impl::sensorToLocalTime(const double& sensorTime,
00491 uint32_t& seconds,
00492 uint32_t& microseconds)
00493 {
00494 double corrected = sensorToLocalTime(sensorTime);
00495 seconds = static_cast<uint32_t>(corrected);
00496 microseconds = static_cast<uint32_t>(1e6 * (corrected - static_cast<double>(seconds)));
00497 }
00498
00499
00500
00501
00502 #ifdef WIN32
00503 DWORD impl::statusThread(void *userDataP)
00504 #else
00505 void *impl::statusThread(void *userDataP)
00506 #endif
00507 {
00508 impl *selfP = reinterpret_cast<impl*>(userDataP);
00509
00510
00511
00512
00513 while(selfP->m_threadsRunning) {
00514
00515 try {
00516
00517
00518
00519
00520 ScopedWatch ack(wire::StatusResponse::ID, selfP->m_watch);
00521
00522
00523
00524
00525 const double ping = utility::TimeStamp::getCurrentTime();
00526 selfP->publish(wire::StatusRequest());
00527
00528
00529
00530
00531 Status status;
00532 if (ack.wait(status, 0.010)) {
00533
00534
00535
00536
00537 const double pong = utility::TimeStamp::getCurrentTime();
00538
00539
00540
00541
00542 wire::StatusResponse msg;
00543 selfP->m_messages.extract(msg);
00544
00545
00546
00547
00548
00549 const double latency = (pong - ping) / 2.0;
00550
00551
00552
00553
00554 const double offset = (ping + latency) - static_cast<double>(msg.uptime);
00555 selfP->applySensorTimeOffset(offset);
00556
00557
00558
00559
00560 selfP->m_statusResponseMessage = msg;
00561 }
00562
00563 } catch (const std::exception& e) {
00564
00565 CRL_DEBUG("exception: %s\n", e.what());
00566
00567 } catch (...) {
00568
00569 CRL_DEBUG("unknown exception\n");
00570 }
00571
00572
00573
00574
00575 usleep(static_cast<unsigned int> (1e6));
00576 }
00577
00578 return NULL;
00579 }
00580
00581 };
00582
00583 Channel* Channel::Create(const std::string& address)
00584 {
00585 try {
00586
00587 return new details::impl(address);
00588
00589 } catch (const std::exception& e) {
00590
00591 CRL_DEBUG("exception: %s\n", e.what());
00592 return NULL;
00593 }
00594 }
00595
00596 void Channel::Destroy(Channel *instanceP)
00597 {
00598 try {
00599
00600 if (instanceP)
00601 delete static_cast<details::impl*>(instanceP);
00602
00603 } catch (const std::exception& e) {
00604
00605 CRL_DEBUG("exception: %s\n", e.what());
00606 }
00607 }
00608
00609 const char *Channel::statusString(Status status)
00610 {
00611 switch(status) {
00612 case Status_Ok: return "Ok";
00613 case Status_TimedOut: return "Timed out";
00614 case Status_Error: return "Error";
00615 case Status_Failed: return "Failed";
00616 case Status_Unsupported: return "Unsupported";
00617 case Status_Unknown: return "Unknown command";
00618 case Status_Exception: return "Exception";
00619 }
00620
00621 return "Unknown Error";
00622 }
00623
00624 };
00625 };