32 #include <sys/ioctl.h> 33 #include <boost/thread.hpp> 34 #include <boost/algorithm/string.hpp> 37 #include "nmea_msgs/Sentence.h" 39 #define RX_INITIAL_LENGTH 40 40 #define RX_SUCCESSIVE_LENGTH 8 46 static nmea_msgs::Sentence sentence_msg;
47 sentence_msg.sentence = sentence;
48 sentence_msg.header.stamp = stamp;
49 sentence_msg.header.frame_id = frame_id;
50 publisher.
publish(sentence_msg);
58 ROS_DEBUG(
"New connection handler thread beginning.");
64 struct pollfd pollfds[] = { { fd, POLLIN, 0 } };
66 char* buffer_write = buffer;
67 char* buffer_end = &buffer[
sizeof(buffer)];
72 int retval = poll(pollfds, 1, 500);
73 ROS_DEBUG(
"Poll retval=%d, errno=%d, revents=%d", retval, errno, pollfds[0].revents);
82 ROS_FATAL(
"Error polling device. Terminating node.");
85 else if (pollfds[0].revents & (POLLHUP | POLLERR | POLLNVAL))
88 ROS_DEBUG(
"Shutting down publisher and subscriber.");
101 errno = ioctl(fd, FIONREAD, &waiting_bytes);
105 int buffer_plus_waiting = (buffer_write - buffer) + waiting_bytes;
116 struct timespec req = { 0, wait_for * byte_time_ns }, rem;
117 ROS_DEBUG_STREAM(
"Sleeping for " << wait_for <<
" bytes (" << byte_time_ns <<
" ns)");
118 nanosleep(&req, &rem);
126 retval = read(fd, buffer_write, buffer_end - buffer_write - 1);
127 ROS_DEBUG(
"Read retval=%d, errno=%d", retval, errno);
131 if (strnlen(buffer_write, retval) != retval)
133 ROS_WARN(
"Null byte received from serial port, flushing buffer.");
134 buffer_write = buffer;
137 buffer_write += retval;
139 else if (retval == 0)
142 ROS_DEBUG(
"Shutting down publisher and subscriber.");
160 ROS_FATAL(
"Error reading from device. retval=%d, errno=%d, revents=%d", retval, errno, pollfds[0].revents);
164 ROS_DEBUG_STREAM(
"Buffer size after reading from fd: " << buffer_write - buffer);
165 *buffer_write =
'\0';
167 char* buffer_read = buffer;
170 char* sentence = strchr(buffer_read,
'$');
171 if (sentence == NULL)
break;
172 char* sentence_end = strchr(sentence,
'\r');
173 if (sentence_end == NULL)
break;
174 *sentence_end =
'\0';
176 buffer_read = sentence_end + 1;
179 int remainder = buffer_write - buffer_read;
180 if (remainder > 2000)
182 ROS_WARN(
"Buffer size >2000 bytes, resetting buffer.");
186 memmove(buffer, buffer_read, remainder);
187 buffer_write = buffer + remainder;
197 std::list<boost::thread*>::iterator thread_iter =
rx_threads.begin();
200 if ((**thread_iter).timed_join(boost::posix_time::milliseconds(10)))
216 int thread_close_i = 0;
217 std::list<boost::thread*>::iterator thread_iter =
rx_threads.begin();
220 if ((**thread_iter).timed_join(boost::posix_time::milliseconds(600)))
227 ROS_WARN(
"Thread required interrupt() to exit.");
228 (**thread_iter).interrupt();
#define RX_INITIAL_LENGTH
void publish(const boost::shared_ptr< M > &message) const
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
static void _thread_func(ros::NodeHandle &n, int fd, std::string frame_id, uint32_t byte_time_ns=0)
void rx_thread_start(ros::NodeHandle &n, int fd, std::string frame_id, uint32_t byte_time_ns)
void tx_msg_callback(const nmea_msgs::SentenceConstPtr sentence_msg_ptr, int fd)
#define RX_SUCCESSIVE_LENGTH
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
static int threads_active
#define ROS_DEBUG_STREAM(args)
#define ROS_DEBUG_COND(cond,...)
static std::list< boost::thread * > rx_threads
#define ROS_INFO_STREAM(args)
ROSCPP_DECL void shutdown()
static void _handle_sentence(ros::Publisher &publisher, ros::Time &stamp, const char *sentence, const char *frame_id)