Transport.cpp
Go to the documentation of this file.
1 
47 #include <string.h>
48 #include <unistd.h>
49 #include <cstdlib>
50 #include <iostream>
51 #include <vector>
52 #include <string>
53 #include <sstream>
54 #include <ctime>
55 
63 
64 using namespace std;
65 
66 namespace clearpath
67 {
68 
69  const char *Transport::counter_names[] = {
70  "Garbled bytes",
71  "Invalid messages",
72  "Ignored acknowledgment",
73  "Message queue overflow"
74  };
75 
76  TransportException::TransportException(const char *msg, enum errors ex_type)
77  : Exception(msg), type(ex_type)
78  {
79  if (msg)
80  {
81  CPR_EXCEPT() << "TransportException " << (int) type << ": " << message << endl << std::flush;
82  }
83  }
84 
85  BadAckException::BadAckException(unsigned int flag) :
87  ack_flag((enum ackFlags) flag)
88  {
89  switch (ack_flag)
90  {
91  case BAD_CHECKSUM:
92  message = "Bad checksum";
93  break;
94  case BAD_TYPE:
95  message = "Bad message type";
96  break;
97  case BAD_FORMAT:
98  message = "Bad message format";
99  break;
100  case RANGE:
101  message = "Range error";
102  break;
103  case OVER_FREQ:
104  message = "Requested frequency too high";
105  break;
106  case OVER_SUBSCRIBE:
107  message = "Too many subscriptions";
108  break;
109  default:
110  message = "Unknown error code.";
111  break;
112  };
113  stringstream ss;
114 
115  CPR_EXCEPT() << "BadAckException (0x" << hex << flag << dec << "): " << message << endl << flush;
116  }
117 
/* Guard for operations that require an open transport: throws a
 * heap-allocated TransportException (NOT_CONFIGURED) when the 'configured'
 * flag of the enclosing object is false. The do/while(0) wrapper makes the
 * macro usable as a single statement followed by a semicolon. */
#define CHECK_THROW_CONFIGURED() \
  do \
  { \
    if (!(configured)) \
    { \
      throw new TransportException("Transport not configured", TransportException::NOT_CONFIGURED); \
    } \
  } while (0)
124 
130  {
131  static Transport instance;
132  return instance;
133  }
134 
139  configured(false),
140  serial(0),
141  retries(0)
142  {
143  for (int i = 0; i < NUM_COUNTERS; ++i)
144  {
145  counters[i] = 0;
146  }
147  }
148 
150  {
151  close();
152  }
153 
164  void Transport::configure(const char *device, int retries)
165  {
166  if (configured)
167  {
168  // Close serial
169  close();
170  }
171 
172  // Forget old counters
173  resetCounters();
174 
175  this->retries = retries;
176 
177  if (!openComm(device))
178  {
179  configured = true;
180  }
181  else
182  {
183  throw new TransportException("Failed to open serial port", TransportException::CONFIGURE_FAIL);
184  }
185  }
186 
193  {
194  int retval = 0;
195  if (configured)
196  {
197  flush();
198  retval = closeComm();
199  }
200  configured = false;
201  return retval;
202  }
203 
208  int Transport::openComm(const char *device)
209  {
210  int tmp = OpenSerial(&(this->serial), device);
211  if (tmp < 0)
212  {
213  return -1;
214  }
215  tmp = SetupSerial(this->serial);
216  if (tmp < 0)
217  {
218  return -2;
219  }
220  return 0;
221  }
222 
227  {
228  CloseSerial(this->serial);
229  //serial = 0;
230  return 0;
231  }
232 
242  {
243  /* Each time this function is called, any available characters are added
244  * to the receive buffer. A new Message is created and returned when
245  * a complete message has been received (the message may be aggregated
246  * from data received over multiple calls) */
247  static char rx_buf[Message::MAX_MSG_LENGTH];
248  static size_t rx_inx = 0;
249  static size_t msg_len = 0;
250 
251  if (!rx_inx) { memset(rx_buf, 0xba, Message::MAX_MSG_LENGTH); }
252 
253  /* Read in and handle characters, one at a time.
254  * (This is a simple state machine, using 'rx_inx' as state) */
255  while (ReadData(serial, rx_buf + rx_inx, 1) == 1)
256  {
257  switch (rx_inx)
258  {
259 
260  /* Waiting for SOH */
261  case 0:
262  if ((uint8_t) (rx_buf[0]) == (uint8_t) (Message::SOH))
263  {
264  rx_inx++;
265  }
266  else { counters[GARBLE_BYTES]++; }
267  break;
268 
269  /* Waiting for length */
270  case 1:
271  rx_inx++;
272  break;
273 
274  /* Waiting for ~length */
275  case 2:
276  rx_inx++;
277  msg_len = rx_buf[1] + 3;
278 
279  /* Check for valid length */
280  if (static_cast<unsigned char>(rx_buf[1] ^ rx_buf[2]) != 0xFF ||
281  (msg_len < Message::MIN_MSG_LENGTH))
282  {
283  counters[GARBLE_BYTES] += rx_inx;
284  rx_inx = 0;
285  }
286 
287  break;
288 
289  //case 9:
290  //case 10:
291  // cout << hex << " " << (unsigned int)(rx_buf[rx_inx]);
292  // if(rx_inx==10) cout << endl;
293 
294  /* Waiting for the rest of the message */
295  default:
296  rx_inx++;
297  if (rx_inx < msg_len) { break; }
298  /* Finished rxing, reset this state machine and return msg */
299  rx_inx = 0;
300  Message *msg = Message::factory(rx_buf, msg_len);
301  return msg;
302 
303  } // switch( rx_inx )
304  } // while( get character )
305 
306  // Breaking out of loop indicates end of available serial input
307  return NULL;
308  }
309 
317  {
318  Message *msg = NULL;
319 
320  while ((msg = rxMessage()))
321  {
322  /* Queue any data messages that turn up */
323  if (msg->isData())
324  {
325  enqueueMessage(msg);
326  continue;
327  }
328 
329  /* Drop invalid messages */
330  if (!msg->isValid())
331  {
333  delete msg;
334  continue;
335  }
336 
337  return msg;
338  }
339 
340  return NULL;
341  }
342 
350  {
351  /* Reject invalid messages */
352  if (!msg->isValid())
353  {
355  delete msg;
356  return;
357  }
358 
359  // Enqueue
360  rx_queue.push_back(msg);
361 
362  /* Drop the oldest messages if the queue has overflowed */
363  while (rx_queue.size() > MAX_QUEUE_LEN)
364  {
365  ++counters[QUEUE_FULL];
366  delete rx_queue.front();
367  rx_queue.pop_front();
368  }
369  }
370 
371 
379  {
381 
382  Message *msg = NULL;
383 
384  while ((msg = rxMessage()))
385  {
386  /* We're not waiting for acks, so drop them */
387  if (!msg->isData())
388  {
390  delete msg;
391  continue;
392  }
393 
394  // Message is good, queue it.
395  enqueueMessage(msg);
396  }
397  }
398 
407  {
409 
410  char skip_send = 0;
411  Message *ack = NULL;
412  int transmit_times = 0;
413  short result_code;
414 
415  poll();
416 
417  while (1)
418  {
419  // We have exceeded our retry numbers
420  if (transmit_times > this->retries)
421  {
422  break;
423  }
424  // Write output
425  if (!skip_send) { WriteData(serial, (char *) (m->data), m->total_len); }
426 
427  // Wait up to 100 ms for ack
428  for (int i = 0; i < RETRY_DELAY_MS; ++i)
429  {
430  usleep(1000);
431  if ((ack = getAck())) { break; }
432  }
433 
434  // No message - resend
435  if (ack == NULL)
436  {
437  skip_send = 0;
438  //cout << "No message received yet" << endl;
439  transmit_times++;
440  continue;
441  }
442 
443  // Check result code
444  // If the result code is bad, the message was still transmitted
445  // successfully
446  result_code = btou(ack->getPayloadPointer(), 2);
447  if (result_code > 0)
448  {
449  throw new BadAckException(result_code);
450  }
451  else
452  {
453  // Everything's good - return
454  break;
455  }
456  // Other failure
457  transmit_times++;
458  }
459  if (ack == NULL)
460  {
461  throw new TransportException("Unacknowledged send", TransportException::UNACKNOWLEDGED_SEND);
462  }
463  delete ack;
464 
465  m->is_sent = true;
466  }
467 
477  {
479 
480  poll(); // empty the current serial RX queue.
481 
482  if (rx_queue.empty()) { return NULL; }
483 
484  Message *next = rx_queue.front();
485  rx_queue.pop_front();
486  return next;
487  }
488 
500  {
502 
503  poll(); // empty the current RX queue
504 
505  Message *next;
506  list<Message *>::iterator iter;
507  for (iter = rx_queue.begin(); iter != rx_queue.end(); ++iter)
508  {
509  if ((*iter)->getType() == type)
510  {
511  next = *iter;
512  rx_queue.erase(iter);
513  return next;
514  }
515  }
516  return NULL;
517  }
518 
525  Message *Transport::waitNext(double timeout)
526  {
528 
529  double elapsed = 0.0;
530  while (true)
531  {
532  /* Return a message if it's turned up */
533  poll();
534  if (!rx_queue.empty()) { return popNext(); }
535 
536  /* Check timeout */
537  if ((timeout != 0.0) && (elapsed > timeout))
538  {
539  // If we have a timeout set, and it has elapsed, exit.
540  return NULL;
541  }
542 
543  // Wait a ms before retry
544  usleep(1000);
545  elapsed += 0.001;
546  }
547  }
548 
557  Message *Transport::waitNext(enum MessageTypes type, double timeout)
558  {
560 
561  double elapsed = 0.0;
562  Message *msg;
563 
564  while (true)
565  {
566  /* Check if the message has turned up
567  * Since we're blocking, not doing anything useful anyway, it doesn't
568  * really matter that we're iterating the entire message queue every spin. */
569  poll();
570  msg = popNext(type);
571  if (msg) { return msg; }
572 
573  /* Check timeout */
574  if ((timeout != 0.0) && (elapsed > timeout))
575  {
576  // If a timeout is set and has elapsed, fail out.
577  return NULL;
578  }
579 
580  // Wait a ms before retry
581  usleep(1000);
582  elapsed += 0.001;
583  }
584  }
585 
593  void Transport::flush(list<Message *> *queue)
594  {
596 
597  poll(); // flush serial buffer
598 
599  /* Either delete or move all elements in the queue, depending
600  * on whether a destination list is provided */
601  list<Message *>::iterator iter;
602  for (iter = rx_queue.begin(); iter != rx_queue.end(); ++iter)
603  {
604  if (queue)
605  {
606  queue->push_back(*iter);
607  }
608  else
609  {
610  delete *iter;
611  }
612  }
613 
614  rx_queue.clear();
615  }
616 
626  void Transport::flush(enum MessageTypes type, list<Message *> *queue)
627  {
629 
630  poll();
631 
632  list<Message *>::iterator iter = rx_queue.begin();
633  while (iter != rx_queue.end())
634  {
635  if ((*iter)->getType() == type)
636  {
637  /* Element is of flush type. If there's a destination
638  * list, move it. Otherwise, destroy it */
639  if (queue)
640  {
641  queue->push_back(*iter);
642  }
643  else
644  {
645  delete *iter;
646  }
647  // This advances to the next element in the queue:
648  iter = rx_queue.erase(iter);
649  }
650  else
651  {
652  // Not interested in this element. Next!
653  iter++;
654  }
655  }
656  }
657 
661  void Transport::printCounters(ostream &stream)
662  {
663  stream << "Transport Counters" << endl;
664  stream << "==================" << endl;
665 
666  size_t longest_name = 0;
667  size_t cur_len = 0;
668  for (int i = 0; i < NUM_COUNTERS; ++i)
669  {
670  cur_len = strlen(counter_names[i]);
671  if (cur_len > longest_name) { longest_name = cur_len; }
672  }
673 
674  for (int i = 0; i < NUM_COUNTERS; ++i)
675  {
676  cout.width(longest_name);
677  cout << left << counter_names[i] << ": " << counters[i] << endl;
678  }
679 
680  cout.width(longest_name);
681  cout << left << "Queue length" << ": " << rx_queue.size() << endl;
682  }
683 
688  {
689  for (int i = 0; i < NUM_COUNTERS; ++i)
690  {
691  counters[i] = 0;
692  }
693  }
694 
695 }; // namespace clearpath
696 
BadAckException(unsigned int flag)
Definition: Transport.cpp:85
msg
uint8_t data[MAX_MSG_LENGTH]
Definition: Message.h:101
static const char * counter_names[NUM_COUNTERS]
Definition: Transport.h:108
enum clearpath::BadAckException::ackFlags ack_flag
static Transport & instance()
Definition: Transport.cpp:129
int OpenSerial(void **handle, const char *port_name)
int openComm(const char *device)
Definition: Transport.cpp:208
Message * getAck()
Definition: Transport.cpp:316
int CloseSerial(void *handle)
static Message * factory(void *input, size_t msg_len)
Definition: Message.cpp:385
std::list< Message * > rx_queue
Definition: Transport.h:118
static const int RETRY_DELAY_MS
Definition: Transport.h:116
int ReadData(void *handle, char *buffer, int length)
uint64_t btou(void *src, size_t src_len)
Definition: Number.cpp:121
unsigned long counters[NUM_COUNTERS]
Definition: Transport.h:121
#define CPR_EXCEPT()
Definition: Logger.h:106
void enqueueMessage(Message *msg)
Definition: Transport.cpp:349
size_t total_len
Definition: Message.h:103
Message * waitNext(double timeout=0.0)
Definition: Transport.cpp:525
const char * message
Definition: Exception.h:58
uint8_t * getPayloadPointer(size_t offset=0)
Definition: Message.cpp:181
int WriteData(void *handle, const char *buffer, int length)
void send(Message *m)
Definition: Transport.cpp:406
int SetupSerial(void *handle)
static const size_t MAX_QUEUE_LEN
Definition: Transport.h:119
Message * rxMessage()
Definition: Transport.cpp:241
MessageTypes
Definition: Message.h:211
static const size_t MAX_MSG_LENGTH
Definition: Message.h:78
Message * popNext()
Definition: Transport.cpp:476
static const uint8_t SOH
Definition: Message.h:113
static const size_t MIN_MSG_LENGTH
Definition: Message.h:112
#define CHECK_THROW_CONFIGURED()
Definition: Transport.cpp:118
bool isValid(char *whyNot=NULL, size_t strLen=0)
Definition: Message.cpp:302
void printCounters(std::ostream &stream=std::cout)
Definition: Transport.cpp:661
void flush(std::list< Message * > *queue=0)
void configure(const char *device, int retries)
Definition: Transport.cpp:164


husky_base
Author(s): Mike Purvis , Paul Bovbel
autogenerated on Fri Oct 2 2020 03:40:07