rx.cpp
Go to the documentation of this file.
00001 
00026 #include "nmea_comms/rx.h"
00027 #include "nmea_comms/tx.h"
00028 #include "nmea_comms/checksum.h"
00029 
00030 #include <poll.h>
00031 #include <sstream>
00032 #include <sys/ioctl.h>
00033 #include <boost/thread.hpp>
00034 #include <boost/algorithm/string.hpp>
00035 
00036 #include "ros/ros.h"
00037 #include "nmea_msgs/Sentence.h"
00038 
00039 #define RX_INITIAL_LENGTH 40
00040 #define RX_SUCCESSIVE_LENGTH 8
00041 
00042 static void _handle_sentence(ros::Publisher& publisher, ros::Time& stamp, const char* sentence, const char* frame_id)
00043 {
00044   ROS_DEBUG("Sentence RX: %s", sentence);
00045 
00046   static nmea_msgs::Sentence sentence_msg;
00047   sentence_msg.sentence = sentence;
00048   sentence_msg.header.stamp = stamp;
00049   sentence_msg.header.frame_id = frame_id;
00050   publisher.publish(sentence_msg);
00051 }
00052 
00053 
00054 static int threads_active = 1;
00055 
00056 static void _thread_func(ros::NodeHandle& n, int fd, std::string frame_id, uint32_t byte_time_ns = 0)
00057 {
00058   ROS_DEBUG("New connection handler thread beginning.");
00059 
00060   ros::Publisher pub = n.advertise<nmea_msgs::Sentence>("nmea_sentence", 5);
00061   ros::Subscriber sub = n.subscribe<nmea_msgs::Sentence>("nmea_sentence_out", 5,
00062                         boost::bind(tx_msg_callback, _1, fd));
00063 
00064   struct pollfd pollfds[] = { { fd, POLLIN, 0 } };
00065   char buffer[2048];
00066   char* buffer_write = buffer;
00067   char* buffer_end = &buffer[sizeof(buffer)];
00068 
00069   while (threads_active)
00070   {
00071     errno = 0;
00072     int retval = poll(pollfds, 1, 500);
00073     ROS_DEBUG("Poll retval=%d, errno=%d, revents=%d", retval, errno, pollfds[0].revents);
00074 
00075     if (retval == 0)
00076     {
00077       // No event, just 1 sec timeout.
00078       continue;
00079     }
00080     else if (retval < 0)
00081     {
00082       ROS_FATAL("Error polling device. Terminating node.");
00083       ros::shutdown();
00084     }
00085     else if (pollfds[0].revents & (POLLHUP | POLLERR | POLLNVAL))
00086     {
00087       ROS_INFO("Device error/hangup.");
00088       ROS_DEBUG("Shutting down publisher and subscriber.");
00089       pub.shutdown();
00090       sub.shutdown();
00091       ROS_DEBUG("Closing file descriptor.");
00092       close(fd);
00093       ROS_DEBUG("Exiting handler thread.");
00094       return;
00095     }
00096 
00097     // We can save some CPU by sleeping if the number waiting bytes is really small
00098     if (byte_time_ns > 0)
00099     {
00100       int waiting_bytes;
00101       errno = ioctl(fd, FIONREAD, &waiting_bytes);
00102       if (errno == 0)
00103       {
00104         int wait_for = 0;
00105         int buffer_plus_waiting = (buffer_write - buffer) + waiting_bytes;
00106         if (buffer_plus_waiting < RX_INITIAL_LENGTH)
00107         {
00108           wait_for = RX_INITIAL_LENGTH - buffer_plus_waiting;
00109         }
00110         else if (waiting_bytes < RX_SUCCESSIVE_LENGTH)
00111         {
00112           wait_for = RX_SUCCESSIVE_LENGTH - waiting_bytes;
00113         }
00114         if (wait_for > 0)
00115         {
00116           struct timespec req = { 0, wait_for * byte_time_ns }, rem;
00117           ROS_DEBUG_STREAM("Sleeping for " << wait_for << " bytes (" << byte_time_ns << " ns)");
00118           nanosleep(&req, &rem);
00119         }
00120       }
00121     }
00122 
00123     // Read in contents of buffer and null-terminate it.
00124     ros::Time now = ros::Time::now();
00125     errno = 0;
00126     retval = read(fd, buffer_write, buffer_end - buffer_write - 1);
00127     ROS_DEBUG("Read retval=%d, errno=%d", retval, errno);
00128     ROS_DEBUG_COND(retval < 0, "Read error: %s", strerror(errno));
00129     if (retval > 0)
00130     {
00131       if (strnlen(buffer_write, retval) != retval)
00132       {
00133         ROS_WARN("Null byte received from serial port, flushing buffer.");
00134         buffer_write = buffer;
00135         continue;
00136       }
00137       buffer_write += retval;
00138     }
00139     else if (retval == 0)
00140     {
00141       ROS_INFO("Device stream ended.");
00142       ROS_DEBUG("Shutting down publisher and subscriber.");
00143       pub.shutdown();
00144       sub.shutdown();
00145       ROS_DEBUG("Closing file descriptor.");
00146       close(fd);
00147       ROS_DEBUG("Exiting handler thread.");
00148       return;
00149     }
00150     else
00151     {
00152       // retval < 0, indicating an error of some kind.
00153       if (errno == EAGAIN)
00154       {
00155         // Can't read from the device, try again.
00156         continue;
00157       }
00158       else
00159       {
00160         ROS_FATAL("Error reading from device. retval=%d, errno=%d, revents=%d", retval, errno, pollfds[0].revents);
00161         ros::shutdown();
00162       }
00163     }
00164     ROS_DEBUG_STREAM("Buffer size after reading from fd: " << buffer_write - buffer);
00165     *buffer_write = '\0';
00166 
00167     char* buffer_read = buffer;
00168     while (1)
00169     {
00170       char* sentence = strchr(buffer_read, '$');
00171       if (sentence == NULL) break;
00172       char* sentence_end = strchr(sentence, '\r');
00173       if (sentence_end == NULL) break;
00174       *sentence_end = '\0';
00175       _handle_sentence(pub, now, sentence, frame_id.c_str());
00176       buffer_read = sentence_end + 1;
00177     }
00178 
00179     int remainder = buffer_write - buffer_read;
00180     if (remainder > 2000)
00181     {
00182       ROS_WARN("Buffer size >2000 bytes, resetting buffer.");
00183       remainder = 0;
00184     }
00185     ROS_DEBUG_STREAM("Remainder in buffer is: " << remainder);
00186     memmove(buffer, buffer_read, remainder);
00187     buffer_write = buffer + remainder;
00188   }
00189   close(fd);
00190 }
00191 
00192 
00193 static std::list<boost::thread*> rx_threads;
00194 
00195 int rx_prune_threads()
00196 {
00197   std::list<boost::thread*>::iterator thread_iter = rx_threads.begin();
00198   while (thread_iter != rx_threads.end())
00199   {
00200     if ((**thread_iter).timed_join(boost::posix_time::milliseconds(10)))
00201     {
00202       delete *thread_iter;
00203       thread_iter = rx_threads.erase(thread_iter);
00204     }
00205     else
00206     {
00207       ++thread_iter;
00208     }
00209   }
00210   return rx_threads.size();
00211 }
00212 
00213 void rx_stop_all()
00214 {
00215   threads_active = 0;
00216   int thread_close_i = 0;
00217   std::list<boost::thread*>::iterator thread_iter = rx_threads.begin();
00218   while (thread_iter != rx_threads.end())
00219   {
00220     if ((**thread_iter).timed_join(boost::posix_time::milliseconds(600)))
00221     {
00222       // Thread joined cleanly.
00223       thread_close_i++;
00224     }
00225     else
00226     {
00227       ROS_WARN("Thread required interrupt() to exit.");
00228       (**thread_iter).interrupt();
00229     }
00230     delete *thread_iter;
00231     thread_iter = rx_threads.erase(thread_iter);
00232   }
00233   ROS_INFO_STREAM("Closed " << thread_close_i << " thread(s) cleanly.");
00234 }
00235 
00236 void rx_thread_start(ros::NodeHandle& n, int fd, std::string frame_id, uint32_t byte_time_ns)
00237 {
00238   rx_prune_threads();
00239   rx_threads.push_back(new boost::thread(_thread_func, boost::ref(n), fd, frame_id, byte_time_ns));
00240 }


nmea_comms
Author(s): Mike Purvis
autogenerated on Thu Dec 22 2016 03:29:22