Transport.cpp
Go to the documentation of this file.
1 
47 #include <string.h>
48 #include <unistd.h>
49 #include <cstdlib>
50 #include <iostream>
51 #include <vector>
52 #include <string>
53 #include <sstream>
54 #include <ctime>
55 
57 #include "roch_base/core/Number.h"
58 #include "roch_base/core/Message.h"
61 #include "roch_base/core/serial.h"
62 #include "roch_base/core/Logger.h"
63 
64 using namespace std;
65 
66 namespace sawyer
67 {
68 
69  const char *Transport::counter_names[] = {
70  "Garbled bytes",
71  "Invalid messages",
72  "Ignored acknowledgment",
73  "Message queue overflow"
74  };
75 
76  TransportException::TransportException(const char *msg, enum errors ex_type)
77  : Exception(msg), type(ex_type)
78  {
79  if (msg)
80  {
81  CPR_EXCEPT() << "TransportException " << (int) type << ": " << message << endl << std::flush;
82  }
83  }
84 
85  BadAckException::BadAckException(unsigned int flag) :
87  ack_flag((enum ackFlags) flag)
88  {
89  switch (ack_flag)
90  {
91  case BAD_CHECKSUM:
92  message = "Bad checksum";
93  break;
94  case BAD_TYPE:
95  message = "Bad message type";
96  break;
97  case BAD_FORMAT:
98  message = "Bad message format";
99  break;
100  case RANGE:
101  message = "Range error";
102  break;
103  case OVER_FREQ:
104  message = "Requested frequency too high";
105  break;
106  case OVER_SUBSCRIBE:
107  message = "Too many subscriptions";
108  break;
109  default:
110  message = "Unknown error code.";
111  break;
112  };
113  stringstream ss;
114 
115  CPR_EXCEPT() << "BadAckException (0x" << hex << flag << dec << "): " << message << endl << flush;
116  }
117 
/* Throws a heap-allocated TransportException (NOT_CONFIGURED) when the
 * `configured` flag visible at the expansion site is false.  The
 * do/while(0) wrapper makes the expansion behave as a single statement. */
#define CHECK_THROW_CONFIGURED()                                             \
  do                                                                         \
  {                                                                          \
    if (!configured)                                                         \
    {                                                                        \
      throw new TransportException("Transport not configured",              \
                                   TransportException::NOT_CONFIGURED);     \
    }                                                                        \
  } while (0)
124 
130  {
131  static Transport instance;
132  return instance;
133  }
134 
139  configured(false),
140  serial(0),
141  retries(0)
142  {
143  for (int i = 0; i < NUM_COUNTERS; ++i)
144  {
145  counters[i] = 0;
146  }
147  }
148 
150  {
// Shut the transport down through close(); its int result is discarded here.
151  close();
152  }
153 
164  void Transport::configure(const char *device, int retries)
165  {
166 
167  if (configured)
168  {
169  // Close serial
170  close();
171  }
172 
173  // Forget old counters
174  resetCounters();
175 
176  this->retries = retries;
177  if (!openComm(device))
178  {
179  configured = true;
180  }
181  else
182  {
183  throw new TransportException("Failed to open serial port", TransportException::CONFIGURE_FAIL);
184  }
185  }
186 
193  {
194  int retval = 0;
195  if (configured)
196  {
197  flush();
198  retval = closeComm();
199  }
200  configured = false;
201  return retval;
202  }
203 
208  int Transport::openComm(const char *device)
209  {
210 
211  int tmp = rochDriver.OpenSerial(&(this->serial), device);
212  if (tmp < 0)
213  {
214  return -1;
215  }
216  tmp = rochDriver.SetupSerial(this->serial);
217  if (tmp < 0)
218  {
219  return -2;
220  }
221  return 0;
222  }
223 
228  {
// NOTE(review): the listing jumps from source line 228 to 230, so any statement on line 229 is not visible here; as shown, this body only returns 0.
230  //serial = 0;
231  return 0;
232  }
233 
243  {
244  /* Each time this function is called, any available characters are added
245  * to the receive buffer. A new Message is created and returned when
246  * a complete message has been received (the message may be aggregated
247  * from data received over multiple calls) */
248 
249  static char rx_buf[Message::MAX_MSG_LENGTH];
250  static size_t rx_inx = 0;
251  static size_t msg_len = 0;
252 
253  if (!rx_inx) { memset(rx_buf, 0xba, Message::MAX_MSG_LENGTH); }
254 
255  while ( rochDriver.ReadData(serial, rx_buf + rx_inx, 1)==1)
256  {
257  switch (rx_inx)
258  {
259 
260  /* Waiting for SOH */
261  case 0:
262  if ((uint8_t) (rx_buf[0]) == (uint8_t) (Message::SOH))
263  {
264  rx_inx++;
265  }
266  else { counters[GARBLE_BYTES]++; }
267  break;
268 
269  /* Waiting for length */
270  case 1:
271  rx_inx++;
272  break;
273 
274  /* Waiting for ~length */
275  case 2:
276  rx_inx++;
277  msg_len = rx_buf[1] + 3;//
278 
279  /* Check for valid length */
280  if (static_cast<unsigned char>(rx_buf[1] ^ rx_buf[2]) != 0xFF ||
281  (msg_len < Message::MIN_MSG_LENGTH))
282  {
283  counters[GARBLE_BYTES] += rx_inx;
284  rx_inx = 0;
285  }
286 
287  break;
288 
289  /* Waiting for the rest of the message */
290  default:
291  rx_inx++;
292  if (rx_inx < msg_len) { break; }
293  /* Finished rxing, reset this state machine and return msg */
294  rx_inx = 0;
296  rochDriver.rawData_.rawData.length = msg_len;
297  for (int i=0; i<msg_len; ++i) rochDriver.rawData_.rawData.data[i]= (unsigned char)(rx_buf[i]);
298 
299  Message *msg = Message::factory(rx_buf, msg_len);
300  return msg;
301 
302  } // switch( rx_inx )
303  } // while( get character )
304 
305  // Breaking out of loop indicates end of available serial input
306  return NULL;
307  }
308 
316  {
317  Message *msg = NULL;
318  while ((msg = rxMessage()))
319  {
320  /* Queue any data messages that turn up */
321  if (msg->isData())
322  {
325  for (int i=0; i<msg->getTotalLength(); ++i) rochDriver.rawData_.rawData.data[i]= (unsigned char)(msg->data[i]);
326  return msg;
327  continue;
328 
329  }
330 
331  /* Drop invalid messages */
332  if (!msg->isValid())
333  {
335  delete msg;
336  continue;
337  }
338  return msg;
339  }
340  return NULL;
341 
342  }
343 
351  {
352  /* Reject invalid messages */
353  if (!msg->isValid())
354  {
356  delete msg;
357  return;
358  }
359 
360  // Enqueue
361  rx_queue.push_back(msg);
362 
363  /* Drop the oldest messages if the queue has overflowed */
364  while (rx_queue.size() > MAX_QUEUE_LEN)
365  {
366  ++counters[QUEUE_FULL];
367  delete rx_queue.front();
368  rx_queue.pop_front();
369  }
370  }
371 
372 
380  {
382 
383  Message *msg = NULL;
384 
385  while ((msg = rxMessage()))
386  {
387 
388  /* We're not waiting for acks, so drop them */
389  if (!msg->isData())
390  {
392  delete msg;
393  continue;
394  }
395 
396  // Message is good, queue it.
397  enqueueMessage(msg);
398  }
399  }
400 
409  {
410 
411  counter_number = 0;
413 
414 
415  char skip_send = 0;
416  Message *ack = NULL;
417  Message *ack_temp = NULL;
418  int transmit_times = 0;
419  short result_code;
420  poll();
421  while (1)
422  {
423  // We have exceeded our retry numbers
424  if (transmit_times > this->retries)
425  {
426  break;
427  }
428  // Write output
429 
432  for (int i=0; i<m->getTotalLength(); ++i) rochDriver.rawData_.rawData.data[i]= (unsigned char)(m->data[i]);
433  if (!skip_send) { rochDriver.WriteData(serial, (char *) (m->data), m->total_len); }
434 
435  // Wait up to 100 ms for ack
436  for (int i = 0; i < RETRY_DELAY_MS; ++i)
437  {
438  usleep(1000);
439  if ((ack = getAck())) { counter_number++; ack_temp = ack; break;
440 
441  }
442  }
443  if(ack_temp == NULL)
444  {
445  skip_send = 0;
446  cout << "No message received yet" << endl;
447  transmit_times++;
448  continue;
449  }
450 
451  // Check result code
452  // If the result code is bad, the message was still transmitted
453  // successfully
454  result_code = btou(ack_temp->getPayloadPointer(), 2);
455 
456  if (result_code > 0 && !ack_temp->isData())
457  {
458  throw new BadAckException(result_code);
459  }
460  else
461  {
462  // Everything's good - return
463  break;
464  }
465  // Other failure
466  transmit_times++;
467  }
468  if (ack_temp == NULL)
469  {
470  throw new TransportException("Unacknowledged send", TransportException::UNACKNOWLEDGED_SEND);
471  }
472  delete ack_temp;
473 
474  m->is_sent = true;
475  }
476 
486  {
487  counter_number = 0;
489 
490  char skip_send = 0;
491  Message *ack = NULL;
492  Message *ack_temp = NULL;
493  int transmit_times = 0;
494  short result_code;
495  poll();
496  while (1)
497  {
498  // We have exceeded our retry numbers
499  if (transmit_times > this->retries)
500  {
501  break;
502  }
503  // Write output
504 
507  for (int i=0; i<m->getTotalLength(); ++i) rochDriver.rawData_.rawData.data[i]= (unsigned char)(m->data[i]);
508  if (!skip_send) { rochDriver.WriteData(serial, (char *) (m->data), m->total_len); }
509 
510  // Wait up to 100 ms for ack
511  for (int i = 0; i < RETRY_DELAY_MS; ++i)
512  {
513  usleep(1000);
514  if ((ack = getAck())) { counter_number++; ack_temp = ack;
515  break;
516  }
517  }
518  if(ack_temp == NULL)
519  {
520  skip_send = 0;
521  cout << "No message received yet" << endl;
522  transmit_times++;
523  continue;
524  }
525  // Check result code
526  // If the result code is bad, the message was still transmitted
527  // successfully
528  if(ack_temp->isData()) { return ack_temp;}
529  result_code = btou(ack_temp->getPayloadPointer(), 2);
530  if (result_code > 0 && !ack_temp->isData())
531  {
532  throw new BadAckException(result_code);
533  }
534  else
535  {
536  // Everything's good - return
537  break;
538  }
539  // Other failure
540  transmit_times++;
541  }
542  if (ack_temp == NULL)
543  {
544  throw new TransportException("Unacknowledged send", TransportException::UNACKNOWLEDGED_SEND);
545  }
546  delete ack_temp;
547 
548  m->is_sent = true;
549  }
559  {
561 
562  poll(); // empty the current serial RX queue.
563 
564  if (rx_queue.empty()) { return NULL; }
565 
566  Message *next = rx_queue.front();
567  rx_queue.pop_front();
568  return next;
569  }
570 
582  {
584 
585  poll(); // empty the current RX queue
586 
587  Message *next;
588  list<Message *>::iterator iter;
589  for (iter = rx_queue.begin(); iter != rx_queue.end(); ++iter)
590  {
591  if ((*iter)->getType() == type)
592  {
593  next = *iter;
594  rx_queue.erase(iter);
595  return next;
596  }
597  }
598  return NULL;
599  }
600 
607  Message *Transport::waitNext(double timeout)
608  {
610 
611  double elapsed = 0.0;
612  while (true)
613  {
614  /* Return a message if it's turned up */
615  poll();
616  if (!rx_queue.empty()) { return popNext(); }
617 
618  /* Check timeout */
619  if ((timeout != 0.0) && (elapsed > timeout))
620  {
621  // If we have a timeout set, and it has elapsed, exit.
622  return NULL;
623  }
624 
625  // Wait a ms before retry
626  usleep(1000);
627  elapsed += 0.001;
628  }
629  }
630 
639  Message *Transport::waitNext(enum MessageTypes type, double timeout)
640  {
642 
643  double elapsed = 0.0;
644  Message *msg;
645 
646  while (true)
647  {
648  /* Check if the message has turned up
649  * Since we're blocking, not doing anything useful anyway, it doesn't
650  * really matter that we're iterating the entire message queue every spin. */
651 
652  poll();
653  if (msg = Request(type-0x4000,timeout).sendRequest()) return msg;
654 
655  /* Check timeout */
656  if ((timeout != 0.0) && (elapsed > timeout))
657  {
658  return NULL;
659  }
660 
661  // Wait a ms before retry
662  usleep(1000);
663  elapsed += 0.001;
664  }
665  }
666 
674  void Transport::flush(list<Message *> *queue)
675  {
677 
678  poll(); // flush serial buffer
679 
680  /* Either delete or move all elements in the queue, depending
681  * on whether a destination list is provided */
682  list<Message *>::iterator iter;
683  for (iter = rx_queue.begin(); iter != rx_queue.end(); ++iter)
684  {
685  if (queue)
686  {
687  queue->push_back(*iter);
688  }
689  else
690  {
691  delete *iter;
692  }
693  }
694 
695  rx_queue.clear();
696 
697  }
698 
708  void Transport::flush(enum MessageTypes type, list<Message *> *queue)
709  {
711  poll();
712  list<Message *>::iterator iter = rx_queue.begin();
713  while (iter != rx_queue.end())
714  {
715  if ((*iter)->getType() == type)
716  {
717  /* Element is of flush type. If there's a destination
718  * list, move it. Otherwise, destroy it */
719  if (queue)
720  {
721  queue->push_back(*iter);
722  }
723  else
724  {
725  delete *iter;
726  }
727  // This advances to the next element in the queue:
728  iter = rx_queue.erase(iter);
729  }
730  else
731  {
732  // Not interested in this element. Next!
733  iter++;
734  }
735  }
736  }
737 
741  void Transport::printCounters(ostream &stream)
742  {
743  stream << "Transport Counters" << endl;
744  stream << "==================" << endl;
745 
746  size_t longest_name = 0;
747  size_t cur_len = 0;
748  for (int i = 0; i < NUM_COUNTERS; ++i)
749  {
750  cur_len = strlen(counter_names[i]);
751  if (cur_len > longest_name) { longest_name = cur_len; }
752  }
753 
754  for (int i = 0; i < NUM_COUNTERS; ++i)
755  {
756  cout.width(longest_name);
757  cout << left << counter_names[i] << ": " << counters[i] << endl;
758  }
759 
760  cout.width(longest_name);
761  cout << left << "Queue length" << ": " << rx_queue.size() << endl;
762  }
763 
768  {
769  for (int i = 0; i < NUM_COUNTERS; ++i)
770  {
771  counters[i] = 0;
772  }
773  }
774 
775 }; // namespace sawyer
776 
static const size_t MAX_QUEUE_LEN
Definition: Transport.h:112
msg
void clear()
Definition: serial.h:68
Message * sendRequest(Message *m)
Definition: Transport.cpp:485
static const int RETRY_DELAY_MS
Definition: Transport.h:109
static const size_t MAX_MSG_LENGTH
Definition: Message.h:70
bool isValid(char *whyNot=NULL, size_t strLen=0)
Definition: Message.cpp:329
unsigned long counters[NUM_COUNTERS]
Definition: Transport.h:114
uint64_t btou(void *src, size_t src_len)
Definition: Number.cpp:120
size_t total_len
Definition: Message.h:74
static Transport & instance()
Definition: Transport.cpp:129
MessageTypes
Definition: Message.h:207
#define CPR_EXCEPT()
Definition: Logger.h:97
static const size_t MIN_MSG_LENGTH
Definition: Message.h:102
static Message * factory(void *input, size_t msg_len)
Definition: Message.cpp:423
uint8_t * getPayloadPointer(size_t offset=0)
Definition: Message.cpp:218
Message * getAck()
Definition: Transport.cpp:315
int SetupSerial(void *handle)
bool is_sent
Definition: Message.h:97
void printCounters(std::ostream &stream=std::cout)
Definition: Transport.cpp:741
int OpenSerial(void **handle, const char *port_name)
bool isData()
Definition: Message.h:187
const char * message
Definition: Exception.h:49
Message * popNext()
Definition: Transport.cpp:558
BadAckException(unsigned int flag)
Definition: Transport.cpp:85
void flush(std::list< Message * > *queue=0)
int CloseSerial(void *handle)
Message * waitNext(double timeout=0.0)
Definition: Transport.cpp:607
int WriteData(void *handle, const char *buffer, int length)
std::list< Message * > rx_queue
Definition: Transport.h:111
static const char * counter_names[NUM_COUNTERS]
Definition: Transport.h:100
base_data rawData_
Definition: serial.h:94
unsigned char data[MAX_MSG_LENGTH]
Definition: serial.h:62
#define CHECK_THROW_CONFIGURED()
Definition: Transport.cpp:118
int ReadData(void *handle, char *buffer, int length)
void configure(const char *device, int retries)
Definition: Transport.cpp:164
int openComm(const char *device)
Definition: Transport.cpp:208
void enqueueMessage(Message *msg)
Definition: Transport.cpp:350
roch_driver rochDriver
Definition: Transport.h:108
Message * rxMessage()
Definition: Transport.cpp:242
void send(Message *m)
Definition: Transport.cpp:408
uint8_t data[MAX_MSG_LENGTH]
Definition: Message.h:72
static const uint8_t SOH
Definition: Message.h:103
enum sawyer::BadAckException::ackFlags ack_flag
size_t getTotalLength()
Definition: Message.h:168
struct sawyer::base_data::RawData rawData


roch_base
Author(s): Mike Purvis , Paul Bovbel , Chen
autogenerated on Mon Jun 10 2019 14:41:14