00001
00026 #include "nmea_comms/rx.h"
00027 #include "nmea_comms/tx.h"
00028 #include "nmea_comms/checksum.h"
00029
00030 #include <poll.h>
00031 #include <sstream>
00032 #include <sys/ioctl.h>
00033 #include <boost/thread.hpp>
00034 #include <boost/algorithm/string.hpp>
00035
00036 #include "ros/ros.h"
00037 #include "nmea_msgs/Sentence.h"
00038
00039 #define RX_INITIAL_LENGTH 40
00040 #define RX_SUCCESSIVE_LENGTH 8
00041
00042 static void _handle_sentence(ros::Publisher& publisher, ros::Time& stamp, const char* sentence, const char* frame_id)
00043 {
00044 ROS_DEBUG("Sentence RX: %s", sentence);
00045
00046 static nmea_msgs::Sentence sentence_msg;
00047 sentence_msg.sentence = sentence;
00048 sentence_msg.header.stamp = stamp;
00049 sentence_msg.header.frame_id = frame_id;
00050 publisher.publish(sentence_msg);
00051 }
00052
00053
00054 static int threads_active = 1;
00055
00056 static void _thread_func(ros::NodeHandle& n, int fd, std::string frame_id, uint32_t byte_time_ns = 0)
00057 {
00058 ROS_DEBUG("New connection handler thread beginning.");
00059
00060 ros::Publisher pub = n.advertise<nmea_msgs::Sentence>("nmea_sentence", 5);
00061 ros::Subscriber sub = n.subscribe<nmea_msgs::Sentence>("nmea_sentence_out", 5,
00062 boost::bind(tx_msg_callback, _1, fd));
00063
00064 struct pollfd pollfds[] = { { fd, POLLIN, 0 } };
00065 char buffer[2048];
00066 char* buffer_write = buffer;
00067 char* buffer_end = &buffer[sizeof(buffer)];
00068
00069 while (threads_active)
00070 {
00071 errno = 0;
00072 int retval = poll(pollfds, 1, 500);
00073 ROS_DEBUG("Poll retval=%d, errno=%d, revents=%d", retval, errno, pollfds[0].revents);
00074
00075 if (retval == 0)
00076 {
00077
00078 continue;
00079 }
00080 else if (retval < 0)
00081 {
00082 ROS_FATAL("Error polling device. Terminating node.");
00083 ros::shutdown();
00084 }
00085 else if (pollfds[0].revents & (POLLHUP | POLLERR | POLLNVAL))
00086 {
00087 ROS_INFO("Device error/hangup.");
00088 ROS_DEBUG("Shutting down publisher and subscriber.");
00089 pub.shutdown();
00090 sub.shutdown();
00091 ROS_DEBUG("Closing file descriptor.");
00092 close(fd);
00093 ROS_DEBUG("Exiting handler thread.");
00094 return;
00095 }
00096
00097
00098 if (byte_time_ns > 0)
00099 {
00100 int waiting_bytes;
00101 errno = ioctl(fd, FIONREAD, &waiting_bytes);
00102 if (errno == 0)
00103 {
00104 int wait_for = 0;
00105 int buffer_plus_waiting = (buffer_write - buffer) + waiting_bytes;
00106 if (buffer_plus_waiting < RX_INITIAL_LENGTH)
00107 {
00108 wait_for = RX_INITIAL_LENGTH - buffer_plus_waiting;
00109 }
00110 else if (waiting_bytes < RX_SUCCESSIVE_LENGTH)
00111 {
00112 wait_for = RX_SUCCESSIVE_LENGTH - waiting_bytes;
00113 }
00114 if (wait_for > 0)
00115 {
00116 struct timespec req = { 0, wait_for * byte_time_ns }, rem;
00117 ROS_DEBUG_STREAM("Sleeping for " << wait_for << " bytes (" << byte_time_ns << " ns)");
00118 nanosleep(&req, &rem);
00119 }
00120 }
00121 }
00122
00123
00124 ros::Time now = ros::Time::now();
00125 errno = 0;
00126 retval = read(fd, buffer_write, buffer_end - buffer_write - 1);
00127 ROS_DEBUG("Read retval=%d, errno=%d", retval, errno);
00128 ROS_DEBUG_COND(retval < 0, "Read error: %s", strerror(errno));
00129 if (retval > 0)
00130 {
00131 if (strnlen(buffer_write, retval) != retval)
00132 {
00133 ROS_WARN("Null byte received from serial port, flushing buffer.");
00134 buffer_write = buffer;
00135 continue;
00136 }
00137 buffer_write += retval;
00138 }
00139 else if (retval == 0)
00140 {
00141 ROS_INFO("Device stream ended.");
00142 ROS_DEBUG("Shutting down publisher and subscriber.");
00143 pub.shutdown();
00144 sub.shutdown();
00145 ROS_DEBUG("Closing file descriptor.");
00146 close(fd);
00147 ROS_DEBUG("Exiting handler thread.");
00148 return;
00149 }
00150 else
00151 {
00152
00153 if (errno == EAGAIN)
00154 {
00155
00156 continue;
00157 }
00158 else
00159 {
00160 ROS_FATAL("Error reading from device. retval=%d, errno=%d, revents=%d", retval, errno, pollfds[0].revents);
00161 ros::shutdown();
00162 }
00163 }
00164 ROS_DEBUG_STREAM("Buffer size after reading from fd: " << buffer_write - buffer);
00165 *buffer_write = '\0';
00166
00167 char* buffer_read = buffer;
00168 while (1)
00169 {
00170 char* sentence = strchr(buffer_read, '$');
00171 if (sentence == NULL) break;
00172 char* sentence_end = strchr(sentence, '\r');
00173 if (sentence_end == NULL) break;
00174 *sentence_end = '\0';
00175 _handle_sentence(pub, now, sentence, frame_id.c_str());
00176 buffer_read = sentence_end + 1;
00177 }
00178
00179 int remainder = buffer_write - buffer_read;
00180 if (remainder > 2000)
00181 {
00182 ROS_WARN("Buffer size >2000 bytes, resetting buffer.");
00183 remainder = 0;
00184 }
00185 ROS_DEBUG_STREAM("Remainder in buffer is: " << remainder);
00186 memmove(buffer, buffer_read, remainder);
00187 buffer_write = buffer + remainder;
00188 }
00189 close(fd);
00190 }
00191
00192
00193 static std::list<boost::thread*> rx_threads;
00194
00195 int rx_prune_threads()
00196 {
00197 std::list<boost::thread*>::iterator thread_iter = rx_threads.begin();
00198 while (thread_iter != rx_threads.end())
00199 {
00200 if ((**thread_iter).timed_join(boost::posix_time::milliseconds(10)))
00201 {
00202 delete *thread_iter;
00203 thread_iter = rx_threads.erase(thread_iter);
00204 }
00205 else
00206 {
00207 ++thread_iter;
00208 }
00209 }
00210 return rx_threads.size();
00211 }
00212
00213 void rx_stop_all()
00214 {
00215 threads_active = 0;
00216 int thread_close_i = 0;
00217 std::list<boost::thread*>::iterator thread_iter = rx_threads.begin();
00218 while (thread_iter != rx_threads.end())
00219 {
00220 if ((**thread_iter).timed_join(boost::posix_time::milliseconds(600)))
00221 {
00222
00223 thread_close_i++;
00224 }
00225 else
00226 {
00227 ROS_WARN("Thread required interrupt() to exit.");
00228 (**thread_iter).interrupt();
00229 }
00230 delete *thread_iter;
00231 thread_iter = rx_threads.erase(thread_iter);
00232 }
00233 ROS_INFO_STREAM("Closed " << thread_close_i << " thread(s) cleanly.");
00234 }
00235
00236 void rx_thread_start(ros::NodeHandle& n, int fd, std::string frame_id, uint32_t byte_time_ns)
00237 {
00238 rx_prune_threads();
00239 rx_threads.push_back(new boost::thread(_thread_func, boost::ref(n), fd, frame_id, byte_time_ns));
00240 }