Go to the documentation of this file.00001
00047 #include <string.h>
00048 #include <unistd.h>
00049 #include <cstdlib>
00050 #include <iostream>
00051 #include <vector>
00052 #include <string>
00053 #include <sstream>
00054 #include <ctime>
00055
00056 #include "husky_base/horizon_legacy/Transport.h"
00057 #include "husky_base/horizon_legacy/Number.h"
00058 #include "husky_base/horizon_legacy/Message.h"
00059 #include "husky_base/horizon_legacy/Message_request.h"
00060 #include "husky_base/horizon_legacy/Message_cmd.h"
00061 #include "husky_base/horizon_legacy/serial.h"
00062 #include "husky_base/horizon_legacy/Logger.h"
00063
00064 using namespace std;
00065
00066 namespace clearpath
00067 {
00068
00069 const char *Transport::counter_names[] = {
00070 "Garbled bytes",
00071 "Invalid messages",
00072 "Ignored acknowledgment",
00073 "Message queue overflow"
00074 };
00075
00076 TransportException::TransportException(const char *msg, enum errors ex_type)
00077 : Exception(msg), type(ex_type)
00078 {
00079 if (msg)
00080 {
00081 CPR_EXCEPT() << "TransportException " << (int) type << ": " << message << endl << std::flush;
00082 }
00083 }
00084
00085 BadAckException::BadAckException(unsigned int flag) :
00086 TransportException(NULL, TransportException::BAD_ACK_RESULT),
00087 ack_flag((enum ackFlags) flag)
00088 {
00089 switch (ack_flag)
00090 {
00091 case BAD_CHECKSUM:
00092 message = "Bad checksum";
00093 break;
00094 case BAD_TYPE:
00095 message = "Bad message type";
00096 break;
00097 case BAD_FORMAT:
00098 message = "Bad message format";
00099 break;
00100 case RANGE:
00101 message = "Range error";
00102 break;
00103 case OVER_FREQ:
00104 message = "Requested frequency too high";
00105 break;
00106 case OVER_SUBSCRIBE:
00107 message = "Too many subscriptions";
00108 break;
00109 default:
00110 message = "Unknown error code.";
00111 break;
00112 };
00113 stringstream ss;
00114
00115 CPR_EXCEPT() << "BadAckException (0x" << hex << flag << dec << "): " << message << endl << flush;
00116 }
00117
00118 #define CHECK_THROW_CONFIGURED() \
00119 do { \
00120 if( ! configured ) { \
00121 throw new TransportException("Transport not configured", TransportException::NOT_CONFIGURED); \
00122 } \
00123 } while( 0 )
00124
00129 Transport &Transport::instance()
00130 {
00131 static Transport instance;
00132 return instance;
00133 }
00134
00138 Transport::Transport() :
00139 configured(false),
00140 serial(0),
00141 retries(0)
00142 {
00143 for (int i = 0; i < NUM_COUNTERS; ++i)
00144 {
00145 counters[i] = 0;
00146 }
00147 }
00148
00149 Transport::~Transport()
00150 {
00151 close();
00152 }
00153
00164 void Transport::configure(const char *device, int retries)
00165 {
00166 if (configured)
00167 {
00168
00169 close();
00170 }
00171
00172
00173 resetCounters();
00174
00175 this->retries = retries;
00176
00177 if (!openComm(device))
00178 {
00179 configured = true;
00180 }
00181 else
00182 {
00183 throw new TransportException("Failed to open serial port", TransportException::CONFIGURE_FAIL);
00184 }
00185 }
00186
00192 int Transport::close()
00193 {
00194 int retval = 0;
00195 if (configured)
00196 {
00197 flush();
00198 retval = closeComm();
00199 }
00200 configured = false;
00201 return retval;
00202 }
00203
00208 int Transport::openComm(const char *device)
00209 {
00210 int tmp = OpenSerial(&(this->serial), device);
00211 if (tmp < 0)
00212 {
00213 return -1;
00214 }
00215 tmp = SetupSerial(this->serial);
00216 if (tmp < 0)
00217 {
00218 return -2;
00219 }
00220 return 0;
00221 }
00222
00226 int Transport::closeComm()
00227 {
00228 CloseSerial(this->serial);
00229
00230 return 0;
00231 }
00232
00241 Message *Transport::rxMessage()
00242 {
00243
00244
00245
00246
00247 static char rx_buf[Message::MAX_MSG_LENGTH];
00248 static size_t rx_inx = 0;
00249 static size_t msg_len = 0;
00250
00251 if (!rx_inx) { memset(rx_buf, 0xba, Message::MAX_MSG_LENGTH); }
00252
00253
00254
00255 while (ReadData(serial, rx_buf + rx_inx, 1) == 1)
00256 {
00257 switch (rx_inx)
00258 {
00259
00260
00261 case 0:
00262 if ((uint8_t) (rx_buf[0]) == (uint8_t) (Message::SOH))
00263 {
00264 rx_inx++;
00265 }
00266 else { counters[GARBLE_BYTES]++; }
00267 break;
00268
00269
00270 case 1:
00271 rx_inx++;
00272 break;
00273
00274
00275 case 2:
00276 rx_inx++;
00277 msg_len = rx_buf[1] + 3;
00278
00279
00280 if (static_cast<unsigned char>(rx_buf[1] ^ rx_buf[2]) != 0xFF ||
00281 (msg_len < Message::MIN_MSG_LENGTH))
00282 {
00283 counters[GARBLE_BYTES] += rx_inx;
00284 rx_inx = 0;
00285 }
00286
00287 break;
00288
00289
00290
00291
00292
00293
00294
00295 default:
00296 rx_inx++;
00297 if (rx_inx < msg_len) { break; }
00298
00299 rx_inx = 0;
00300 Message *msg = Message::factory(rx_buf, msg_len);
00301 return msg;
00302
00303 }
00304 }
00305
00306
00307 return NULL;
00308 }
00309
00316 Message *Transport::getAck()
00317 {
00318 Message *msg = NULL;
00319
00320 while ((msg = rxMessage()))
00321 {
00322
00323 if (msg->isData())
00324 {
00325 enqueueMessage(msg);
00326 continue;
00327 }
00328
00329
00330 if (!msg->isValid())
00331 {
00332 ++counters[INVALID_MSG];
00333 delete msg;
00334 continue;
00335 }
00336
00337 return msg;
00338 }
00339
00340 return NULL;
00341 }
00342
00349 void Transport::enqueueMessage(Message *msg)
00350 {
00351
00352 if (!msg->isValid())
00353 {
00354 ++counters[INVALID_MSG];
00355 delete msg;
00356 return;
00357 }
00358
00359
00360 rx_queue.push_back(msg);
00361
00362
00363 while (rx_queue.size() > MAX_QUEUE_LEN)
00364 {
00365 ++counters[QUEUE_FULL];
00366 delete rx_queue.front();
00367 rx_queue.pop_front();
00368 }
00369 }
00370
00371
00378 void Transport::poll()
00379 {
00380 CHECK_THROW_CONFIGURED();
00381
00382 Message *msg = NULL;
00383
00384 while ((msg = rxMessage()))
00385 {
00386
00387 if (!msg->isData())
00388 {
00389 ++counters[IGNORED_ACK];
00390 delete msg;
00391 continue;
00392 }
00393
00394
00395 enqueueMessage(msg);
00396 }
00397 }
00398
00406 void Transport::send(Message *m)
00407 {
00408 CHECK_THROW_CONFIGURED();
00409
00410 char skip_send = 0;
00411 Message *ack = NULL;
00412 int transmit_times = 0;
00413 short result_code;
00414
00415 poll();
00416
00417 while (1)
00418 {
00419
00420 if (transmit_times > this->retries)
00421 {
00422 break;
00423 }
00424
00425 if (!skip_send) { WriteData(serial, (char *) (m->data), m->total_len); }
00426
00427
00428 for (int i = 0; i < RETRY_DELAY_MS; ++i)
00429 {
00430 usleep(1000);
00431 if ((ack = getAck())) { break; }
00432 }
00433
00434
00435 if (ack == NULL)
00436 {
00437 skip_send = 0;
00438
00439 transmit_times++;
00440 continue;
00441 }
00442
00443
00444
00445
00446 result_code = btou(ack->getPayloadPointer(), 2);
00447 if (result_code > 0)
00448 {
00449 throw new BadAckException(result_code);
00450 }
00451 else
00452 {
00453
00454 break;
00455 }
00456
00457 transmit_times++;
00458 }
00459 if (ack == NULL)
00460 {
00461 throw new TransportException("Unacknowledged send", TransportException::UNACKNOWLEDGED_SEND);
00462 }
00463 delete ack;
00464
00465 m->is_sent = true;
00466 }
00467
00476 Message *Transport::popNext()
00477 {
00478 CHECK_THROW_CONFIGURED();
00479
00480 poll();
00481
00482 if (rx_queue.empty()) { return NULL; }
00483
00484 Message *next = rx_queue.front();
00485 rx_queue.pop_front();
00486 return next;
00487 }
00488
00499 Message *Transport::popNext(enum MessageTypes type)
00500 {
00501 CHECK_THROW_CONFIGURED();
00502
00503 poll();
00504
00505 Message *next;
00506 list<Message *>::iterator iter;
00507 for (iter = rx_queue.begin(); iter != rx_queue.end(); ++iter)
00508 {
00509 if ((*iter)->getType() == type)
00510 {
00511 next = *iter;
00512 rx_queue.erase(iter);
00513 return next;
00514 }
00515 }
00516 return NULL;
00517 }
00518
00525 Message *Transport::waitNext(double timeout)
00526 {
00527 CHECK_THROW_CONFIGURED();
00528
00529 double elapsed = 0.0;
00530 while (true)
00531 {
00532
00533 poll();
00534 if (!rx_queue.empty()) { return popNext(); }
00535
00536
00537 if ((timeout != 0.0) && (elapsed > timeout))
00538 {
00539
00540 return NULL;
00541 }
00542
00543
00544 usleep(1000);
00545 elapsed += 0.001;
00546 }
00547 }
00548
00557 Message *Transport::waitNext(enum MessageTypes type, double timeout)
00558 {
00559 CHECK_THROW_CONFIGURED();
00560
00561 double elapsed = 0.0;
00562 Message *msg;
00563
00564 while (true)
00565 {
00566
00567
00568
00569 poll();
00570 msg = popNext(type);
00571 if (msg) { return msg; }
00572
00573
00574 if ((timeout != 0.0) && (elapsed > timeout))
00575 {
00576
00577 return NULL;
00578 }
00579
00580
00581 usleep(1000);
00582 elapsed += 0.001;
00583 }
00584 }
00585
00593 void Transport::flush(list<Message *> *queue)
00594 {
00595 CHECK_THROW_CONFIGURED();
00596
00597 poll();
00598
00599
00600
00601 list<Message *>::iterator iter;
00602 for (iter = rx_queue.begin(); iter != rx_queue.end(); ++iter)
00603 {
00604 if (queue)
00605 {
00606 queue->push_back(*iter);
00607 }
00608 else
00609 {
00610 delete *iter;
00611 }
00612 }
00613
00614 rx_queue.clear();
00615 }
00616
00626 void Transport::flush(enum MessageTypes type, list<Message *> *queue)
00627 {
00628 CHECK_THROW_CONFIGURED();
00629
00630 poll();
00631
00632 list<Message *>::iterator iter = rx_queue.begin();
00633 while (iter != rx_queue.end())
00634 {
00635 if ((*iter)->getType() == type)
00636 {
00637
00638
00639 if (queue)
00640 {
00641 queue->push_back(*iter);
00642 }
00643 else
00644 {
00645 delete *iter;
00646 }
00647
00648 iter = rx_queue.erase(iter);
00649 }
00650 else
00651 {
00652
00653 iter++;
00654 }
00655 }
00656 }
00657
00661 void Transport::printCounters(ostream &stream)
00662 {
00663 stream << "Transport Counters" << endl;
00664 stream << "==================" << endl;
00665
00666 size_t longest_name = 0;
00667 size_t cur_len = 0;
00668 for (int i = 0; i < NUM_COUNTERS; ++i)
00669 {
00670 cur_len = strlen(counter_names[i]);
00671 if (cur_len > longest_name) { longest_name = cur_len; }
00672 }
00673
00674 for (int i = 0; i < NUM_COUNTERS; ++i)
00675 {
00676 cout.width(longest_name);
00677 cout << left << counter_names[i] << ": " << counters[i] << endl;
00678 }
00679
00680 cout.width(longest_name);
00681 cout << left << "Queue length" << ": " << rx_queue.size() << endl;
00682 }
00683
00687 void Transport::resetCounters()
00688 {
00689 for (int i = 0; i < NUM_COUNTERS; ++i)
00690 {
00691 counters[i] = 0;
00692 }
00693 }
00694
00695 };
00696