rx.cpp
Go to the documentation of this file.
1 
26 #include "nmea_comms/rx.h"
27 #include "nmea_comms/tx.h"
28 #include "nmea_comms/checksum.h"
29 
30 #include <poll.h>
31 #include <sstream>
32 #include <sys/ioctl.h>
33 #include <boost/thread.hpp>
34 #include <boost/algorithm/string.hpp>
35 
36 #include "ros/ros.h"
37 #include "nmea_msgs/Sentence.h"
38 
39 #define RX_INITIAL_LENGTH 40
40 #define RX_SUCCESSIVE_LENGTH 8
41 
42 static void _handle_sentence(ros::Publisher& publisher, ros::Time& stamp, const char* sentence, const char* frame_id)
43 {
44  ROS_DEBUG("Sentence RX: %s", sentence);
45 
46  static nmea_msgs::Sentence sentence_msg;
47  sentence_msg.sentence = sentence;
48  sentence_msg.header.stamp = stamp;
49  sentence_msg.header.frame_id = frame_id;
50  publisher.publish(sentence_msg);
51 }
52 
53 
54 static int threads_active = 1;
55 
56 static void _thread_func(ros::NodeHandle& n, int fd, std::string frame_id, uint32_t byte_time_ns = 0)
57 {
58  ROS_DEBUG("New connection handler thread beginning.");
59 
60  ros::Publisher pub = n.advertise<nmea_msgs::Sentence>("nmea_sentence", 5);
61  ros::Subscriber sub = n.subscribe<nmea_msgs::Sentence>("nmea_sentence_out", 5,
62  boost::bind(tx_msg_callback, _1, fd));
63 
64  struct pollfd pollfds[] = { { fd, POLLIN, 0 } };
65  char buffer[2048];
66  char* buffer_write = buffer;
67  char* buffer_end = &buffer[sizeof(buffer)];
68 
69  while (threads_active)
70  {
71  errno = 0;
72  int retval = poll(pollfds, 1, 500);
73  ROS_DEBUG("Poll retval=%d, errno=%d, revents=%d", retval, errno, pollfds[0].revents);
74 
75  if (retval == 0)
76  {
77  // No event, just 1 sec timeout.
78  continue;
79  }
80  else if (retval < 0)
81  {
82  ROS_FATAL("Error polling device. Terminating node.");
83  ros::shutdown();
84  }
85  else if (pollfds[0].revents & (POLLHUP | POLLERR | POLLNVAL))
86  {
87  ROS_INFO("Device error/hangup.");
88  ROS_DEBUG("Shutting down publisher and subscriber.");
89  pub.shutdown();
90  sub.shutdown();
91  ROS_DEBUG("Closing file descriptor.");
92  close(fd);
93  ROS_DEBUG("Exiting handler thread.");
94  return;
95  }
96 
97  // We can save some CPU by sleeping if the number waiting bytes is really small
98  if (byte_time_ns > 0)
99  {
100  int waiting_bytes;
101  errno = ioctl(fd, FIONREAD, &waiting_bytes);
102  if (errno == 0)
103  {
104  int wait_for = 0;
105  int buffer_plus_waiting = (buffer_write - buffer) + waiting_bytes;
106  if (buffer_plus_waiting < RX_INITIAL_LENGTH)
107  {
108  wait_for = RX_INITIAL_LENGTH - buffer_plus_waiting;
109  }
110  else if (waiting_bytes < RX_SUCCESSIVE_LENGTH)
111  {
112  wait_for = RX_SUCCESSIVE_LENGTH - waiting_bytes;
113  }
114  if (wait_for > 0)
115  {
116  struct timespec req = { 0, wait_for * byte_time_ns }, rem;
117  ROS_DEBUG_STREAM("Sleeping for " << wait_for << " bytes (" << byte_time_ns << " ns)");
118  nanosleep(&req, &rem);
119  }
120  }
121  }
122 
123  // Read in contents of buffer and null-terminate it.
124  ros::Time now = ros::Time::now();
125  errno = 0;
126  retval = read(fd, buffer_write, buffer_end - buffer_write - 1);
127  ROS_DEBUG("Read retval=%d, errno=%d", retval, errno);
128  ROS_DEBUG_COND(retval < 0, "Read error: %s", strerror(errno));
129  if (retval > 0)
130  {
131  if (strnlen(buffer_write, retval) != retval)
132  {
133  ROS_WARN("Null byte received from serial port, flushing buffer.");
134  buffer_write = buffer;
135  continue;
136  }
137  buffer_write += retval;
138  }
139  else if (retval == 0)
140  {
141  ROS_INFO("Device stream ended.");
142  ROS_DEBUG("Shutting down publisher and subscriber.");
143  pub.shutdown();
144  sub.shutdown();
145  ROS_DEBUG("Closing file descriptor.");
146  close(fd);
147  ROS_DEBUG("Exiting handler thread.");
148  return;
149  }
150  else
151  {
152  // retval < 0, indicating an error of some kind.
153  if (errno == EAGAIN)
154  {
155  // Can't read from the device, try again.
156  continue;
157  }
158  else
159  {
160  ROS_FATAL("Error reading from device. retval=%d, errno=%d, revents=%d", retval, errno, pollfds[0].revents);
161  ros::shutdown();
162  }
163  }
164  ROS_DEBUG_STREAM("Buffer size after reading from fd: " << buffer_write - buffer);
165  *buffer_write = '\0';
166 
167  char* buffer_read = buffer;
168  while (1)
169  {
170  char* sentence = strchr(buffer_read, '$');
171  if (sentence == NULL) break;
172  char* sentence_end = strchr(sentence, '\r');
173  if (sentence_end == NULL) break;
174  *sentence_end = '\0';
175  _handle_sentence(pub, now, sentence, frame_id.c_str());
176  buffer_read = sentence_end + 1;
177  }
178 
179  int remainder = buffer_write - buffer_read;
180  if (remainder > 2000)
181  {
182  ROS_WARN("Buffer size >2000 bytes, resetting buffer.");
183  remainder = 0;
184  }
185  ROS_DEBUG_STREAM("Remainder in buffer is: " << remainder);
186  memmove(buffer, buffer_read, remainder);
187  buffer_write = buffer + remainder;
188  }
189  close(fd);
190 }
191 
192 
193 static std::list<boost::thread*> rx_threads;
194 
196 {
197  std::list<boost::thread*>::iterator thread_iter = rx_threads.begin();
198  while (thread_iter != rx_threads.end())
199  {
200  if ((**thread_iter).timed_join(boost::posix_time::milliseconds(10)))
201  {
202  delete *thread_iter;
203  thread_iter = rx_threads.erase(thread_iter);
204  }
205  else
206  {
207  ++thread_iter;
208  }
209  }
210  return rx_threads.size();
211 }
212 
214 {
215  threads_active = 0;
216  int thread_close_i = 0;
217  std::list<boost::thread*>::iterator thread_iter = rx_threads.begin();
218  while (thread_iter != rx_threads.end())
219  {
220  if ((**thread_iter).timed_join(boost::posix_time::milliseconds(600)))
221  {
222  // Thread joined cleanly.
223  thread_close_i++;
224  }
225  else
226  {
227  ROS_WARN("Thread required interrupt() to exit.");
228  (**thread_iter).interrupt();
229  }
230  delete *thread_iter;
231  thread_iter = rx_threads.erase(thread_iter);
232  }
233  ROS_INFO_STREAM("Closed " << thread_close_i << " thread(s) cleanly.");
234 }
235 
236 void rx_thread_start(ros::NodeHandle& n, int fd, std::string frame_id, uint32_t byte_time_ns)
237 {
239  rx_threads.push_back(new boost::thread(_thread_func, boost::ref(n), fd, frame_id, byte_time_ns));
240 }
#define ROS_FATAL(...)
#define RX_INITIAL_LENGTH
Definition: rx.cpp:39
void publish(const boost::shared_ptr< M > &message) const
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
static void _thread_func(ros::NodeHandle &n, int fd, std::string frame_id, uint32_t byte_time_ns=0)
Definition: rx.cpp:56
void rx_thread_start(ros::NodeHandle &n, int fd, std::string frame_id, uint32_t byte_time_ns)
Definition: rx.cpp:236
#define ROS_WARN(...)
int rx_prune_threads()
Definition: rx.cpp:195
void tx_msg_callback(const nmea_msgs::SentenceConstPtr sentence_msg_ptr, int fd)
Definition: tx.cpp:37
#define ROS_INFO(...)
#define RX_SUCCESSIVE_LENGTH
Definition: rx.cpp:40
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
static int threads_active
Definition: rx.cpp:54
#define ROS_DEBUG_STREAM(args)
#define ROS_DEBUG_COND(cond,...)
static std::list< boost::thread * > rx_threads
Definition: rx.cpp:193
#define ROS_INFO_STREAM(args)
static Time now()
ROSCPP_DECL void shutdown()
static void _handle_sentence(ros::Publisher &publisher, ros::Time &stamp, const char *sentence, const char *frame_id)
Definition: rx.cpp:42
void rx_stop_all()
Definition: rx.cpp:213
#define ROS_DEBUG(...)


nmea_comms
Author(s): Mike Purvis
autogenerated on Mon Aug 5 2019 03:53:18