Transport.cpp
Go to the documentation of this file.
00001 
00047 #include <string.h>
00048 #include <unistd.h>
00049 #include <cstdlib>
00050 #include <iostream>
00051 #include <vector>
00052 #include <string>
00053 #include <sstream>
00054 #include <ctime>
00055 
00056 #include "husky_base/horizon_legacy/Transport.h"
00057 #include "husky_base/horizon_legacy/Number.h"
00058 #include "husky_base/horizon_legacy/Message.h"
00059 #include "husky_base/horizon_legacy/Message_request.h"
00060 #include "husky_base/horizon_legacy/Message_cmd.h"
00061 #include "husky_base/horizon_legacy/serial.h"
00062 #include "husky_base/horizon_legacy/Logger.h"
00063 
00064 using namespace std;
00065 
00066 namespace clearpath
00067 {
00068 
00069   const char *Transport::counter_names[] = {
00070       "Garbled bytes",
00071       "Invalid messages",
00072       "Ignored acknowledgment",
00073       "Message queue overflow"
00074   };
00075 
00076   TransportException::TransportException(const char *msg, enum errors ex_type)
00077       : Exception(msg), type(ex_type)
00078   {
00079     if (msg)
00080     {
00081       CPR_EXCEPT() << "TransportException " << (int) type << ": " << message << endl << std::flush;
00082     }
00083   }
00084 
00085   BadAckException::BadAckException(unsigned int flag) :
00086       TransportException(NULL, TransportException::BAD_ACK_RESULT),
00087       ack_flag((enum ackFlags) flag)
00088   {
00089     switch (ack_flag)
00090     {
00091       case BAD_CHECKSUM:
00092         message = "Bad checksum";
00093         break;
00094       case BAD_TYPE:
00095         message = "Bad message type";
00096         break;
00097       case BAD_FORMAT:
00098         message = "Bad message format";
00099         break;
00100       case RANGE:
00101         message = "Range error";
00102         break;
00103       case OVER_FREQ:
00104         message = "Requested frequency too high";
00105         break;
00106       case OVER_SUBSCRIBE:
00107         message = "Too many subscriptions";
00108         break;
00109       default:
00110         message = "Unknown error code.";
00111         break;
00112     };
00113     stringstream ss;
00114 
00115     CPR_EXCEPT() << "BadAckException (0x" << hex << flag << dec << "): " << message << endl << flush;
00116   }
00117 
00118 #define CHECK_THROW_CONFIGURED() \
00119     do { \
00120         if( ! configured ) { \
00121             throw new TransportException("Transport not configured", TransportException::NOT_CONFIGURED); \
00122         } \
00123     } while( 0 )
00124 
00129   Transport &Transport::instance()
00130   {
00131     static Transport instance;
00132     return instance;
00133   }
00134 
00138   Transport::Transport() :
00139       configured(false),
00140       serial(0),
00141       retries(0)
00142   {
00143     for (int i = 0; i < NUM_COUNTERS; ++i)
00144     {
00145       counters[i] = 0;
00146     }
00147   }
00148 
00149   Transport::~Transport()
00150   {
00151     close();
00152   }
00153 
00164   void Transport::configure(const char *device, int retries)
00165   {
00166     if (configured)
00167     {
00168       // Close serial
00169       close();
00170     }
00171 
00172     // Forget old counters
00173     resetCounters();
00174 
00175     this->retries = retries;
00176 
00177     if (!openComm(device))
00178     {
00179       configured = true;
00180     }
00181     else
00182     {
00183       throw new TransportException("Failed to open serial port", TransportException::CONFIGURE_FAIL);
00184     }
00185   }
00186 
00192   int Transport::close()
00193   {
00194     int retval = 0;
00195     if (configured)
00196     {
00197       flush();
00198       retval = closeComm();
00199     }
00200     configured = false;
00201     return retval;
00202   }
00203 
00208   int Transport::openComm(const char *device)
00209   {
00210     int tmp = OpenSerial(&(this->serial), device);
00211     if (tmp < 0)
00212     {
00213       return -1;
00214     }
00215     tmp = SetupSerial(this->serial);
00216     if (tmp < 0)
00217     {
00218       return -2;
00219     }
00220     return 0;
00221   }
00222 
00226   int Transport::closeComm()
00227   {
00228     CloseSerial(this->serial);
00229     //serial = 0;
00230     return 0;
00231   }
00232 
00241   Message *Transport::rxMessage()
00242   {
00243     /* Each time this function is called, any available characters are added
00244      * to the receive buffer.  A new Message is created and returned when 
00245      * a complete message has been received (the message may be aggregated
00246      * from data received over multiple calls) */
00247     static char rx_buf[Message::MAX_MSG_LENGTH];
00248     static size_t rx_inx = 0;
00249     static size_t msg_len = 0;
00250 
00251     if (!rx_inx) { memset(rx_buf, 0xba, Message::MAX_MSG_LENGTH); }
00252 
00253     /* Read in and handle characters, one at a time. 
00254      * (This is a simple state machine, using 'rx_inx' as state) */
00255     while (ReadData(serial, rx_buf + rx_inx, 1) == 1)
00256     {
00257       switch (rx_inx)
00258       {
00259 
00260         /* Waiting for SOH */
00261         case 0:
00262           if ((uint8_t) (rx_buf[0]) == (uint8_t) (Message::SOH))
00263           {
00264             rx_inx++;
00265           }
00266           else { counters[GARBLE_BYTES]++; }
00267           break;
00268 
00269           /* Waiting for length */
00270         case 1:
00271           rx_inx++;
00272           break;
00273 
00274           /* Waiting for ~length */
00275         case 2:
00276           rx_inx++;
00277           msg_len = rx_buf[1] + 3;
00278 
00279           /* Check for valid length */
00280           if (static_cast<unsigned char>(rx_buf[1] ^ rx_buf[2]) != 0xFF ||
00281               (msg_len < Message::MIN_MSG_LENGTH))
00282           {
00283             counters[GARBLE_BYTES] += rx_inx;
00284             rx_inx = 0;
00285           }
00286 
00287           break;
00288 
00289           //case 9:
00290           //case 10:
00291           //    cout << hex << " " << (unsigned int)(rx_buf[rx_inx]);
00292           //    if(rx_inx==10) cout << endl;
00293 
00294           /* Waiting for the rest of the message */
00295         default:
00296           rx_inx++;
00297           if (rx_inx < msg_len) { break; }
00298           /* Finished rxing, reset this state machine and return msg */
00299           rx_inx = 0;
00300           Message *msg = Message::factory(rx_buf, msg_len);
00301           return msg;
00302 
00303       } // switch( rx_inx )
00304     } // while( get character )
00305 
00306     // Breaking out of loop indicates end of available serial input
00307     return NULL;
00308   }
00309 
00316   Message *Transport::getAck()
00317   {
00318     Message *msg = NULL;
00319 
00320     while ((msg = rxMessage()))
00321     {
00322       /* Queue any data messages that turn up */
00323       if (msg->isData())
00324       {
00325         enqueueMessage(msg);
00326         continue;
00327       }
00328 
00329       /* Drop invalid messages */
00330       if (!msg->isValid())
00331       {
00332         ++counters[INVALID_MSG];
00333         delete msg;
00334         continue;
00335       }
00336 
00337       return msg;
00338     }
00339 
00340     return NULL;
00341   }
00342 
00349   void Transport::enqueueMessage(Message *msg)
00350   {
00351     /* Reject invalid messages */
00352     if (!msg->isValid())
00353     {
00354       ++counters[INVALID_MSG];
00355       delete msg;
00356       return;
00357     }
00358 
00359     // Enqueue
00360     rx_queue.push_back(msg);
00361 
00362     /* Drop the oldest messages if the queue has overflowed */
00363     while (rx_queue.size() > MAX_QUEUE_LEN)
00364     {
00365       ++counters[QUEUE_FULL];
00366       delete rx_queue.front();
00367       rx_queue.pop_front();
00368     }
00369   }
00370 
00371 
00378   void Transport::poll()
00379   {
00380     CHECK_THROW_CONFIGURED();
00381 
00382     Message *msg = NULL;
00383 
00384     while ((msg = rxMessage()))
00385     {
00386       /* We're not waiting for acks, so drop them */
00387       if (!msg->isData())
00388       {
00389         ++counters[IGNORED_ACK];
00390         delete msg;
00391         continue;
00392       }
00393 
00394       // Message is good, queue it.
00395       enqueueMessage(msg);
00396     }
00397   }
00398 
00406   void Transport::send(Message *m)
00407   {
00408     CHECK_THROW_CONFIGURED();
00409 
00410     char skip_send = 0;
00411     Message *ack = NULL;
00412     int transmit_times = 0;
00413     short result_code;
00414 
00415     poll();
00416 
00417     while (1)
00418     {
00419       // We have exceeded our retry numbers
00420       if (transmit_times > this->retries)
00421       {
00422         break;
00423       }
00424       // Write output
00425       if (!skip_send) { WriteData(serial, (char *) (m->data), m->total_len); }
00426 
00427       // Wait up to 100 ms for ack
00428       for (int i = 0; i < RETRY_DELAY_MS; ++i)
00429       {
00430         usleep(1000);
00431         if ((ack = getAck())) { break; }
00432       }
00433 
00434       // No message - resend
00435       if (ack == NULL)
00436       {
00437         skip_send = 0;
00438         //cout << "No message received yet" << endl;
00439         transmit_times++;
00440         continue;
00441       }
00442 
00443       // Check result code
00444       // If the result code is bad, the message was still transmitted
00445       // successfully
00446       result_code = btou(ack->getPayloadPointer(), 2);
00447       if (result_code > 0)
00448       {
00449         throw new BadAckException(result_code);
00450       }
00451       else
00452       {
00453         // Everything's good - return
00454         break;
00455       }
00456       // Other failure
00457       transmit_times++;
00458     }
00459     if (ack == NULL)
00460     {
00461       throw new TransportException("Unacknowledged send", TransportException::UNACKNOWLEDGED_SEND);
00462     }
00463     delete ack;
00464 
00465     m->is_sent = true;
00466   }
00467 
00476   Message *Transport::popNext()
00477   {
00478     CHECK_THROW_CONFIGURED();
00479 
00480     poll();  // empty the current serial RX queue.
00481 
00482     if (rx_queue.empty()) { return NULL; }
00483 
00484     Message *next = rx_queue.front();
00485     rx_queue.pop_front();
00486     return next;
00487   }
00488 
00499   Message *Transport::popNext(enum MessageTypes type)
00500   {
00501     CHECK_THROW_CONFIGURED();
00502 
00503     poll(); // empty the current RX queue
00504 
00505     Message *next;
00506     list<Message *>::iterator iter;
00507     for (iter = rx_queue.begin(); iter != rx_queue.end(); ++iter)
00508     {
00509       if ((*iter)->getType() == type)
00510       {
00511         next = *iter;
00512         rx_queue.erase(iter);
00513         return next;
00514       }
00515     }
00516     return NULL;
00517   }
00518 
00525   Message *Transport::waitNext(double timeout)
00526   {
00527     CHECK_THROW_CONFIGURED();
00528 
00529     double elapsed = 0.0;
00530     while (true)
00531     {
00532       /* Return a message if it's turned up */
00533       poll();
00534       if (!rx_queue.empty()) { return popNext(); }
00535 
00536       /* Check timeout */
00537       if ((timeout != 0.0) && (elapsed > timeout))
00538       {
00539         // If we have a timeout set, and it has elapsed, exit.
00540         return NULL;
00541       }
00542 
00543       // Wait a ms before retry
00544       usleep(1000);
00545       elapsed += 0.001;
00546     }
00547   }
00548 
00557   Message *Transport::waitNext(enum MessageTypes type, double timeout)
00558   {
00559     CHECK_THROW_CONFIGURED();
00560 
00561     double elapsed = 0.0;
00562     Message *msg;
00563 
00564     while (true)
00565     {
00566       /* Check if the message has turned up
00567        * Since we're blocking, not doing anything useful anyway, it doesn't
00568        * really matter that we're iterating the entire message queue every spin. */
00569       poll();
00570       msg = popNext(type);
00571       if (msg) { return msg; }
00572 
00573       /* Check timeout */
00574       if ((timeout != 0.0) && (elapsed > timeout))
00575       {
00576         // If a timeout is set and has elapsed, fail out.
00577         return NULL;
00578       }
00579 
00580       // Wait a ms  before retry
00581       usleep(1000);
00582       elapsed += 0.001;
00583     }
00584   }
00585 
00593   void Transport::flush(list<Message *> *queue)
00594   {
00595     CHECK_THROW_CONFIGURED();
00596 
00597     poll(); // flush serial buffer
00598 
00599     /* Either delete or move all elements in the queue, depending
00600      * on whether a destination list is provided */
00601     list<Message *>::iterator iter;
00602     for (iter = rx_queue.begin(); iter != rx_queue.end(); ++iter)
00603     {
00604       if (queue)
00605       {
00606         queue->push_back(*iter);
00607       }
00608       else
00609       {
00610         delete *iter;
00611       }
00612     }
00613 
00614     rx_queue.clear();
00615   }
00616 
00626   void Transport::flush(enum MessageTypes type, list<Message *> *queue)
00627   {
00628     CHECK_THROW_CONFIGURED();
00629 
00630     poll();
00631 
00632     list<Message *>::iterator iter = rx_queue.begin();
00633     while (iter != rx_queue.end())
00634     {
00635       if ((*iter)->getType() == type)
00636       {
00637         /* Element is of flush type.  If there's a destination
00638          * list, move it.  Otherwise, destroy it */
00639         if (queue)
00640         {
00641           queue->push_back(*iter);
00642         }
00643         else
00644         {
00645           delete *iter;
00646         }
00647         // This advances to the next element in the queue:
00648         iter = rx_queue.erase(iter);
00649       }
00650       else
00651       {
00652         // Not interested in this element.  Next!
00653         iter++;
00654       }
00655     }
00656   }
00657 
00661   void Transport::printCounters(ostream &stream)
00662   {
00663     stream << "Transport Counters" << endl;
00664     stream << "==================" << endl;
00665 
00666     size_t longest_name = 0;
00667     size_t cur_len = 0;
00668     for (int i = 0; i < NUM_COUNTERS; ++i)
00669     {
00670       cur_len = strlen(counter_names[i]);
00671       if (cur_len > longest_name) { longest_name = cur_len; }
00672     }
00673 
00674     for (int i = 0; i < NUM_COUNTERS; ++i)
00675     {
00676       cout.width(longest_name);
00677       cout << left << counter_names[i] << ": " << counters[i] << endl;
00678     }
00679 
00680     cout.width(longest_name);
00681     cout << left << "Queue length" << ": " << rx_queue.size() << endl;
00682   }
00683 
00687   void Transport::resetCounters()
00688   {
00689     for (int i = 0; i < NUM_COUNTERS; ++i)
00690     {
00691       counters[i] = 0;
00692     }
00693   }
00694 
00695 }; // namespace clearpath
00696 


husky_base
Author(s): Mike Purvis , Paul Bovbel
autogenerated on Sat Jun 8 2019 18:26:01