session.h
Go to the documentation of this file.
1 
35 #ifndef ROSSERIAL_SERVER_SESSION_H
36 #define ROSSERIAL_SERVER_SESSION_H
37 
38 #include <map>
39 #include <boost/bind.hpp>
40 #include <boost/asio.hpp>
41 #include <boost/function.hpp>
42 
43 #include <ros/callback_queue.h>
44 #include <ros/ros.h>
45 #include <rosserial_msgs/TopicInfo.h>
46 #include <rosserial_msgs/Log.h>
48 #include <std_msgs/Time.h>
49 
52 
53 namespace rosserial_server
54 {
55 
56 typedef std::vector<uint8_t> Buffer;
58 
59 template<typename Socket>
60 class Session : boost::noncopyable
61 {
62 public:
63  Session(boost::asio::io_service& io_service)
64  : socket_(io_service),
65  sync_timer_(io_service),
66  require_check_timer_(io_service),
67  ros_spin_timer_(io_service),
69  boost::bind(&Session::read_failed, this,
70  boost::asio::placeholders::error))
71  {
72  active_ = false;
73 
74  timeout_interval_ = boost::posix_time::milliseconds(5000);
75  attempt_interval_ = boost::posix_time::milliseconds(1000);
76  require_check_interval_ = boost::posix_time::milliseconds(1000);
77  ros_spin_interval_ = boost::posix_time::milliseconds(10);
78  require_param_name_ = "~require";
79 
81 
82  // Intermittent callback to service ROS callbacks. To avoid polling like this,
83  // CallbackQueue could in the future be extended with a scheme to monitor for
84  // callbacks on another thread, and then queue them up to be executed on this one.
85  ros_spin_timer_.expires_from_now(ros_spin_interval_);
86  ros_spin_timer_.async_wait(boost::bind(&Session::ros_spin_timeout, this,
87  boost::asio::placeholders::error));
88  }
89 
90  Socket& socket()
91  {
92  return socket_;
93  }
94 
95  void start()
96  {
97  ROS_DEBUG("Starting session.");
98 
99  callbacks_[rosserial_msgs::TopicInfo::ID_PUBLISHER]
100  = boost::bind(&Session::setup_publisher, this, _1);
101  callbacks_[rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
102  = boost::bind(&Session::setup_subscriber, this, _1);
103  callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_PUBLISHER]
104  = boost::bind(&Session::setup_service_client_publisher, this, _1);
105  callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
106  = boost::bind(&Session::setup_service_client_subscriber, this, _1);
107  callbacks_[rosserial_msgs::TopicInfo::ID_LOG]
108  = boost::bind(&Session::handle_log, this, _1);
109  callbacks_[rosserial_msgs::TopicInfo::ID_TIME]
110  = boost::bind(&Session::handle_time, this, _1);
111 
112  active_ = true;
113  attempt_sync();
115  }
116 
117  void stop()
118  {
119  // Abort any pending ROS callbacks.
121 
122  // Abort active session timer callbacks, if present.
123  sync_timer_.cancel();
124  require_check_timer_.cancel();
125 
126  // Reset the state of the session, dropping any publishers or subscribers
127  // we currently know about from this client.
128  callbacks_.clear();
129  subscribers_.clear();
130  publishers_.clear();
131  services_.clear();
132 
133  // Close the socket.
134  socket_.close();
135  active_ = false;
136  }
137 
138  bool is_active()
139  {
140  return active_;
141  }
142 
149  void set_require_param(std::string param_name)
150  {
151  require_param_name_ = param_name;
152  }
153 
154 private:
159  void ros_spin_timeout(const boost::system::error_code& error) {
161 
162  if (ros::ok())
163  {
164  // Call again next interval.
165  ros_spin_timer_.expires_from_now(ros_spin_interval_);
166  ros_spin_timer_.async_wait(boost::bind(&Session::ros_spin_timeout, this,
167  boost::asio::placeholders::error));
168  }
169  }
170 
172  // TODO: Total message timeout, implement primarily in ReadBuffer.
173 
175  async_read_buffer_.read(1, boost::bind(&Session::read_sync_first, this, _1));
176  }
177 
179  uint8_t sync;
180  stream >> sync;
181  if (sync == 0xff) {
182  async_read_buffer_.read(1, boost::bind(&Session::read_sync_second, this, _1));
183  } else {
185  }
186  }
187 
189  uint8_t sync;
190  stream >> sync;
191  if (sync == 0xfe) {
192  async_read_buffer_.read(5, boost::bind(&Session::read_id_length, this, _1));
193  } else {
195  }
196  }
197 
199  uint16_t topic_id, length;
200  uint8_t length_checksum;
201 
202  // Check header checksum byte for length field.
203  stream >> length >> length_checksum;
204  if (length_checksum + checksum(length) != 0xff) {
205  uint8_t csl = checksum(length);
206  ROS_WARN("Bad message header length checksum. Dropping message from client. T%d L%d C%d %d", topic_id, length, length_checksum, csl);
208  return;
209  } else {
210  stream >> topic_id;
211  }
212  ROS_DEBUG("Received message header with length %d and topic_id=%d", length, topic_id);
213 
214  // Read message length + checksum byte.
215  async_read_buffer_.read(length + 1, boost::bind(&Session::read_body, this,
216  _1, topic_id));
217  }
218 
219  void read_body(ros::serialization::IStream& stream, uint16_t topic_id) {
220  ROS_DEBUG("Received body of length %d for message on topic %d.", stream.getLength(), topic_id);
221 
222  ros::serialization::IStream checksum_stream(stream.getData(), stream.getLength());
223  uint8_t msg_checksum = checksum(checksum_stream) + checksum(topic_id);
224 
225  if (msg_checksum != 0xff) {
226  ROS_WARN("Rejecting message on topicId=%d, length=%d with bad checksum.", topic_id, stream.getLength());
227  } else {
228  if (callbacks_.count(topic_id) == 1) {
229  try {
230  callbacks_[topic_id](stream);
232  if (topic_id < 100) {
233  ROS_ERROR("Buffer overrun when attempting to parse setup message.");
234  ROS_ERROR_ONCE("Is this firmware from a pre-Groovy rosserial?");
235  } else {
236  ROS_WARN("Buffer overrun when attempting to parse user message.");
237  }
238  }
239  } else {
240  ROS_WARN("Received message with unrecognized topicId (%d).", topic_id);
241  // TODO: Resynchronize on multiples?
242  }
243  }
244 
245  // Kickoff next message read.
247  }
248 
249  void read_failed(const boost::system::error_code& error) {
250  if (error == boost::system::errc::no_buffer_space) {
251  // No worries. Begin syncing on a new message.
252  ROS_WARN("Overrun on receive buffer. Attempting to regain rx sync.");
254  } else if (error) {
255  // When some other read error has occurred, stop the session, which destroys
256  // all known publishers and subscribers.
257  ROS_WARN_STREAM("Socket asio error, closing socket: " << error);
258  stop();
259  }
260  }
261 
263 
264  void write_message(Buffer& message, const uint16_t topic_id) {
265  uint8_t overhead_bytes = 8;
266  uint16_t length = overhead_bytes + message.size();
267  BufferPtr buffer_ptr(new Buffer(length));
268 
269  uint8_t msg_checksum;
270  ros::serialization::IStream checksum_stream(message.size() > 0 ? &message[0] : NULL, message.size());
271 
272  ros::serialization::OStream stream(&buffer_ptr->at(0), buffer_ptr->size());
273  uint8_t msg_len_checksum = 255 - checksum(message.size());
274  stream << (uint16_t)0xfeff << (uint16_t)message.size() << msg_len_checksum << topic_id;
275  msg_checksum = 255 - (checksum(checksum_stream) + checksum(topic_id));
276 
277  memcpy(stream.advance(message.size()), &message[0], message.size());
278  stream << msg_checksum;
279 
280  ROS_DEBUG_NAMED("async_write", "Sending buffer of %d bytes to client.", length);
281  boost::asio::async_write(socket_, boost::asio::buffer(*buffer_ptr),
282  boost::bind(&Session::write_completion_cb, this, boost::asio::placeholders::error, buffer_ptr));
283  }
284 
285  void write_completion_cb(const boost::system::error_code& error,
286  BufferPtr buffer_ptr) {
287  if (error) {
288  if (error == boost::system::errc::io_error) {
289  ROS_WARN_THROTTLE(1, "Socket write operation returned IO error.");
290  } else if (error == boost::system::errc::no_such_device) {
291  ROS_WARN_THROTTLE(1, "Socket write operation returned no device.");
292  } else {
293  ROS_WARN_STREAM_THROTTLE(1, "Unknown error returned during write operation: " << error);
294  }
295  stop();
296  }
297  // Buffer is destructed when this function exits and buffer_ptr goes out of scope.
298  }
299 
301  void attempt_sync() {
302  request_topics();
304  }
305 
306  void set_sync_timeout(const boost::posix_time::time_duration& interval) {
307  if (ros::ok())
308  {
309  sync_timer_.cancel();
310  sync_timer_.expires_from_now(interval);
311  sync_timer_.async_wait(boost::bind(&Session::sync_timeout, this,
312  boost::asio::placeholders::error));
313  }
314  }
315 
316  void sync_timeout(const boost::system::error_code& error) {
317  if (error != boost::asio::error::operation_aborted) {
318  ROS_DEBUG("Sync with device lost.");
319  stop();
320  }
321  }
322 
324  void request_topics() {
325  std::vector<uint8_t> message(0);
326  ROS_DEBUG("Sending request topics message for VER2 protocol.");
327  write_message(message, rosserial_msgs::TopicInfo::ID_PUBLISHER);
328 
329  // Set timer for future point at which to verify the subscribers and publishers
330  // created by the client against the expected set given in the parameters.
332  require_check_timer_.async_wait(boost::bind(&Session::required_topics_check, this,
333  boost::asio::placeholders::error));
334  }
335 
336  void required_topics_check(const boost::system::error_code& error) {
337  if (error != boost::asio::error::operation_aborted) {
339  if (!check_set(require_param_name_ + "/publishers", publishers_) ||
340  !check_set(require_param_name_ + "/subscribers", subscribers_)) {
341  ROS_WARN("Connected client failed to establish the publishers and subscribers dictated by require parameter. Re-requesting topics.");
342  request_topics();
343  }
344  }
345  }
346  }
347 
348  template<typename M>
349  bool check_set(std::string param_name, M map) {
350  XmlRpc::XmlRpcValue param_list;
351  ros::param::get(param_name, param_list);
353  for (int i = 0; i < param_list.size(); ++i) {
354  ROS_ASSERT(param_list[i].getType() == XmlRpc::XmlRpcValue::TypeString);
355  std::string required_topic((std::string(param_list[i])));
356  // Iterate through map of registered topics, to ensure that this one is present.
357  bool found = false;
358  for (typename M::iterator j = map.begin(); j != map.end(); ++j) {
359  if (nh_.resolveName(j->second->get_topic()) ==
360  nh_.resolveName(required_topic)) {
361  found = true;
362  ROS_INFO_STREAM("Verified connection to topic " << required_topic << ", given in parameter " << param_name);
363  break;
364  }
365  }
366  if (!found) {
367  ROS_WARN_STREAM("Missing connection to topic " << required_topic << ", required by parameter " << param_name);
368  return false;
369  }
370  }
371  return true;
372  }
373 
374  static uint8_t checksum(ros::serialization::IStream& stream) {
375  uint8_t sum = 0;
376  for (uint16_t i = 0; i < stream.getLength(); ++i) {
377  sum += stream.getData()[i];
378  }
379  return sum;
380  }
381 
382  static uint8_t checksum(uint16_t val) {
383  return (val >> 8) + val;
384  }
385 
387 
389  rosserial_msgs::TopicInfo topic_info;
391 
392  PublisherPtr pub(new Publisher(nh_, topic_info));
393  callbacks_[topic_info.topic_id] = boost::bind(&Publisher::handle, pub, _1);
394  publishers_[topic_info.topic_id] = pub;
395 
397  }
398 
400  rosserial_msgs::TopicInfo topic_info;
402 
403  SubscriberPtr sub(new Subscriber(nh_, topic_info,
404  boost::bind(&Session::write_message, this, _1, topic_info.topic_id)));
405  subscribers_[topic_info.topic_id] = sub;
406 
408  }
409 
410  // When the rosserial client creates a ServiceClient object (and/or when it registers that object with the NodeHandle)
411  // it creates a publisher (to publish the service request message to us) and a subscriber (to receive the response)
412  // the service client callback is attached to the *subscriber*, so when we receive the service response
413  // and wish to send it over the socket to the client,
414  // we must attach the topicId that came from the service client subscriber message
415 
417  rosserial_msgs::TopicInfo topic_info;
419 
420  if (!services_.count(topic_info.topic_name)) {
421  ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str());
423  nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2)));
424  services_[topic_info.topic_name] = srv;
425  callbacks_[topic_info.topic_id] = boost::bind(&ServiceClient::handle, srv, _1);
426  }
427  if (services_[topic_info.topic_name]->getRequestMessageMD5() != topic_info.md5sum) {
428  ROS_WARN("Service client setup: Request message MD5 mismatch between rosserial client and ROS");
429  } else {
430  ROS_DEBUG("Service client %s: request message MD5 successfully validated as %s",
431  topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
432  }
434  }
435 
437  rosserial_msgs::TopicInfo topic_info;
439 
440  if (!services_.count(topic_info.topic_name)) {
441  ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str());
443  nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2)));
444  services_[topic_info.topic_name] = srv;
445  callbacks_[topic_info.topic_id] = boost::bind(&ServiceClient::handle, srv, _1);
446  }
447  // see above comment regarding the service client callback for why we set topic_id here
448  services_[topic_info.topic_name]->setTopicId(topic_info.topic_id);
449  if (services_[topic_info.topic_name]->getResponseMessageMD5() != topic_info.md5sum) {
450  ROS_WARN("Service client setup: Response message MD5 mismatch between rosserial client and ROS");
451  } else {
452  ROS_DEBUG("Service client %s: response message MD5 successfully validated as %s",
453  topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
454  }
456  }
457 
459  rosserial_msgs::Log l;
461  if(l.level == rosserial_msgs::Log::ROSDEBUG) ROS_DEBUG("%s", l.msg.c_str());
462  else if(l.level == rosserial_msgs::Log::INFO) ROS_INFO("%s", l.msg.c_str());
463  else if(l.level == rosserial_msgs::Log::WARN) ROS_WARN("%s", l.msg.c_str());
464  else if(l.level == rosserial_msgs::Log::ERROR) ROS_ERROR("%s", l.msg.c_str());
465  else if(l.level == rosserial_msgs::Log::FATAL) ROS_FATAL("%s", l.msg.c_str());
466  }
467 
469  std_msgs::Time time;
470  time.data = ros::Time::now();
471 
472  size_t length = ros::serialization::serializationLength(time);
473  std::vector<uint8_t> message(length);
474 
475  ros::serialization::OStream ostream(&message[0], length);
477 
478  write_message(message, rosserial_msgs::TopicInfo::ID_TIME);
479 
480  // The MCU requesting the time from the server is the sync notification. This
481  // call moves the timeout forward.
483  }
484 
485  Socket socket_;
487  enum { buffer_max = 1023 };
488  bool active_;
489 
492 
493  boost::posix_time::time_duration timeout_interval_;
494  boost::posix_time::time_duration attempt_interval_;
495  boost::posix_time::time_duration require_check_interval_;
496  boost::posix_time::time_duration ros_spin_interval_;
497  boost::asio::deadline_timer sync_timer_;
498  boost::asio::deadline_timer require_check_timer_;
499  boost::asio::deadline_timer ros_spin_timer_;
500  std::string require_param_name_;
501 
502  std::map<uint16_t, boost::function<void(ros::serialization::IStream&)> > callbacks_;
503  std::map<uint16_t, PublisherPtr> publishers_;
504  std::map<uint16_t, SubscriberPtr> subscribers_;
505  std::map<std::string, ServiceClientPtr> services_;
506 };
507 
508 } // namespace
509 
510 #endif // ROSSERIAL_SERVER_SESSION_H
static void write(Stream &stream, typename boost::call_traits< T >::param_type t)
boost::asio::deadline_timer require_check_timer_
Definition: session.h:498
void set_require_param(std::string param_name)
Definition: session.h:149
void setup_subscriber(ros::serialization::IStream &stream)
Definition: session.h:399
#define ROS_FATAL(...)
#define ROS_WARN_STREAM_THROTTLE(period, args)
void write_message(Buffer &message, const uint16_t topic_id)
Definition: session.h:264
void read_body(ros::serialization::IStream &stream, uint16_t topic_id)
Definition: session.h:219
boost::asio::deadline_timer ros_spin_timer_
Definition: session.h:499
void ros_spin_timeout(const boost::system::error_code &error)
Definition: session.h:159
Helper object for successive reads from a ReadStream.
static uint8_t checksum(uint16_t val)
Definition: session.h:382
void read_sync_second(ros::serialization::IStream &stream)
Definition: session.h:188
int size() const
std::string resolveName(const std::string &name, bool remap=true) const
std::vector< uint8_t > Buffer
Definition: session.h:56
bool check_set(std::string param_name, M map)
Definition: session.h:349
void set_sync_timeout(const boost::posix_time::time_duration &interval)
Definition: session.h:306
std::map< uint16_t, boost::function< void(ros::serialization::IStream &)> > callbacks_
Definition: session.h:502
static uint8_t checksum(ros::serialization::IStream &stream)
Definition: session.h:374
boost::posix_time::time_duration require_check_interval_
Definition: session.h:495
Session(boost::asio::io_service &io_service)
Definition: session.h:63
boost::posix_time::time_duration attempt_interval_
Definition: session.h:494
Type const & getType() const
#define ROS_WARN(...)
AsyncReadBuffer< Socket > async_read_buffer_
Definition: session.h:486
void read_failed(const boost::system::error_code &error)
Definition: session.h:249
boost::posix_time::time_duration timeout_interval_
Definition: session.h:493
void setup_publisher(ros::serialization::IStream &stream)
Definition: session.h:388
std::string require_param_name_
Definition: session.h:500
std::map< uint16_t, PublisherPtr > publishers_
Definition: session.h:503
#define ROS_DEBUG_NAMED(name,...)
void handle_time(ros::serialization::IStream &stream)
Definition: session.h:468
boost::asio::deadline_timer sync_timer_
Definition: session.h:497
#define ROS_WARN_THROTTLE(period,...)
ROSCPP_DECL bool get(const std::string &key, std::string &s)
#define ROS_INFO(...)
#define ROS_ERROR_ONCE(...)
void setCallbackQueue(CallbackQueueInterface *queue)
Classes which manage the Publish and Subscribe relationships that a Session has with its client...
ROSCPP_DECL bool ok()
ros::CallbackQueue ros_callback_queue_
Definition: session.h:491
static void read(Stream &stream, typename boost::call_traits< T >::reference t)
void required_topics_check(const boost::system::error_code &error)
Definition: session.h:336
void setup_service_client_publisher(ros::serialization::IStream &stream)
Definition: session.h:416
std::map< uint16_t, SubscriberPtr > subscribers_
Definition: session.h:504
ROSCPP_DECL bool has(const std::string &key)
#define ROS_WARN_STREAM(args)
void read_sync_first(ros::serialization::IStream &stream)
Definition: session.h:178
ros::NodeHandle nh_
Definition: session.h:490
void handle(ros::serialization::IStream stream)
void write_completion_cb(const boost::system::error_code &error, BufferPtr buffer_ptr)
Definition: session.h:285
void sync_timeout(const boost::system::error_code &error)
Definition: session.h:316
uint32_t serializationLength(const T &t)
#define ROS_INFO_STREAM(args)
void read(size_t requested_bytes, boost::function< void(ros::serialization::IStream &)> callback)
Commands a fixed number of bytes from the buffer. This may be fulfilled from existing buffer content...
boost::posix_time::time_duration ros_spin_interval_
Definition: session.h:496
boost::shared_ptr< Buffer > BufferPtr
Definition: session.h:57
void read_id_length(ros::serialization::IStream &stream)
Definition: session.h:198
static Time now()
#define ROS_ASSERT(cond)
#define ROS_ERROR(...)
void handle_log(ros::serialization::IStream &stream)
Definition: session.h:458
void handle(ros::serialization::IStream stream)
void setup_service_client_subscriber(ros::serialization::IStream &stream)
Definition: session.h:436
std::map< std::string, ServiceClientPtr > services_
Definition: session.h:505
#define ROS_DEBUG(...)


rosserial_server
Author(s): Mike Purvis
autogenerated on Fri Jun 7 2019 22:02:58