Transport.cpp
Go to the documentation of this file.
00001 
00047 #include <string.h>
00048 #include <unistd.h>
00049 #include <cstdlib>
00050 #include <iostream>
00051 #include <vector>
00052 #include <string>
00053 #include <sstream>
00054 #include <ctime>
00055 
00056 #include "roch_base/core/Transport.h"
00057 #include "roch_base/core/Number.h"
00058 #include "roch_base/core/Message.h"
00059 #include "roch_base/core/Message_request.h"
00060 #include "roch_base/core/Message_cmd.h"
00061 #include "roch_base/core/serial.h"
00062 #include "roch_base/core/Logger.h"
00063 
00064 using namespace std;
00065 
00066 namespace sawyer
00067 {
00068 
00069   const char *Transport::counter_names[] = {
00070       "Garbled bytes",
00071       "Invalid messages",
00072       "Ignored acknowledgment",
00073       "Message queue overflow"
00074   };
00075 
00076   TransportException::TransportException(const char *msg, enum errors ex_type)
00077       : Exception(msg), type(ex_type)
00078   {
00079     if (msg)
00080     {
00081       CPR_EXCEPT() << "TransportException " << (int) type << ": " << message << endl << std::flush;
00082     }
00083   }
00084 
00085   BadAckException::BadAckException(unsigned int flag) :
00086       TransportException(NULL, TransportException::BAD_ACK_RESULT),
00087       ack_flag((enum ackFlags) flag)
00088   {
00089     switch (ack_flag)
00090     {
00091       case BAD_CHECKSUM:
00092         message = "Bad checksum";
00093         break;
00094       case BAD_TYPE:
00095         message = "Bad message type";
00096         break;
00097       case BAD_FORMAT:
00098         message = "Bad message format";
00099         break;
00100       case RANGE:
00101         message = "Range error";
00102         break;
00103       case OVER_FREQ:
00104         message = "Requested frequency too high";
00105         break;
00106       case OVER_SUBSCRIBE:
00107         message = "Too many subscriptions";
00108         break;
00109       default:
00110         message = "Unknown error code.";
00111         break;
00112     };
00113     stringstream ss;
00114 
00115     CPR_EXCEPT() << "BadAckException (0x" << hex << flag << dec << "): " << message << endl << flush;
00116   }
00117 
/**
 * Guard used at the top of Transport member functions: does nothing while the
 * `configured` member is true, otherwise throws a heap-allocated
 * TransportException (NOT_CONFIGURED).  Handlers must catch it by pointer.
 */
#define CHECK_THROW_CONFIGURED() \
    do { \
        if( configured ) { break; } \
        throw new TransportException("Transport not configured", TransportException::NOT_CONFIGURED); \
    } while( 0 )
00124 
00129   Transport &Transport::instance()
00130   {
00131     static Transport instance;
00132     return instance;
00133   }
00134 
00138   Transport::Transport() :
00139       configured(false),
00140       serial(0),
00141       retries(0)
00142   {
00143     for (int i = 0; i < NUM_COUNTERS; ++i)
00144     {
00145       counters[i] = 0;
00146     }
00147   }
00148 
00149   Transport::~Transport()
00150   {
00151     close();
00152   }
00153 
00164   void Transport::configure(const char *device, int retries)
00165   {
00166    
00167     if (configured)
00168     {
00169       // Close serial
00170       close();
00171     }
00172 
00173     // Forget old counters
00174     resetCounters();
00175 
00176     this->retries = retries;
00177     if (!openComm(device))
00178     {
00179       configured = true;
00180     }
00181     else
00182     {
00183       throw new TransportException("Failed to open serial port", TransportException::CONFIGURE_FAIL);
00184     }
00185   }
00186 
00192   int Transport::close()
00193   {
00194     int retval = 0;
00195     if (configured)
00196     {
00197       flush();
00198       retval = closeComm();
00199     }
00200     configured = false;
00201     return retval;
00202   }
00203 
00208   int Transport::openComm(const char *device)
00209   {
00210      
00211     int tmp = rochDriver.OpenSerial(&(this->serial), device);
00212     if (tmp < 0)
00213     {
00214       return -1;
00215     }
00216     tmp = rochDriver.SetupSerial(this->serial);
00217     if (tmp < 0)
00218     {
00219       return -2;
00220     }
00221     return 0;
00222   }
00223 
00227   int Transport::closeComm()
00228   {
00229     rochDriver.CloseSerial(this->serial);
00230     //serial = 0;
00231     return 0;
00232   }
00233 
00242   Message *Transport::rxMessage()
00243   {
00244     /* Each time this function is called, any available characters are added
00245      * to the receive buffer.  A new Message is created and returned when
00246      * a complete message has been received (the message may be aggregated
00247      * from data received over multiple calls) */
00248 
00249     static char rx_buf[Message::MAX_MSG_LENGTH];
00250     static size_t rx_inx = 0;
00251     static size_t msg_len = 0;
00252 
00253     if (!rx_inx) { memset(rx_buf, 0xba, Message::MAX_MSG_LENGTH); }
00254 
00255     while ( rochDriver.ReadData(serial, rx_buf + rx_inx, 1)==1)
00256     {
00257       switch (rx_inx)
00258       {
00259 
00260         /* Waiting for SOH */
00261         case 0:
00262           if ((uint8_t) (rx_buf[0]) == (uint8_t) (Message::SOH))
00263           {
00264             rx_inx++;
00265           }
00266           else { counters[GARBLE_BYTES]++; }
00267           break;
00268 
00269           /* Waiting for length */
00270         case 1:
00271           rx_inx++;
00272           break;
00273 
00274           /* Waiting for ~length */
00275         case 2:
00276           rx_inx++;
00277           msg_len = rx_buf[1] + 3;//
00278 
00279           /* Check for valid length */
00280           if (static_cast<unsigned char>(rx_buf[1] ^ rx_buf[2]) != 0xFF ||
00281               (msg_len < Message::MIN_MSG_LENGTH))
00282           {
00283             counters[GARBLE_BYTES] += rx_inx;
00284             rx_inx = 0;
00285           }
00286 
00287           break;
00288 
00289           /* Waiting for the rest of the message */
00290         default:
00291           rx_inx++;
00292           if (rx_inx < msg_len) { break; }
00293           /* Finished rxing, reset this state machine and return msg */
00294           rx_inx = 0;
00295           rochDriver.rawData_.clear();
00296           rochDriver.rawData_.rawData.length = msg_len;
00297           for (int i=0; i<msg_len; ++i) rochDriver.rawData_.rawData.data[i]= (unsigned char)(rx_buf[i]);
00298           
00299           Message *msg = Message::factory(rx_buf, msg_len);
00300           return msg;
00301 
00302       } // switch( rx_inx )
00303     } // while( get character )
00304 
00305     // Breaking out of loop indicates end of available serial input
00306     return NULL;
00307   }
00308 
00315   Message *Transport::getAck()
00316   {
00317     Message *msg = NULL;
00318     while ((msg = rxMessage()))
00319     {
00320       /* Queue any data messages that turn up */
00321       if (msg->isData())
00322       {
00323           rochDriver.rawData_.clear();
00324           rochDriver.rawData_.rawData.length = msg->getTotalLength();
00325           for (int i=0; i<msg->getTotalLength(); ++i) rochDriver.rawData_.rawData.data[i]= (unsigned char)(msg->data[i]);
00326         return msg;
00327         continue;
00328         
00329       }
00330 
00331       /* Drop invalid messages */
00332       if (!msg->isValid())
00333       {
00334         ++counters[INVALID_MSG];
00335         delete msg;
00336         continue;
00337       }
00338       return msg;
00339     }
00340     return NULL;
00341 
00342   }
00343 
00350   void Transport::enqueueMessage(Message *msg)
00351   {
00352     /* Reject invalid messages */
00353     if (!msg->isValid())
00354     {
00355       ++counters[INVALID_MSG];
00356       delete msg;
00357       return;
00358     }
00359 
00360     // Enqueue
00361     rx_queue.push_back(msg);
00362     
00363     /* Drop the oldest messages if the queue has overflowed */
00364     while (rx_queue.size() > MAX_QUEUE_LEN)
00365     {
00366       ++counters[QUEUE_FULL];
00367       delete rx_queue.front();
00368       rx_queue.pop_front();
00369     }
00370   }
00371 
00372 
00379   void Transport::poll()
00380   {
00381     CHECK_THROW_CONFIGURED();
00382 
00383     Message *msg = NULL;
00384 
00385     while ((msg = rxMessage()))
00386     {
00387         
00388       /* We're not waiting for acks, so drop them */
00389       if (!msg->isData())
00390       {
00391         ++counters[IGNORED_ACK];
00392         delete msg;
00393         continue;
00394       }
00395 
00396       // Message is good, queue it.
00397       enqueueMessage(msg);
00398     }
00399   }
00400 
00408   void Transport::send(Message *m)
00409   {
00410    
00411     counter_number = 0;
00412     CHECK_THROW_CONFIGURED();
00413 
00414  
00415     char skip_send = 0;
00416     Message *ack = NULL;
00417     Message *ack_temp = NULL;
00418     int transmit_times = 0;
00419     short result_code;
00420     poll();
00421     while (1)
00422     {
00423       // We have exceeded our retry numbers
00424       if (transmit_times > this->retries)
00425       {
00426         break;
00427       }
00428       // Write output
00429       
00430           rochDriver.rawData_.clear();
00431           rochDriver.rawData_.rawData.length = m->getTotalLength();
00432           for (int i=0; i<m->getTotalLength(); ++i) rochDriver.rawData_.rawData.data[i]= (unsigned char)(m->data[i]);
00433       if (!skip_send) { rochDriver.WriteData(serial, (char *) (m->data), m->total_len); }
00434 
00435       // Wait up to 100 ms for ack
00436       for (int i = 0; i < RETRY_DELAY_MS; ++i)
00437       {
00438         usleep(1000);
00439         if ((ack = getAck())) {   counter_number++; ack_temp = ack; break; 
00440           
00441         }
00442       }
00443       if(ack_temp == NULL)
00444       {
00445         skip_send = 0;
00446         cout << "No message received yet" << endl;
00447         transmit_times++;
00448         continue;
00449       }
00450 
00451       // Check result code
00452       // If the result code is bad, the message was still transmitted
00453       // successfully
00454       result_code = btou(ack_temp->getPayloadPointer(), 2);
00455       
00456       if (result_code > 0 && !ack_temp->isData())
00457       {
00458         throw new BadAckException(result_code);
00459       }
00460       else
00461       {
00462         // Everything's good - return
00463         break;
00464       }
00465       // Other failure
00466       transmit_times++;
00467     }
00468     if (ack_temp == NULL)
00469     {
00470       throw new TransportException("Unacknowledged send", TransportException::UNACKNOWLEDGED_SEND);
00471     }
00472     delete ack_temp;
00473 
00474     m->is_sent = true;
00475   }
00476 
00485   Message* Transport::sendRequest(Message *m)
00486   {
00487     counter_number = 0;
00488     CHECK_THROW_CONFIGURED();
00489     
00490     char skip_send = 0;
00491     Message *ack = NULL;
00492     Message *ack_temp = NULL;
00493     int transmit_times = 0;
00494     short result_code;
00495     poll();
00496     while (1)
00497     {
00498       // We have exceeded our retry numbers
00499       if (transmit_times > this->retries)
00500       {
00501         break;
00502       }
00503       // Write output
00504       
00505           rochDriver.rawData_.clear();
00506           rochDriver.rawData_.rawData.length = m->getTotalLength();
00507           for (int i=0; i<m->getTotalLength(); ++i) rochDriver.rawData_.rawData.data[i]= (unsigned char)(m->data[i]);
00508       if (!skip_send) { rochDriver.WriteData(serial, (char *) (m->data), m->total_len); }
00509 
00510       // Wait up to 100 ms for ack
00511       for (int i = 0; i < RETRY_DELAY_MS; ++i)
00512       {
00513         usleep(1000);
00514         if ((ack = getAck())) {   counter_number++; ack_temp = ack;
00515           break; 
00516         }
00517       }
00518       if(ack_temp == NULL)
00519       {
00520         skip_send = 0;
00521         cout << "No message received yet" << endl;
00522         transmit_times++;
00523         continue;
00524       }
00525       // Check result code
00526       // If the result code is bad, the message was still transmitted
00527       // successfully
00528       if(ack_temp->isData()) {  return ack_temp;}
00529       result_code = btou(ack_temp->getPayloadPointer(), 2);
00530       if (result_code > 0 && !ack_temp->isData())
00531       {
00532         throw new BadAckException(result_code);
00533       }
00534       else
00535       {
00536         // Everything's good - return
00537         break;
00538       }
00539       // Other failure
00540       transmit_times++;
00541     }
00542     if (ack_temp == NULL)
00543     {
00544       throw new TransportException("Unacknowledged send", TransportException::UNACKNOWLEDGED_SEND);
00545     }
00546     delete ack_temp;
00547 
00548     m->is_sent = true;
00549   }
00558   Message *Transport::popNext()
00559   {
00560     CHECK_THROW_CONFIGURED();
00561 
00562     poll();  // empty the current serial RX queue.
00563 
00564     if (rx_queue.empty()) { return NULL; }
00565 
00566     Message *next = rx_queue.front();
00567     rx_queue.pop_front();
00568     return next;
00569   }
00570 
00581   Message *Transport::popNext(enum MessageTypes type)
00582   {
00583     CHECK_THROW_CONFIGURED();
00584 
00585     poll(); // empty the current RX queue
00586 
00587     Message *next;
00588     list<Message *>::iterator iter;
00589     for (iter = rx_queue.begin(); iter != rx_queue.end(); ++iter)
00590     {
00591       if ((*iter)->getType() == type)
00592       {
00593         next = *iter;
00594         rx_queue.erase(iter);
00595         return next;
00596       }
00597     }
00598     return NULL;
00599   }
00600 
00607   Message *Transport::waitNext(double timeout)
00608   {
00609     CHECK_THROW_CONFIGURED();
00610 
00611     double elapsed = 0.0;
00612     while (true)
00613     {
00614       /* Return a message if it's turned up */
00615       poll();
00616       if (!rx_queue.empty()) { return popNext(); }
00617 
00618       /* Check timeout */
00619       if ((timeout != 0.0) && (elapsed > timeout))
00620       {
00621         // If we have a timeout set, and it has elapsed, exit.
00622         return NULL;
00623       }
00624 
00625       // Wait a ms before retry
00626       usleep(1000);
00627       elapsed += 0.001;
00628     }
00629   }
00630 
00639   Message *Transport::waitNext(enum MessageTypes type, double timeout)
00640   {
00641     CHECK_THROW_CONFIGURED();
00642 
00643     double elapsed = 0.0;
00644     Message *msg;
00645 
00646     while (true)
00647     {
00648       /* Check if the message has turned up
00649        * Since we're blocking, not doing anything useful anyway, it doesn't
00650        * really matter that we're iterating the entire message queue every spin. */
00651   
00652       poll();
00653       if (msg = Request(type-0x4000,timeout).sendRequest()) return msg; 
00654         
00655       /* Check timeout */
00656       if ((timeout != 0.0) && (elapsed > timeout))
00657       {
00658         return NULL;
00659       }
00660 
00661       // Wait a ms  before retry
00662       usleep(1000);
00663       elapsed += 0.001;
00664     }
00665   }
00666 
00674   void Transport::flush(list<Message *> *queue)
00675   {
00676     CHECK_THROW_CONFIGURED();
00677 
00678     poll(); // flush serial buffer
00679 
00680     /* Either delete or move all elements in the queue, depending
00681      * on whether a destination list is provided */
00682     list<Message *>::iterator iter;
00683     for (iter = rx_queue.begin(); iter != rx_queue.end(); ++iter)
00684     {
00685       if (queue)
00686       {
00687         queue->push_back(*iter);
00688       }
00689       else
00690       {
00691         delete *iter;
00692       }
00693     }
00694 
00695     rx_queue.clear();
00696 
00697   }
00698 
00708   void Transport::flush(enum MessageTypes type, list<Message *> *queue)
00709   {
00710     CHECK_THROW_CONFIGURED();
00711     poll();
00712     list<Message *>::iterator iter = rx_queue.begin();
00713     while (iter != rx_queue.end())
00714     {
00715       if ((*iter)->getType() == type)
00716       {
00717         /* Element is of flush type.  If there's a destination
00718          * list, move it.  Otherwise, destroy it */
00719         if (queue)
00720         {
00721           queue->push_back(*iter);
00722         }
00723         else
00724         {
00725           delete *iter;
00726         }
00727         // This advances to the next element in the queue:
00728         iter = rx_queue.erase(iter);
00729       }
00730       else
00731       {
00732         // Not interested in this element.  Next!
00733         iter++;
00734       }
00735     }
00736   }
00737 
00741   void Transport::printCounters(ostream &stream)
00742   {
00743     stream << "Transport Counters" << endl;
00744     stream << "==================" << endl;
00745 
00746     size_t longest_name = 0;
00747     size_t cur_len = 0;
00748     for (int i = 0; i < NUM_COUNTERS; ++i)
00749     {
00750       cur_len = strlen(counter_names[i]);
00751       if (cur_len > longest_name) { longest_name = cur_len; }
00752     }
00753 
00754     for (int i = 0; i < NUM_COUNTERS; ++i)
00755     {
00756       cout.width(longest_name);
00757       cout << left << counter_names[i] << ": " << counters[i] << endl;
00758     }
00759 
00760     cout.width(longest_name);
00761     cout << left << "Queue length" << ": " << rx_queue.size() << endl;
00762   }
00763 
00767   void Transport::resetCounters()
00768   {
00769     for (int i = 0; i < NUM_COUNTERS; ++i)
00770     {
00771       counters[i] = 0;
00772     }
00773   }
00774 
00775 }; // namespace sawyer
00776 


roch_base
Author(s): Mike Purvis , Paul Bovbel , Carl
autogenerated on Sat Jun 8 2019 20:32:33