00001
00002
00003
00004
00005
00006
00007
00008 #include <tk_mkinterface/MKInterfaceConnection.hpp>
00009
00010 #include <telekyb_serial/SerialHelper.hpp>
00011
00012
00013
00014 #include <boost/regex.hpp>
00015 #include <boost/filesystem.hpp>
00016
00017 namespace fs = boost::filesystem;
00018
00019
00020 #define ALLADDR 'a'
00021 #define FCADDR 'b'
00022 #define NCADDR 'c'
00023 #define MK3MAGADDR 'd'
00024
00025
00026 #define SETVALUE_CMD_OUT 's'
00027 #define UPDATEVALUE_CMD_OUT 'u'
00028 #define ONLY_CMD_OUT 'b'
00029 #define ACTIVEDATAIDS_OUT 'l'
00030
00031
00032 #define VALUE_IN 'S'
00033 #define ACTIVEDATA_IN 'D'
00034 #define ACTIVEDATAIDS_IN 'L'
00035
00036
00037
00038 #define SYNC_SLEEP_USEC 25000 // 25ms every 4th command at 120hz
00039 #define MKVALUE_RESENDS 40
00040
00041 #define BUFFER_SIZE 256
00042
00043 #define SERIALDEVICE_MAXRETRIES 25
00044
00045 namespace TELEKYB_NAMESPACE
00046 {
00047
00048 const char MKInterfaceConnection::setPrefix[] = {'#', FCADDR ,SETVALUE_CMD_OUT};
00049 const char MKInterfaceConnection::updatePrefix[] = {'#', FCADDR ,UPDATEVALUE_CMD_OUT};
00050 const char MKInterfaceConnection::cmdOnlyPrefix[] = {'#', FCADDR ,ONLY_CMD_OUT};
00051 const char MKInterfaceConnection::setActiveDataIDsPrefix[] = {'#', FCADDR ,ACTIVEDATAIDS_OUT};
00052
00053
00054 MKInterfaceConnection::MKInterfaceConnection(const std::string& devicePath)
00055 : ThreadedSerialDevice(devicePath)
00056 {
00057
00058 setTermiosAttrCFlag(options.tTermiosCFlags->getValue());
00059 setTermiosAttrSpeed(options.tBaudRate->getValue().value(),options.tBaudRate->getValue().value());
00060
00061
00062 for (int i = 0; i < ACTIVEDATA_SIZE; i++) {
00063 activeDataIDs.ids[i] = i;
00064 }
00065
00066
00067 registerSerialDeviceListener(this);
00068
00069
00070 syncSetValue = NULL;
00071 syncUpdateValue = NULL;
00072 syncActiveDataIDsRequest = NULL;
00073
00074 currentSendQueue = SENDQUEUE_ACTIVEDATAIDS;
00075 }
00076
00077 MKInterfaceConnection::~MKInterfaceConnection()
00078 {
00079
00080
00081
00082 }
00083
00084 void MKInterfaceConnection::handleReadSerialData(const std::vector<char>& data)
00085 {
00086
00087
00088
00089
00090
00091
00092
00093
00094 if (!SerialHelper::checkCRC(&data[0], data.size())) {
00095 ROS_WARN("Recv Package with wrong CRC!");
00096 return;
00097 } else {
00098
00099 }
00100
00101
00102
00103
00104
00105 if (data[0] != '#') {
00106 ROS_ERROR("Received an unknown Message from the MK.");
00107 return;
00108 }
00109
00110
00111 switch (data[1]) {
00112 case FCADDR: {
00113 switch (data[2]) {
00114 case ACTIVEDATA_IN: {
00115
00116 if (syncActiveDataIDsRequest) {
00117 break;
00118 }
00119
00120
00121 MKActiveValues activeDataValues;
00122 SerialHelper::decodeData((char*)&activeDataValues, sizeof(activeDataValues), &data[3]);
00123
00124
00125 for (int i = 0; i < ACTIVEDATA_SIZE; i++) {
00126 MKSingleValuePacket packet(activeDataIDs.ids[i], activeDataValues.values[i]);
00127 handleRecvSingleValuePacket(packet);
00128 }
00129
00130 break;
00131 }
00132 case ACTIVEDATAIDS_IN: {
00133 MKActiveIDs newActiveDataIDs;
00134 SerialHelper::decodeData((char*)&newActiveDataIDs, sizeof(newActiveDataIDs), &data[3]);
00135
00136 activeDataIDs = newActiveDataIDs;
00137
00138
00139
00140
00141 if (syncActiveDataIDsRequest && *syncActiveDataIDsRequest == activeDataIDs) {
00142
00143 syncActiveDataIDsRequest = NULL;
00144 }
00145
00146
00147 if (activeDataIDs == activeDataIDsQueue.front()) {
00148
00149 boost::mutex::scoped_lock queueLock(queueMutex);
00150 activeDataIDsQueue.pop();
00151 }
00152
00153
00154 break;
00155 }
00156 case VALUE_IN: {
00157 MKSingleValuePacket singleValuePacket(0,0);
00158 SerialHelper::decodeData((char*)&singleValuePacket, sizeof(singleValuePacket), &data[3]);
00159 handleRecvSingleValuePacket(singleValuePacket);
00160
00161 break;
00162 }
00163 default:
00164 ROS_ERROR("FC Received an Message with unknown Command Field %c", data[2]);
00165 break;
00166
00167 }
00168 break;
00169 }
00170 default:
00171 ROS_ERROR("Received an Message with unknown Adress Field %c", data[1]);
00172 break;
00173 }
00174
00175 }
00176
00177
00178 void MKInterfaceConnection::handleRecvSingleValuePacket(MKSingleValuePacket packet)
00179 {
00180
00181 MKValue* newValue = mkData.setValue(packet);
00182
00183 if (!newValue) {
00184 ROS_ERROR("mkData.setValue(packet) returned NULL-Pointer. This should never happen! MK Code wrong!?!");
00185 return;
00186 }
00187
00188
00189
00190 if (syncUpdateValue && *syncUpdateValue == packet.id) {
00191
00192 syncUpdateValue = NULL;
00193 }
00194
00195
00196 if (syncSetValue && *syncSetValue == packet) {
00197
00198
00199 syncSetValue = NULL;
00200 }
00201
00202
00203
00204
00205
00206 if (!updateQueue.empty() && packet.id == updateQueue.front()) {
00207 boost::mutex::scoped_lock queueLock(queueMutex);
00208 updateQueue.pop();
00209 }
00210
00211 if (!setQueue.empty() && packet == setQueue.front()) {
00212 boost::mutex::scoped_lock queueLock(queueMutex);
00213 setQueue.pop();
00214 }
00215 }
00216
00217
00218
00219 void MKInterfaceConnection::writeCommand(const MKCommandsPacket& command)
00220 {
00221 char buffer[BUFFER_SIZE];
00222 char msg[BUFFER_SIZE];
00223 int nBytes = 0;
00224
00225 boost::mutex::scoped_lock queueLock(queueMutex);
00226
00227 if (currentSendQueue == SENDQUEUE_ACTIVEDATAIDS && !activeDataIDsQueue.empty()
00228 && asyncActiveIDsTimer.frequency() < options.tAsyncSendFrequency->getValue()) {
00229 asyncActiveIDsTimer.reset();
00230
00231 MKActiveIDs activeDataIDsRequest = activeDataIDsQueue.front();
00232 queueLock.unlock();
00233
00234 memcpy(msg,&command, sizeof(command));
00235 memcpy(msg + sizeof(command), &activeDataIDsRequest, sizeof(activeDataIDsRequest));
00236
00237 nBytes = SerialHelper::encodeData(buffer, setActiveDataIDsPrefix, sizeof(updatePrefix),
00238 msg, sizeof(command) + sizeof(activeDataIDsRequest));
00239
00240 } else if (currentSendQueue == SENDQUEUE_SET && !setQueue.empty()
00241 && asyncSetTimer.frequency() < options.tAsyncSendFrequency->getValue()) {
00242 asyncSetTimer.reset();
00243
00244 MKSingleValuePacket setValueRequest = setQueue.front();
00245 queueLock.unlock();
00246
00247
00248 memcpy(msg,&command, sizeof(command));
00249 memcpy(msg + sizeof(command), &setValueRequest, sizeof(setValueRequest));
00250
00251 nBytes = SerialHelper::encodeData(buffer, setPrefix, sizeof(setPrefix),
00252 msg, sizeof(command) + sizeof(setValueRequest));
00253
00254 } else if (currentSendQueue == SENDQUEUE_UPDATE && !updateQueue.empty()
00255 && asyncUpdateTimer.frequency() < options.tAsyncSendFrequency->getValue()) {
00256 asyncUpdateTimer.reset();
00257 MKInt updateValueRequest = updateQueue.front();
00258 queueLock.unlock();
00259
00260 ROS_INFO("Sending updateValue with ID: %d", updateValueRequest);
00261
00262 memcpy(msg,&command, sizeof(command));
00263 memcpy(msg + sizeof(command), &updateValueRequest, sizeof(updateValueRequest));
00264
00265 nBytes = SerialHelper::encodeData(buffer, updatePrefix, sizeof(updatePrefix),
00266 msg, sizeof(command) + sizeof(updateValueRequest));
00267
00268 } else {
00269 nBytes = SerialHelper::encodeData(buffer, cmdOnlyPrefix, sizeof(cmdOnlyPrefix),
00270 (const char*)&command, sizeof(command));
00271 }
00272
00273
00274 try {
00275 writeDevice(buffer,nBytes);
00276 } catch (SerialException &e) {
00277 e.process();
00278 }
00279
00280 currentSendQueue = (SendQueue)((currentSendQueue+1) % SENDQUEUE_SIZE);
00281 }
00282 void MKInterfaceConnection::writeSetValue()
00283 {
00284 if (!syncSetValue) {
00285
00286 return;
00287 }
00288
00289 MKSingleValuePacket packet = *syncSetValue;
00290
00291
00292 char msg[BUFFER_SIZE];
00293 memcpy(msg,&lastCmd, sizeof(lastCmd));
00294 memcpy(msg + sizeof(lastCmd), &packet, sizeof(packet));
00295
00296 char buffer[BUFFER_SIZE];
00297 int nBytes = SerialHelper::encodeData(buffer, setPrefix, sizeof(setPrefix), msg, sizeof(lastCmd) + sizeof(packet));
00298
00299
00300 try {
00301 writeDevice(buffer,nBytes);
00302 } catch (SerialException &e) {
00303 e.process();
00304 }
00305 }
00306
00307 void MKInterfaceConnection::writeUpdateValue()
00308 {
00309
00310 if (!syncUpdateValue) {
00311
00312 return;
00313 }
00314
00315 MKInt valueID = *syncUpdateValue;
00316
00317
00318 char msg[BUFFER_SIZE];
00319 memcpy(msg,&lastCmd, sizeof(lastCmd));
00320 memcpy(msg + sizeof(lastCmd), &valueID, sizeof(valueID));
00321
00322 char buffer[BUFFER_SIZE];
00323 int nBytes = SerialHelper::encodeData(buffer, updatePrefix, sizeof(updatePrefix), msg, sizeof(lastCmd) + sizeof(valueID));
00324
00325
00326 try {
00327 writeDevice(buffer,nBytes);
00328 } catch (SerialException &e) {
00329 e.process();
00330 }
00331 }
00332
00333 void MKInterfaceConnection::writeActiveDataIDsRequest()
00334 {
00335 if (!syncActiveDataIDsRequest) {
00336
00337 return;
00338 }
00339
00340 MKActiveIDs activeDataIDsRequestCopy = *syncActiveDataIDsRequest;
00341
00342 char msg[BUFFER_SIZE];
00343 memcpy(msg,&lastCmd, sizeof(lastCmd));
00344 memcpy(msg + sizeof(lastCmd), &activeDataIDsRequestCopy, sizeof(activeDataIDsRequestCopy));
00345
00346 char buffer[BUFFER_SIZE];
00347 int nBytes = SerialHelper::encodeData(buffer, setActiveDataIDsPrefix, sizeof(updatePrefix), msg, sizeof(lastCmd) + sizeof(activeDataIDsRequestCopy));
00348
00349
00350 try {
00351 writeDevice(buffer,nBytes);
00352 } catch (SerialException &e) {
00353 e.process();
00354 }
00355 }
00356
00357
00358 void MKInterfaceConnection::setLastCmd(const MKCommandsPacket& command)
00359 {
00360 boost::mutex::scoped_lock(lastCmdMutex);
00361 lastCmd = command;
00362 }
00363
00364
00365 void MKInterfaceConnection::sendCommand(const MKCommandsPacket& command)
00366 {
00367
00368 writeCommand(command);
00369
00370 setLastCmd(command);
00371 }
00372
00373
00374 bool MKInterfaceConnection::setValue(MKSingleValuePacket value)
00375 {
00376
00377 MKValue* mkValue = mkData.getValueByID(value.id);
00378 if (!mkValue) {
00379 ROS_ERROR("Called (sync) setValue with unknown MKValue ID.");
00380 return false;
00381 }
00382
00383 boost::mutex::scoped_lock(syncSetValueMutex);
00384 syncSetValue = &value;
00385
00386
00387 Time sleepTime(0,SYNC_SLEEP_USEC);
00388
00389 for (int i = 0; i < MKVALUE_RESENDS; ++i) {
00390
00391
00392 writeSetValue();
00393
00394
00395 sleepTime.sleep();
00396
00397
00398 if (syncSetValue == NULL) {
00399 return true;
00400 }
00401 }
00402
00403
00404 syncSetValue = NULL;
00405 return false;
00406 }
00407
00408
00409 bool MKInterfaceConnection::updateValue(MKInt id)
00410 {
00411
00412 MKValue* mkValue = mkData.getValueByID(id);
00413 if (!mkValue) {
00414 ROS_ERROR("Called (sync) updateValue with unknown MKValue ID.");
00415 return false;
00416 }
00417
00418 boost::mutex::scoped_lock(syncUpdateValueMutex);
00419 syncUpdateValue = &id;
00420
00421
00422 Time sleepTime(0,SYNC_SLEEP_USEC);
00423
00424 for (int i = 0; i < MKVALUE_RESENDS; ++i) {
00425
00426
00427 writeUpdateValue();
00428
00429
00430 sleepTime.sleep();
00431
00432
00433 if (syncUpdateValue == NULL) {
00434 return true;
00435 }
00436 }
00437
00438
00439 syncUpdateValue = NULL;
00440
00441 return false;
00442 }
00443
00444 bool MKInterfaceConnection::setActiveDataIDs(MKActiveIDs activeDataIDs_)
00445 {
00446
00447 if (! isValidMKActiveIDs(activeDataIDs_)) {
00448 ROS_ERROR("MKInterfaceConnection::setActiveDataIDs with inValid MKActiveIDs Object");
00449 return false;
00450 }
00451
00452 boost::mutex::scoped_lock(syncActiveDataIDRequestMutex);
00453 syncActiveDataIDsRequest = &activeDataIDs_;
00454
00455
00456 Time sleepTime(0,SYNC_SLEEP_USEC);
00457
00458 for (int i = 0; i < MKVALUE_RESENDS; ++i) {
00459
00460
00461 writeActiveDataIDsRequest();
00462
00463
00464 sleepTime.sleep();
00465
00466
00467 if (syncActiveDataIDsRequest == NULL) {
00468 return true;
00469 }
00470 }
00471
00472
00473 syncActiveDataIDsRequest = NULL;
00474
00475 return false;
00476 }
00477
00478
00479 void MKInterfaceConnection::setValueAsync(MKSingleValuePacket value)
00480 {
00481
00482 if (setQueue.size() > SET_QUEUE_SIZE) {
00483 ROS_ERROR("Set Queue is full! Limit: %d", SET_QUEUE_SIZE);
00484 return;
00485 }
00486
00487
00488 MKValue* mkValue = mkData.getValueByID(value.id);
00489 if (!mkValue) {
00490 ROS_ERROR("Called setValueAsync with unknown MKValue ID.");
00491 return;
00492 }
00493
00494
00495 boost::mutex::scoped_lock queueLock(queueMutex);
00496 setQueue.push(value);
00497 ROS_INFO("Added %d ID to SetQueue", value.id);
00498 }
00499
00500
00501 void MKInterfaceConnection::updateValueAsync(MKInt id)
00502 {
00503
00504 if (updateQueue.size() > UPDATE_QUEUE_SIZE) {
00505 ROS_ERROR("Update Queue is full! Limit: %d", UPDATE_QUEUE_SIZE);
00506 return;
00507 }
00508
00509
00510 MKValue* mkValue = mkData.getValueByID(id);
00511 if (!mkValue) {
00512 ROS_ERROR("Called updateValueAsync with unknown MKValue ID.");
00513 return;
00514 }
00515
00516
00517 boost::mutex::scoped_lock queueLock(queueMutex);
00518 updateQueue.push(id);
00519 ROS_INFO("Added %d ID to UpdateQueue", id);
00520 }
00521
00522 void MKInterfaceConnection::setActiveDataIDsAsync(MKActiveIDs activeDataIDs_)
00523 {
00524
00525 if (activeDataIDsQueue.size() > ACTIVE_IDS_QUEUE_SIZE) {
00526 ROS_ERROR("Active Data IDs Queue is full! Limit: %d", ACTIVE_IDS_QUEUE_SIZE);
00527 return;
00528 }
00529
00530
00531 if (! isValidMKActiveIDs(activeDataIDs_)) {
00532 ROS_ERROR("MKInterfaceConnection::setActiveDataIDsAsync with inValid MKActiveIDs Object");
00533 return;
00534 }
00535
00536 boost::mutex::scoped_lock queueLock(queueMutex);
00537 activeDataIDsQueue.push(activeDataIDs_);
00538 }
00539
00540 MKActiveIDs MKInterfaceConnection::getActiveDataIDs() const
00541 {
00542 return activeDataIDs;
00543 }
00544
00545 const MKData& MKInterfaceConnection::getMKDataRef() const
00546 {
00547 return mkData;
00548 }
00549
00550 void MKInterfaceConnection::registerMKDataListener(MKDataListener* listener)
00551 {
00552 mkData.registerMKDataListener(listener);
00553 }
00554
00555 void MKInterfaceConnection::unRegisterMKDataListener(MKDataListener* listener)
00556 {
00557 mkData.unRegisterMKDataListener(listener);
00558 }
00559
00560
00561
00562 MKInterfaceConnection* MKInterfaceConnection::findConnection(
00563 const std::string& serialDeviceDirectory,
00564 const std::string& serialDeviceNameRegEx,
00565 std::vector<MKSingleValuePacket> conditions)
00566 {
00567 MKInterfaceConnection* connection = NULL;
00568
00569 if (!fs::is_directory(serialDeviceDirectory)) {
00570 ROS_ERROR_STREAM(serialDeviceDirectory << "does not exist.");
00571 return connection;
00572 }
00573
00574 boost::regex filter( serialDeviceNameRegEx );
00575
00576 fs::directory_iterator end_itr;
00577
00578 std::vector<fs::path> serialPaths;
00579 for (fs::directory_iterator it( serialDeviceDirectory ); it != end_itr; ++it)
00580 {
00581 if ( ! boost::regex_match(it->path().filename().string(), filter)) {
00582
00583 continue;
00584 }
00585
00586
00587 serialPaths.push_back(it->path());
00588 }
00589
00590
00591
00592
00593 std::vector<int> serialRetries(serialPaths.size(), 0);
00594
00595 bool serialBreakCondition = false;
00596
00597 while (connection == NULL && !serialBreakCondition)
00598 {
00599
00600 serialBreakCondition = true;
00601 for (unsigned int path = 0; path < serialPaths.size(); ++path) {
00602
00603
00604 if (serialRetries[path] == -1 || serialRetries[path] > SERIALDEVICE_MAXRETRIES) {
00605 continue;
00606 }
00607 serialBreakCondition = false;
00608
00609
00610 try {
00611 connection = new MKInterfaceConnection(serialPaths[path].string());
00612 } catch (SerialException &e) {
00613 if (e.code != SerialExceptionCode::LOCKED) {
00614 e.process();
00615
00616 serialRetries[path] = -1;
00617 } else {
00618
00619
00620 if (serialRetries[path] >= SERIALDEVICE_MAXRETRIES) {
00621 ROS_ERROR("Unable to access serial Device after %d tries.", serialRetries[path]);
00622 }
00623 serialRetries[path]++;
00624 }
00625 continue;
00626 }
00627
00628
00629 for (unsigned int i = 0; i < conditions.size(); ++i)
00630 {
00631
00632
00633 MKValue* valuePtr = connection->getMKDataRef().getValueByID(conditions[i].id);
00634
00635
00636 if (!connection->updateValue(conditions[i].id)) {
00637 ROS_ERROR("Unable to update Value from QC");
00638 delete connection; connection = NULL;
00639 break;
00640 }
00641
00642 if ( ! valuePtr->equals(conditions[i]) ) {
00643
00644
00645 ROS_ERROR_STREAM("Mismatch condition! Name: "
00646 << MKDataDefines::MKDATAIDS_NAMES[conditions[i].id]
00647 << " (" << conditions[i].id << ") Is: " << valuePtr->getValue() << " Should be: " << conditions[i].value);
00648 delete connection; connection = NULL;
00649 break;
00650 }
00651 }
00652
00653
00654 if ( connection ) {
00655
00656 ROS_INFO_STREAM("Found matching MKInterfaceConnection on serial: " << serialPaths[path].string());
00657 break;
00658 } else {
00659
00660 serialRetries[path] = -1;
00661 }
00662
00663 }
00664
00665
00666 if (! connection) {
00667 usleep(50 * 1000);
00668 }
00669 }
00670
00671 if (!connection) {
00672
00673 ROS_ERROR("Unable to find connection that matches conditions:");
00674 for (unsigned int i = 0; i < conditions.size(); ++i)
00675 {
00676 ROS_ERROR_STREAM("Name: "
00677 << MKDataDefines::MKDATAIDS_NAMES[conditions[i].id]
00678 << " (" << conditions[i].id << ") Value: " << conditions[i].value);
00679 }
00680 }
00681
00682 return connection;
00683 }
00684
00685 }
00686