Go to the documentation of this file.00001
00047 #include <string.h>
00048 #include <unistd.h>
00049 #include <cstdlib>
00050 #include <iostream>
00051 #include <vector>
00052 #include <string>
00053 #include <sstream>
00054 #include <ctime>
00055
00056 #include "roch_base/core/Transport.h"
00057 #include "roch_base/core/Number.h"
00058 #include "roch_base/core/Message.h"
00059 #include "roch_base/core/Message_request.h"
00060 #include "roch_base/core/Message_cmd.h"
00061 #include "roch_base/core/serial.h"
00062 #include "roch_base/core/Logger.h"
00063
00064 using namespace std;
00065
00066 namespace sawyer
00067 {
00068
00069 const char *Transport::counter_names[] = {
00070 "Garbled bytes",
00071 "Invalid messages",
00072 "Ignored acknowledgment",
00073 "Message queue overflow"
00074 };
00075
00076 TransportException::TransportException(const char *msg, enum errors ex_type)
00077 : Exception(msg), type(ex_type)
00078 {
00079 if (msg)
00080 {
00081 CPR_EXCEPT() << "TransportException " << (int) type << ": " << message << endl << std::flush;
00082 }
00083 }
00084
00085 BadAckException::BadAckException(unsigned int flag) :
00086 TransportException(NULL, TransportException::BAD_ACK_RESULT),
00087 ack_flag((enum ackFlags) flag)
00088 {
00089 switch (ack_flag)
00090 {
00091 case BAD_CHECKSUM:
00092 message = "Bad checksum";
00093 break;
00094 case BAD_TYPE:
00095 message = "Bad message type";
00096 break;
00097 case BAD_FORMAT:
00098 message = "Bad message format";
00099 break;
00100 case RANGE:
00101 message = "Range error";
00102 break;
00103 case OVER_FREQ:
00104 message = "Requested frequency too high";
00105 break;
00106 case OVER_SUBSCRIBE:
00107 message = "Too many subscriptions";
00108 break;
00109 default:
00110 message = "Unknown error code.";
00111 break;
00112 };
00113 stringstream ss;
00114
00115 CPR_EXCEPT() << "BadAckException (0x" << hex << flag << dec << "): " << message << endl << flush;
00116 }
00117
00118 #define CHECK_THROW_CONFIGURED() \
00119 do { \
00120 if( ! configured ) { \
00121 throw new TransportException("Transport not configured", TransportException::NOT_CONFIGURED); \
00122 } \
00123 } while( 0 )
00124
00129 Transport &Transport::instance()
00130 {
00131 static Transport instance;
00132 return instance;
00133 }
00134
00138 Transport::Transport() :
00139 configured(false),
00140 serial(0),
00141 retries(0)
00142 {
00143 for (int i = 0; i < NUM_COUNTERS; ++i)
00144 {
00145 counters[i] = 0;
00146 }
00147 }
00148
00149 Transport::~Transport()
00150 {
00151 close();
00152 }
00153
00164 void Transport::configure(const char *device, int retries)
00165 {
00166
00167 if (configured)
00168 {
00169
00170 close();
00171 }
00172
00173
00174 resetCounters();
00175
00176 this->retries = retries;
00177 if (!openComm(device))
00178 {
00179 configured = true;
00180 }
00181 else
00182 {
00183 throw new TransportException("Failed to open serial port", TransportException::CONFIGURE_FAIL);
00184 }
00185 }
00186
00192 int Transport::close()
00193 {
00194 int retval = 0;
00195 if (configured)
00196 {
00197 flush();
00198 retval = closeComm();
00199 }
00200 configured = false;
00201 return retval;
00202 }
00203
00208 int Transport::openComm(const char *device)
00209 {
00210
00211 int tmp = rochDriver.OpenSerial(&(this->serial), device);
00212 if (tmp < 0)
00213 {
00214 return -1;
00215 }
00216 tmp = rochDriver.SetupSerial(this->serial);
00217 if (tmp < 0)
00218 {
00219 return -2;
00220 }
00221 return 0;
00222 }
00223
00227 int Transport::closeComm()
00228 {
00229 rochDriver.CloseSerial(this->serial);
00230
00231 return 0;
00232 }
00233
00242 Message *Transport::rxMessage()
00243 {
00244
00245
00246
00247
00248
00249 static char rx_buf[Message::MAX_MSG_LENGTH];
00250 static size_t rx_inx = 0;
00251 static size_t msg_len = 0;
00252
00253 if (!rx_inx) { memset(rx_buf, 0xba, Message::MAX_MSG_LENGTH); }
00254
00255 while ( rochDriver.ReadData(serial, rx_buf + rx_inx, 1)==1)
00256 {
00257 switch (rx_inx)
00258 {
00259
00260
00261 case 0:
00262 if ((uint8_t) (rx_buf[0]) == (uint8_t) (Message::SOH))
00263 {
00264 rx_inx++;
00265 }
00266 else { counters[GARBLE_BYTES]++; }
00267 break;
00268
00269
00270 case 1:
00271 rx_inx++;
00272 break;
00273
00274
00275 case 2:
00276 rx_inx++;
00277 msg_len = rx_buf[1] + 3;
00278
00279
00280 if (static_cast<unsigned char>(rx_buf[1] ^ rx_buf[2]) != 0xFF ||
00281 (msg_len < Message::MIN_MSG_LENGTH))
00282 {
00283 counters[GARBLE_BYTES] += rx_inx;
00284 rx_inx = 0;
00285 }
00286
00287 break;
00288
00289
00290 default:
00291 rx_inx++;
00292 if (rx_inx < msg_len) { break; }
00293
00294 rx_inx = 0;
00295 rochDriver.rawData_.clear();
00296 rochDriver.rawData_.rawData.length = msg_len;
00297 for (int i=0; i<msg_len; ++i) rochDriver.rawData_.rawData.data[i]= (unsigned char)(rx_buf[i]);
00298
00299 Message *msg = Message::factory(rx_buf, msg_len);
00300 return msg;
00301
00302 }
00303 }
00304
00305
00306 return NULL;
00307 }
00308
00315 Message *Transport::getAck()
00316 {
00317 Message *msg = NULL;
00318 while ((msg = rxMessage()))
00319 {
00320
00321 if (msg->isData())
00322 {
00323 rochDriver.rawData_.clear();
00324 rochDriver.rawData_.rawData.length = msg->getTotalLength();
00325 for (int i=0; i<msg->getTotalLength(); ++i) rochDriver.rawData_.rawData.data[i]= (unsigned char)(msg->data[i]);
00326 return msg;
00327 continue;
00328
00329 }
00330
00331
00332 if (!msg->isValid())
00333 {
00334 ++counters[INVALID_MSG];
00335 delete msg;
00336 continue;
00337 }
00338 return msg;
00339 }
00340 return NULL;
00341
00342 }
00343
00350 void Transport::enqueueMessage(Message *msg)
00351 {
00352
00353 if (!msg->isValid())
00354 {
00355 ++counters[INVALID_MSG];
00356 delete msg;
00357 return;
00358 }
00359
00360
00361 rx_queue.push_back(msg);
00362
00363
00364 while (rx_queue.size() > MAX_QUEUE_LEN)
00365 {
00366 ++counters[QUEUE_FULL];
00367 delete rx_queue.front();
00368 rx_queue.pop_front();
00369 }
00370 }
00371
00372
00379 void Transport::poll()
00380 {
00381 CHECK_THROW_CONFIGURED();
00382
00383 Message *msg = NULL;
00384
00385 while ((msg = rxMessage()))
00386 {
00387
00388
00389 if (!msg->isData())
00390 {
00391 ++counters[IGNORED_ACK];
00392 delete msg;
00393 continue;
00394 }
00395
00396
00397 enqueueMessage(msg);
00398 }
00399 }
00400
00408 void Transport::send(Message *m)
00409 {
00410
00411 counter_number = 0;
00412 CHECK_THROW_CONFIGURED();
00413
00414
00415 char skip_send = 0;
00416 Message *ack = NULL;
00417 Message *ack_temp = NULL;
00418 int transmit_times = 0;
00419 short result_code;
00420 poll();
00421 while (1)
00422 {
00423
00424 if (transmit_times > this->retries)
00425 {
00426 break;
00427 }
00428
00429
00430 rochDriver.rawData_.clear();
00431 rochDriver.rawData_.rawData.length = m->getTotalLength();
00432 for (int i=0; i<m->getTotalLength(); ++i) rochDriver.rawData_.rawData.data[i]= (unsigned char)(m->data[i]);
00433 if (!skip_send) { rochDriver.WriteData(serial, (char *) (m->data), m->total_len); }
00434
00435
00436 for (int i = 0; i < RETRY_DELAY_MS; ++i)
00437 {
00438 usleep(1000);
00439 if ((ack = getAck())) { counter_number++; ack_temp = ack; break;
00440
00441 }
00442 }
00443 if(ack_temp == NULL)
00444 {
00445 skip_send = 0;
00446 cout << "No message received yet" << endl;
00447 transmit_times++;
00448 continue;
00449 }
00450
00451
00452
00453
00454 result_code = btou(ack_temp->getPayloadPointer(), 2);
00455
00456 if (result_code > 0 && !ack_temp->isData())
00457 {
00458 throw new BadAckException(result_code);
00459 }
00460 else
00461 {
00462
00463 break;
00464 }
00465
00466 transmit_times++;
00467 }
00468 if (ack_temp == NULL)
00469 {
00470 throw new TransportException("Unacknowledged send", TransportException::UNACKNOWLEDGED_SEND);
00471 }
00472 delete ack_temp;
00473
00474 m->is_sent = true;
00475 }
00476
00485 Message* Transport::sendRequest(Message *m)
00486 {
00487 counter_number = 0;
00488 CHECK_THROW_CONFIGURED();
00489
00490 char skip_send = 0;
00491 Message *ack = NULL;
00492 Message *ack_temp = NULL;
00493 int transmit_times = 0;
00494 short result_code;
00495 poll();
00496 while (1)
00497 {
00498
00499 if (transmit_times > this->retries)
00500 {
00501 break;
00502 }
00503
00504
00505 rochDriver.rawData_.clear();
00506 rochDriver.rawData_.rawData.length = m->getTotalLength();
00507 for (int i=0; i<m->getTotalLength(); ++i) rochDriver.rawData_.rawData.data[i]= (unsigned char)(m->data[i]);
00508 if (!skip_send) { rochDriver.WriteData(serial, (char *) (m->data), m->total_len); }
00509
00510
00511 for (int i = 0; i < RETRY_DELAY_MS; ++i)
00512 {
00513 usleep(1000);
00514 if ((ack = getAck())) { counter_number++; ack_temp = ack;
00515 break;
00516 }
00517 }
00518 if(ack_temp == NULL)
00519 {
00520 skip_send = 0;
00521 cout << "No message received yet" << endl;
00522 transmit_times++;
00523 continue;
00524 }
00525
00526
00527
00528 if(ack_temp->isData()) { return ack_temp;}
00529 result_code = btou(ack_temp->getPayloadPointer(), 2);
00530 if (result_code > 0 && !ack_temp->isData())
00531 {
00532 throw new BadAckException(result_code);
00533 }
00534 else
00535 {
00536
00537 break;
00538 }
00539
00540 transmit_times++;
00541 }
00542 if (ack_temp == NULL)
00543 {
00544 throw new TransportException("Unacknowledged send", TransportException::UNACKNOWLEDGED_SEND);
00545 }
00546 delete ack_temp;
00547
00548 m->is_sent = true;
00549 }
00558 Message *Transport::popNext()
00559 {
00560 CHECK_THROW_CONFIGURED();
00561
00562 poll();
00563
00564 if (rx_queue.empty()) { return NULL; }
00565
00566 Message *next = rx_queue.front();
00567 rx_queue.pop_front();
00568 return next;
00569 }
00570
00581 Message *Transport::popNext(enum MessageTypes type)
00582 {
00583 CHECK_THROW_CONFIGURED();
00584
00585 poll();
00586
00587 Message *next;
00588 list<Message *>::iterator iter;
00589 for (iter = rx_queue.begin(); iter != rx_queue.end(); ++iter)
00590 {
00591 if ((*iter)->getType() == type)
00592 {
00593 next = *iter;
00594 rx_queue.erase(iter);
00595 return next;
00596 }
00597 }
00598 return NULL;
00599 }
00600
00607 Message *Transport::waitNext(double timeout)
00608 {
00609 CHECK_THROW_CONFIGURED();
00610
00611 double elapsed = 0.0;
00612 while (true)
00613 {
00614
00615 poll();
00616 if (!rx_queue.empty()) { return popNext(); }
00617
00618
00619 if ((timeout != 0.0) && (elapsed > timeout))
00620 {
00621
00622 return NULL;
00623 }
00624
00625
00626 usleep(1000);
00627 elapsed += 0.001;
00628 }
00629 }
00630
00639 Message *Transport::waitNext(enum MessageTypes type, double timeout)
00640 {
00641 CHECK_THROW_CONFIGURED();
00642
00643 double elapsed = 0.0;
00644 Message *msg;
00645
00646 while (true)
00647 {
00648
00649
00650
00651
00652 poll();
00653 if (msg = Request(type-0x4000,timeout).sendRequest()) return msg;
00654
00655
00656 if ((timeout != 0.0) && (elapsed > timeout))
00657 {
00658 return NULL;
00659 }
00660
00661
00662 usleep(1000);
00663 elapsed += 0.001;
00664 }
00665 }
00666
00674 void Transport::flush(list<Message *> *queue)
00675 {
00676 CHECK_THROW_CONFIGURED();
00677
00678 poll();
00679
00680
00681
00682 list<Message *>::iterator iter;
00683 for (iter = rx_queue.begin(); iter != rx_queue.end(); ++iter)
00684 {
00685 if (queue)
00686 {
00687 queue->push_back(*iter);
00688 }
00689 else
00690 {
00691 delete *iter;
00692 }
00693 }
00694
00695 rx_queue.clear();
00696
00697 }
00698
00708 void Transport::flush(enum MessageTypes type, list<Message *> *queue)
00709 {
00710 CHECK_THROW_CONFIGURED();
00711 poll();
00712 list<Message *>::iterator iter = rx_queue.begin();
00713 while (iter != rx_queue.end())
00714 {
00715 if ((*iter)->getType() == type)
00716 {
00717
00718
00719 if (queue)
00720 {
00721 queue->push_back(*iter);
00722 }
00723 else
00724 {
00725 delete *iter;
00726 }
00727
00728 iter = rx_queue.erase(iter);
00729 }
00730 else
00731 {
00732
00733 iter++;
00734 }
00735 }
00736 }
00737
00741 void Transport::printCounters(ostream &stream)
00742 {
00743 stream << "Transport Counters" << endl;
00744 stream << "==================" << endl;
00745
00746 size_t longest_name = 0;
00747 size_t cur_len = 0;
00748 for (int i = 0; i < NUM_COUNTERS; ++i)
00749 {
00750 cur_len = strlen(counter_names[i]);
00751 if (cur_len > longest_name) { longest_name = cur_len; }
00752 }
00753
00754 for (int i = 0; i < NUM_COUNTERS; ++i)
00755 {
00756 cout.width(longest_name);
00757 cout << left << counter_names[i] << ": " << counters[i] << endl;
00758 }
00759
00760 cout.width(longest_name);
00761 cout << left << "Queue length" << ": " << rx_queue.size() << endl;
00762 }
00763
00767 void Transport::resetCounters()
00768 {
00769 for (int i = 0; i < NUM_COUNTERS; ++i)
00770 {
00771 counters[i] = 0;
00772 }
00773 }
00774
00775 };
00776