00001
00037 #include "details/channel.hh"
00038 #include "details/query.hh"
00039
00040 #include "details/wire/DisparityMessage.h"
00041 #include "details/wire/SysMtuMessage.h"
00042 #include "details/wire/SysGetMtuMessage.h"
00043 #include "details/wire/StatusRequestMessage.h"
00044 #include "details/wire/StatusResponseMessage.h"
00045 #include "details/wire/VersionRequestMessage.h"
00046 #include "details/wire/SysDeviceInfoMessage.h"
00047
00048 #include "details/utility/Functional.hh"
00049
00050 #ifndef WIN32
00051 #include <netdb.h>
00052 #endif
00053 #include <errno.h>
00054 #include <fcntl.h>
00055
00056 namespace crl {
00057 namespace multisense {
00058 namespace details {
00059
00060
00061
00062
00063 impl::impl(const std::string& address) :
00064 m_serverSocket(-1),
00065 m_serverSocketPort(0),
00066 m_sensorAddress(),
00067 m_sensorMtu(MAX_MTU_SIZE),
00068 m_incomingBuffer(MAX_MTU_SIZE),
00069 m_txSeqId(0),
00070 m_lastRxSeqId(-1),
00071 m_unWrappedRxSeqId(0),
00072 m_udpTrackerCache(UDP_TRACKER_CACHE_DEPTH, 0),
00073 m_rxLargeBufferPool(),
00074 m_rxSmallBufferPool(),
00075 m_imageMetaCache(IMAGE_META_CACHE_DEPTH, 0),
00076 m_udpAssemblerMap(),
00077 m_dispatchLock(),
00078 m_streamLock(),
00079 m_threadsRunning(false),
00080 m_rxThreadP(NULL),
00081 m_rxLock(),
00082 m_statusThreadP(NULL),
00083 m_imageListeners(),
00084 m_lidarListeners(),
00085 m_ppsListeners(),
00086 m_imuListeners(),
00087 m_watch(),
00088 m_messages(),
00089 m_streamsEnabled(0),
00090 m_timeLock(),
00091 m_timeOffsetInit(false),
00092 m_timeOffset(0),
00093 m_networkTimeSyncEnabled(true),
00094 m_sensorVersion()
00095 {
00096 #if WIN32
00097 WSADATA wsaData;
00098 int result = WSAStartup (MAKEWORD (0x02, 0x02), &wsaData);
00099 if (result != 0)
00100 CRL_EXCEPTION("WSAStartup() failed: %d", result);
00101 #endif
00102
00103
00104
00105
00106 struct hostent *hostP = gethostbyname(address.c_str());
00107 if (NULL == hostP)
00108 CRL_EXCEPTION("unable to resolve \"%s\": %s",
00109 address.c_str(), strerror(errno));
00110
00111
00112
00113
00114 in_addr addr;
00115
00116 memcpy(&(addr.s_addr), hostP->h_addr, hostP->h_length);
00117 memset(&m_sensorAddress, 0, sizeof(m_sensorAddress));
00118
00119 m_sensorAddress.sin_family = AF_INET;
00120 m_sensorAddress.sin_port = htons(DEFAULT_SENSOR_TX_PORT);
00121 m_sensorAddress.sin_addr = addr;
00122
00123
00124
00125
00126 for(uint32_t i=0; i<RX_POOL_LARGE_BUFFER_COUNT; i++)
00127 m_rxLargeBufferPool.push_back(new utility::BufferStreamWriter(RX_POOL_LARGE_BUFFER_SIZE));
00128 for(uint32_t i=0; i<RX_POOL_SMALL_BUFFER_COUNT; i++)
00129 m_rxSmallBufferPool.push_back(new utility::BufferStreamWriter(RX_POOL_SMALL_BUFFER_SIZE));
00130
00131
00132
00133
00134 bind();
00135
00136
00137
00138
00139 m_udpAssemblerMap[MSG_ID(wire::Disparity::ID)] = wire::Disparity::assembler;
00140
00141
00142
00143
00144 m_threadsRunning = true;
00145 m_rxThreadP = new utility::Thread(rxThread, this);
00146
00147
00148
00149
00150 wire::SysMtu mtu;
00151
00152 Status status = waitData(wire::SysGetMtu(), mtu);
00153 if (Status_Ok != status) {
00154 cleanup();
00155 CRL_EXCEPTION("failed to establish comms with the sensor at \"%s\"",
00156 address.c_str());
00157 } else {
00158
00159
00160
00161
00162 m_sensorMtu = mtu.mtu;
00163 }
00164
00165
00166
00167
00168 status = waitData(wire::VersionRequest(), m_sensorVersion);
00169 if (Status_Ok != status) {
00170 cleanup();
00171 CRL_EXCEPTION("failed to request version info from sensor at \"%s\"",
00172 address.c_str());
00173 }
00174
00175
00176
00177
00178 m_statusThreadP = new utility::Thread(statusThread, this);
00179 }
00180
00181
00182
00183
00184 void impl::cleanup()
00185 {
00186 m_threadsRunning = false;
00187
00188 if (m_rxThreadP)
00189 delete m_rxThreadP;
00190 if (m_statusThreadP)
00191 delete m_statusThreadP;
00192
00193 std::list<ImageListener*>::const_iterator iti;
00194 for(iti = m_imageListeners.begin();
00195 iti != m_imageListeners.end();
00196 iti ++)
00197 delete *iti;
00198 std::list<LidarListener*>::const_iterator itl;
00199 for(itl = m_lidarListeners.begin();
00200 itl != m_lidarListeners.end();
00201 itl ++)
00202 delete *itl;
00203 std::list<PpsListener*>::const_iterator itp;
00204 for(itp = m_ppsListeners.begin();
00205 itp != m_ppsListeners.end();
00206 itp ++)
00207 delete *itp;
00208 std::list<ImuListener*>::const_iterator itm;
00209 for(itm = m_imuListeners.begin();
00210 itm != m_imuListeners.end();
00211 itm ++)
00212 delete *itm;
00213
00214 BufferPool::const_iterator it;
00215 for(it = m_rxLargeBufferPool.begin();
00216 it != m_rxLargeBufferPool.end();
00217 ++it)
00218 delete *it;
00219 for(it = m_rxSmallBufferPool.begin();
00220 it != m_rxSmallBufferPool.end();
00221 ++it)
00222 delete *it;
00223
00224 if (m_serverSocket > 0)
00225 closesocket(m_serverSocket);
00226
00227 #if WIN32
00228 WSACleanup ();
00229 #endif
00230 }
00231
00232
00233
00234
00235 impl::~impl()
00236 {
00237 cleanup();
00238 }
00239
00240
00241
00242
00243
00244 void impl::bind()
00245 {
00246
00247
00248
00249 m_serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
00250 if (m_serverSocket < 0)
00251 CRL_EXCEPTION("failed to create the UDP socket: %s",
00252 strerror(errno));
00253
00254
00255
00256 #if WIN32
00257 u_long ioctl_arg = 1;
00258 if (0 != ioctlsocket(m_serverSocket, FIONBIO, &ioctl_arg))
00259 CRL_EXCEPTION("failed to make a socket non-blocking: %d",WSAGetLastError ());
00260 #else
00261 const int flags = fcntl(m_serverSocket, F_GETFL, 0);
00262
00263 if (0 != fcntl(m_serverSocket, F_SETFL, flags | O_NONBLOCK))
00264 CRL_EXCEPTION("failed to make a socket non-blocking: %s",
00265 strerror(errno));
00266 #endif
00267
00268
00269
00270
00271 int reuseSocket = 1;
00272
00273 if (0 != setsockopt(m_serverSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuseSocket,
00274 sizeof(reuseSocket)))
00275 CRL_EXCEPTION("failed to turn on socket reuse flag: %s",
00276 strerror(errno));
00277
00278
00279
00280
00281 int bufferSize = 48 * 1024 * 1024;
00282
00283 if (0 != setsockopt(m_serverSocket, SOL_SOCKET, SO_RCVBUF, (char*) &bufferSize,
00284 sizeof(bufferSize)) ||
00285 0 != setsockopt(m_serverSocket, SOL_SOCKET, SO_SNDBUF, (char*) &bufferSize,
00286 sizeof(bufferSize)))
00287 CRL_EXCEPTION("failed to adjust socket buffer sizes (%d bytes): %s",
00288 bufferSize, strerror(errno));
00289
00290
00291
00292
00293 struct sockaddr_in address;
00294
00295 address.sin_family = AF_INET;
00296 address.sin_port = htons(0);
00297 address.sin_addr.s_addr = htonl(INADDR_ANY);
00298
00299 if (0 != ::bind(m_serverSocket, (struct sockaddr*) &address, sizeof(address)))
00300 CRL_EXCEPTION("failed to bind the server socket to system-assigned port: %s",
00301 strerror(errno));
00302
00303
00304
00305 #if WIN32
00306 int len = sizeof(address);
00307 #else
00308 socklen_t len = sizeof(address);
00309 #endif
00310 if (0 != getsockname(m_serverSocket, (struct sockaddr*) &address, &len))
00311 CRL_EXCEPTION("getsockname() failed: %s", strerror(errno));
00312 m_serverSocketPort = htons(address.sin_port);
00313 }
00314
00315
00316
00317
00318 void impl::publish(const utility::BufferStreamWriter& stream)
00319 {
00320
00321
00322
00323 wire::Header& header = *(reinterpret_cast<wire::Header*>(stream.data()));
00324
00325 header.magic = wire::HEADER_MAGIC;
00326 header.version = wire::HEADER_VERSION;
00327 header.group = wire::HEADER_GROUP;
00328 header.flags = 0;
00329 #if WIN32
00330
00331 header.sequenceIdentifier = InterlockedIncrement16((short*)&m_txSeqId);
00332 #else
00333
00334 header.sequenceIdentifier = __sync_fetch_and_add(&m_txSeqId, 1);
00335 #endif
00336 header.messageLength = stream.tell() - sizeof(wire::Header);
00337 header.byteOffset = 0;
00338
00339
00340
00341
00342 const int32_t ret = sendto(m_serverSocket, (char*)stream.data(), stream.tell(), 0,
00343 (struct sockaddr *) &m_sensorAddress,
00344 sizeof(m_sensorAddress));
00345
00346 if (static_cast<size_t>(ret) != stream.tell())
00347 CRL_EXCEPTION("error sending data to sensor, %d/%d bytes written: %s",
00348 ret, stream.tell(), strerror(errno));
00349 }
00350
00351
00352
00353
00354
00355 wire::SourceType impl::sourceApiToWire(DataSource mask)
00356 {
00357 wire::SourceType wire_mask = 0;
00358
00359 if (mask & Source_Raw_Left) wire_mask |= wire::SOURCE_RAW_LEFT;
00360 if (mask & Source_Raw_Right) wire_mask |= wire::SOURCE_RAW_RIGHT;
00361 if (mask & Source_Luma_Left) wire_mask |= wire::SOURCE_LUMA_LEFT;
00362 if (mask & Source_Luma_Right) wire_mask |= wire::SOURCE_LUMA_RIGHT;
00363 if (mask & Source_Luma_Rectified_Left) wire_mask |= wire::SOURCE_LUMA_RECT_LEFT;
00364 if (mask & Source_Luma_Rectified_Right) wire_mask |= wire::SOURCE_LUMA_RECT_RIGHT;
00365 if (mask & Source_Chroma_Left) wire_mask |= wire::SOURCE_CHROMA_LEFT;
00366 if (mask & Source_Chroma_Right) wire_mask |= wire::SOURCE_CHROMA_RIGHT;
00367 if (mask & Source_Disparity) wire_mask |= wire::SOURCE_DISPARITY;
00368 if (mask & Source_Disparity_Right) wire_mask |= wire::SOURCE_DISPARITY_RIGHT;
00369 if (mask & Source_Disparity_Cost) wire_mask |= wire::SOURCE_DISPARITY_COST;
00370 if (mask & Source_Jpeg_Left) wire_mask |= wire::SOURCE_JPEG_LEFT;
00371 if (mask & Source_Rgb_Left) wire_mask |= wire::SOURCE_RGB_LEFT;
00372 if (mask & Source_Lidar_Scan) wire_mask |= wire::SOURCE_LIDAR_SCAN;
00373 if (mask & Source_Imu) wire_mask |= wire::SOURCE_IMU;
00374
00375 return wire_mask;
00376 };
00377
00378 DataSource impl::sourceWireToApi(wire::SourceType mask)
00379 {
00380 DataSource api_mask = 0;
00381
00382 if (mask & wire::SOURCE_RAW_LEFT) api_mask |= Source_Raw_Left;
00383 if (mask & wire::SOURCE_RAW_RIGHT) api_mask |= Source_Raw_Right;
00384 if (mask & wire::SOURCE_LUMA_LEFT) api_mask |= Source_Luma_Left;
00385 if (mask & wire::SOURCE_LUMA_RIGHT) api_mask |= Source_Luma_Right;
00386 if (mask & wire::SOURCE_LUMA_RECT_LEFT) api_mask |= Source_Luma_Rectified_Left;
00387 if (mask & wire::SOURCE_LUMA_RECT_RIGHT) api_mask |= Source_Luma_Rectified_Right;
00388 if (mask & wire::SOURCE_CHROMA_LEFT) api_mask |= Source_Chroma_Left;
00389 if (mask & wire::SOURCE_CHROMA_RIGHT) api_mask |= Source_Chroma_Right;
00390 if (mask & wire::SOURCE_DISPARITY) api_mask |= Source_Disparity;
00391 if (mask & wire::SOURCE_DISPARITY_RIGHT) api_mask |= Source_Disparity_Right;
00392 if (mask & wire::SOURCE_DISPARITY_COST) api_mask |= Source_Disparity_Cost;
00393 if (mask & wire::SOURCE_JPEG_LEFT) api_mask |= Source_Jpeg_Left;
00394 if (mask & wire::SOURCE_RGB_LEFT) api_mask |= Source_Rgb_Left;
00395 if (mask & wire::SOURCE_LIDAR_SCAN) api_mask |= Source_Lidar_Scan;
00396 if (mask & wire::SOURCE_IMU) api_mask |= Source_Imu;
00397
00398 return api_mask;
00399 };
00400
00401 uint32_t impl::hardwareApiToWire(uint32_t a)
00402 {
00403 switch(a) {
00404 case system::DeviceInfo::HARDWARE_REV_MULTISENSE_SL: return wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_SL;
00405 case system::DeviceInfo::HARDWARE_REV_MULTISENSE_S7: return wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_S7;
00406 case system::DeviceInfo::HARDWARE_REV_MULTISENSE_M: return wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_M;
00407 case system::DeviceInfo::HARDWARE_REV_MULTISENSE_S7S: return wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_S7S;
00408 case system::DeviceInfo::HARDWARE_REV_MULTISENSE_S21: return wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_S21;
00409 case system::DeviceInfo::HARDWARE_REV_MULTISENSE_ST21: return wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_ST21;
00410 case system::DeviceInfo::HARDWARE_REV_BCAM: return wire::SysDeviceInfo::HARDWARE_REV_BCAM;
00411 default:
00412 CRL_DEBUG("unknown API hardware type \"%d\"\n", a);
00413 return a;
00414 }
00415 }
00416 uint32_t impl::hardwareWireToApi(uint32_t w)
00417 {
00418 switch(w) {
00419 case wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_SL: return system::DeviceInfo::HARDWARE_REV_MULTISENSE_SL;
00420 case wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_S7: return system::DeviceInfo::HARDWARE_REV_MULTISENSE_S7;
00421 case wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_M: return system::DeviceInfo::HARDWARE_REV_MULTISENSE_M;
00422 case wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_S7S: return system::DeviceInfo::HARDWARE_REV_MULTISENSE_S7S;
00423 case wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_S21: return system::DeviceInfo::HARDWARE_REV_MULTISENSE_S21;
00424 case wire::SysDeviceInfo::HARDWARE_REV_MULTISENSE_ST21: return system::DeviceInfo::HARDWARE_REV_MULTISENSE_ST21;
00425 case wire::SysDeviceInfo::HARDWARE_REV_BCAM: return system::DeviceInfo::HARDWARE_REV_BCAM;
00426 default:
00427 CRL_DEBUG("unknown WIRE hardware type \"%d\"\n", w);
00428 return w;
00429 }
00430 }
00431 uint32_t impl::imagerApiToWire(uint32_t a)
00432 {
00433 switch(a) {
00434 case system::DeviceInfo::IMAGER_TYPE_CMV2000_GREY: return wire::SysDeviceInfo::IMAGER_TYPE_CMV2000_GREY;
00435 case system::DeviceInfo::IMAGER_TYPE_CMV2000_COLOR: return wire::SysDeviceInfo::IMAGER_TYPE_CMV2000_COLOR;
00436 case system::DeviceInfo::IMAGER_TYPE_CMV4000_GREY: return wire::SysDeviceInfo::IMAGER_TYPE_CMV4000_GREY;
00437 case system::DeviceInfo::IMAGER_TYPE_CMV4000_COLOR: return wire::SysDeviceInfo::IMAGER_TYPE_CMV4000_COLOR;
00438 case system::DeviceInfo::IMAGER_TYPE_IMX104_COLOR: return wire::SysDeviceInfo::IMAGER_TYPE_IMX104_COLOR;
00439 default:
00440 CRL_DEBUG("unknown API imager type \"%d\"\n", a);
00441 return a;
00442 }
00443 }
00444 uint32_t impl::imagerWireToApi(uint32_t w)
00445 {
00446 switch(w) {
00447 case wire::SysDeviceInfo::IMAGER_TYPE_CMV2000_GREY: return system::DeviceInfo::IMAGER_TYPE_CMV2000_GREY;
00448 case wire::SysDeviceInfo::IMAGER_TYPE_CMV2000_COLOR: return system::DeviceInfo::IMAGER_TYPE_CMV2000_COLOR;
00449 case wire::SysDeviceInfo::IMAGER_TYPE_CMV4000_GREY: return system::DeviceInfo::IMAGER_TYPE_CMV4000_GREY;
00450 case wire::SysDeviceInfo::IMAGER_TYPE_CMV4000_COLOR: return system::DeviceInfo::IMAGER_TYPE_CMV4000_COLOR;
00451 case wire::SysDeviceInfo::IMAGER_TYPE_IMX104_COLOR: return system::DeviceInfo::IMAGER_TYPE_IMX104_COLOR;
00452 default:
00453 CRL_DEBUG("unknown WIRE imager type \"%d\"\n", w);
00454 return w;
00455 }
00456 }
00457
00458
00459
00460
00461 void impl::applySensorTimeOffset(const double& offset)
00462 {
00463 utility::ScopedLock lock(m_timeLock);
00464
00465 if (false == m_timeOffsetInit) {
00466 m_timeOffset = offset;
00467 m_timeOffsetInit = true;
00468 return;
00469 }
00470
00471 const double samples = static_cast<double>(TIME_SYNC_OFFSET_DECAY);
00472
00473 m_timeOffset = utility::decayedAverage(m_timeOffset, samples, offset);
00474 }
00475
00476
00477
00478
00479 double impl::sensorToLocalTime(const double& sensorTime)
00480 {
00481 utility::ScopedLock lock(m_timeLock);
00482 return m_timeOffset + sensorTime;
00483 }
00484
00485
00486
00487
00488 void impl::sensorToLocalTime(const double& sensorTime,
00489 uint32_t& seconds,
00490 uint32_t& microseconds)
00491 {
00492 double corrected = sensorToLocalTime(sensorTime);
00493 seconds = static_cast<uint32_t>(corrected);
00494 microseconds = static_cast<uint32_t>(1e6 * (corrected - static_cast<double>(seconds)));
00495 }
00496
00497
00498
00499
00500 #ifdef WIN32
00501 DWORD impl::statusThread(void *userDataP)
00502 #else
00503 void *impl::statusThread(void *userDataP)
00504 #endif
00505 {
00506 impl *selfP = reinterpret_cast<impl*>(userDataP);
00507
00508
00509
00510
00511 while(selfP->m_threadsRunning) {
00512
00513 try {
00514
00515
00516
00517
00518 ScopedWatch ack(wire::StatusResponse::ID, selfP->m_watch);
00519
00520
00521
00522
00523 const double ping = utility::TimeStamp::getCurrentTime();
00524 selfP->publish(wire::StatusRequest());
00525
00526
00527
00528
00529 Status status;
00530 if (ack.wait(status, 0.010)) {
00531
00532
00533
00534
00535 const double pong = utility::TimeStamp::getCurrentTime();
00536
00537
00538
00539
00540 wire::StatusResponse msg;
00541 selfP->m_messages.extract(msg);
00542
00543
00544
00545
00546 const double latency = (pong - ping) / 2.0;
00547
00548
00549
00550
00551 const double offset = (ping + latency) - static_cast<double>(msg.uptime);
00552 selfP->applySensorTimeOffset(offset);
00553 }
00554
00555 } catch (const std::exception& e) {
00556
00557 CRL_DEBUG("exception: %s\n", e.what());
00558
00559 } catch (...) {
00560
00561 CRL_DEBUG("unknown exception\n");
00562 }
00563
00564
00565
00566
00567 usleep(static_cast<unsigned int> (1e6));
00568 }
00569
00570 return NULL;
00571 }
00572
00573 };
00574
00575 Channel* Channel::Create(const std::string& address)
00576 {
00577 try {
00578
00579 return new details::impl(address);
00580
00581 } catch (const std::exception& e) {
00582
00583 CRL_DEBUG("exception: %s\n", e.what());
00584 return NULL;
00585 }
00586 }
00587
00588 void Channel::Destroy(Channel *instanceP)
00589 {
00590 try {
00591
00592 if (instanceP)
00593 delete static_cast<details::impl*>(instanceP);
00594
00595 } catch (const std::exception& e) {
00596
00597 CRL_DEBUG("exception: %s\n", e.what());
00598 }
00599 }
00600
00601 const char *Channel::statusString(Status status)
00602 {
00603 switch(status) {
00604 case Status_Ok: return "Ok";
00605 case Status_TimedOut: return "Timed out";
00606 case Status_Error: return "Error";
00607 case Status_Failed: return "Failed";
00608 case Status_Unsupported: return "Unsupported";
00609 case Status_Unknown: return "Unknown command";
00610 case Status_Exception: return "Exception";
00611 }
00612
00613 return "Unknown Error";
00614 }
00615
00616 };
00617 };