session.h
Go to the documentation of this file.
1 
35 #ifndef ROSSERIAL_SERVER_SESSION_H
36 #define ROSSERIAL_SERVER_SESSION_H
37 
38 #include <map>
39 #include <boost/bind.hpp>
40 #include <boost/asio.hpp>
41 #include <boost/function.hpp>
42 
43 #include <ros/callback_queue.h>
44 #include <ros/ros.h>
45 #include <rosserial_msgs/TopicInfo.h>
46 #include <rosserial_msgs/Log.h>
48 #include <std_msgs/Time.h>
49 
52 
53 namespace rosserial_server
54 {
55 
56 typedef std::vector<uint8_t> Buffer;
58 
59 template<typename Socket>
60 class Session : boost::noncopyable
61 {
62 public:
63  Session(boost::asio::io_service& io_service)
64  : io_service_(io_service),
65  socket_(io_service),
66  sync_timer_(io_service),
67  require_check_timer_(io_service),
68  ros_spin_timer_(io_service),
70  boost::bind(&Session::read_failed, this,
71  boost::asio::placeholders::error))
72  {
73  active_ = false;
74 
75  timeout_interval_ = boost::posix_time::milliseconds(5000);
76  attempt_interval_ = boost::posix_time::milliseconds(1000);
77  require_check_interval_ = boost::posix_time::milliseconds(1000);
78  ros_spin_interval_ = boost::posix_time::milliseconds(10);
79  require_param_name_ = "~require";
80 
82 
83  // Intermittent callback to service ROS callbacks. To avoid polling like this,
84  // CallbackQueue could in the future be extended with a scheme to monitor for
85  // callbacks on another thread, and then queue them up to be executed on this one.
86  ros_spin_timer_.expires_from_now(ros_spin_interval_);
87  ros_spin_timer_.async_wait(boost::bind(&Session::ros_spin_timeout, this,
88  boost::asio::placeholders::error));
89  }
90 
91  Socket& socket()
92  {
93  return socket_;
94  }
95 
96  void start()
97  {
98  ROS_DEBUG("Starting session.");
99 
100  callbacks_[rosserial_msgs::TopicInfo::ID_PUBLISHER]
101  = boost::bind(&Session::setup_publisher, this, _1);
102  callbacks_[rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
103  = boost::bind(&Session::setup_subscriber, this, _1);
104  callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_PUBLISHER]
105  = boost::bind(&Session::setup_service_client_publisher, this, _1);
106  callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
107  = boost::bind(&Session::setup_service_client_subscriber, this, _1);
108  callbacks_[rosserial_msgs::TopicInfo::ID_LOG]
109  = boost::bind(&Session::handle_log, this, _1);
110  callbacks_[rosserial_msgs::TopicInfo::ID_TIME]
111  = boost::bind(&Session::handle_time, this, _1);
112 
113  active_ = true;
114  attempt_sync();
116  }
117 
118  void stop()
119  {
120  // Abort any pending ROS callbacks.
122 
123  // Abort active session timer callbacks, if present.
124  sync_timer_.cancel();
125  require_check_timer_.cancel();
126 
127  // Reset the state of the session, dropping any publishers or subscribers
128  // we currently know about from this client.
129  callbacks_.clear();
130  subscribers_.clear();
131  publishers_.clear();
132  services_.clear();
133 
134  // Close the socket.
135  socket_.close();
136  active_ = false;
137  }
138 
139  void shutdown()
140  {
141  if (is_active())
142  {
143  stop();
144  }
145  io_service_.stop();
146  }
147 
148  bool is_active()
149  {
150  return active_;
151  }
152 
159  void set_require_param(std::string param_name)
160  {
161  require_param_name_ = param_name;
162  }
163 
164 private:
169  void ros_spin_timeout(const boost::system::error_code& error) {
171 
172  if (ros::ok())
173  {
174  // Call again next interval.
175  ros_spin_timer_.expires_from_now(ros_spin_interval_);
176  ros_spin_timer_.async_wait(boost::bind(&Session::ros_spin_timeout, this,
177  boost::asio::placeholders::error));
178  }
179  else
180  {
181  shutdown();
182  }
183  }
184 
186  // TODO: Total message timeout, implement primarily in ReadBuffer.
187 
189  async_read_buffer_.read(1, boost::bind(&Session::read_sync_first, this, _1));
190  }
191 
193  uint8_t sync;
194  stream >> sync;
195  if (sync == 0xff) {
196  async_read_buffer_.read(1, boost::bind(&Session::read_sync_second, this, _1));
197  } else {
199  }
200  }
201 
203  uint8_t sync;
204  stream >> sync;
205  if (sync == 0xfe) {
206  async_read_buffer_.read(5, boost::bind(&Session::read_id_length, this, _1));
207  } else {
209  }
210  }
211 
213  uint16_t topic_id, length;
214  uint8_t length_checksum;
215 
216  // Check header checksum byte for length field.
217  stream >> length >> length_checksum;
218  if (length_checksum + checksum(length) != 0xff) {
219  uint8_t csl = checksum(length);
220  ROS_WARN("Bad message header length checksum. Dropping message from client. T%d L%d C%d %d", topic_id, length, length_checksum, csl);
222  return;
223  } else {
224  stream >> topic_id;
225  }
226  ROS_DEBUG("Received message header with length %d and topic_id=%d", length, topic_id);
227 
228  // Read message length + checksum byte.
229  async_read_buffer_.read(length + 1, boost::bind(&Session::read_body, this,
230  _1, topic_id));
231  }
232 
233  void read_body(ros::serialization::IStream& stream, uint16_t topic_id) {
234  ROS_DEBUG("Received body of length %d for message on topic %d.", stream.getLength(), topic_id);
235 
236  ros::serialization::IStream checksum_stream(stream.getData(), stream.getLength());
237  uint8_t msg_checksum = checksum(checksum_stream) + checksum(topic_id);
238 
239  if (msg_checksum != 0xff) {
240  ROS_WARN("Rejecting message on topicId=%d, length=%d with bad checksum.", topic_id, stream.getLength());
241  } else {
242  if (callbacks_.count(topic_id) == 1) {
243  try {
244  callbacks_[topic_id](stream);
246  if (topic_id < 100) {
247  ROS_ERROR("Buffer overrun when attempting to parse setup message.");
248  ROS_ERROR_ONCE("Is this firmware from a pre-Groovy rosserial?");
249  } else {
250  ROS_WARN("Buffer overrun when attempting to parse user message.");
251  }
252  }
253  } else {
254  ROS_WARN("Received message with unrecognized topicId (%d).", topic_id);
255  // TODO: Resynchronize on multiples?
256  }
257  }
258 
259  // Kickoff next message read.
261  }
262 
263  void read_failed(const boost::system::error_code& error) {
264  if (error == boost::system::errc::no_buffer_space) {
265  // No worries. Begin syncing on a new message.
266  ROS_WARN("Overrun on receive buffer. Attempting to regain rx sync.");
268  } else if (error) {
269  // When some other read error has occurred, stop the session, which destroys
270  // all known publishers and subscribers.
271  ROS_WARN_STREAM("Socket asio error, closing socket: " << error);
272  stop();
273  }
274  }
275 
277 
278  void write_message(Buffer& message, const uint16_t topic_id) {
279  uint8_t overhead_bytes = 8;
280  uint16_t length = overhead_bytes + message.size();
281  BufferPtr buffer_ptr(new Buffer(length));
282 
283  uint8_t msg_checksum;
284  ros::serialization::IStream checksum_stream(message.size() > 0 ? &message[0] : NULL, message.size());
285 
286  ros::serialization::OStream stream(&buffer_ptr->at(0), buffer_ptr->size());
287  uint8_t msg_len_checksum = 255 - checksum(message.size());
288  stream << (uint16_t)0xfeff << (uint16_t)message.size() << msg_len_checksum << topic_id;
289  msg_checksum = 255 - (checksum(checksum_stream) + checksum(topic_id));
290 
291  memcpy(stream.advance(message.size()), &message[0], message.size());
292  stream << msg_checksum;
293 
294  ROS_DEBUG_NAMED("async_write", "Sending buffer of %d bytes to client.", length);
295  boost::asio::async_write(socket_, boost::asio::buffer(*buffer_ptr),
296  boost::bind(&Session::write_completion_cb, this, boost::asio::placeholders::error, buffer_ptr));
297  }
298 
299  void write_completion_cb(const boost::system::error_code& error,
300  BufferPtr buffer_ptr) {
301  if (error) {
302  if (error == boost::system::errc::io_error) {
303  ROS_WARN_THROTTLE(1, "Socket write operation returned IO error.");
304  } else if (error == boost::system::errc::no_such_device) {
305  ROS_WARN_THROTTLE(1, "Socket write operation returned no device.");
306  } else {
307  ROS_WARN_STREAM_THROTTLE(1, "Unknown error returned during write operation: " << error);
308  }
309  stop();
310  }
311  // Buffer is destructed when this function exits and buffer_ptr goes out of scope.
312  }
313 
315  void attempt_sync() {
316  request_topics();
318  }
319 
320  void set_sync_timeout(const boost::posix_time::time_duration& interval) {
321  if (ros::ok())
322  {
323  sync_timer_.cancel();
324  sync_timer_.expires_from_now(interval);
325  sync_timer_.async_wait(boost::bind(&Session::sync_timeout, this,
326  boost::asio::placeholders::error));
327  }
328  else
329  {
330  shutdown();
331  }
332  }
333 
334  void sync_timeout(const boost::system::error_code& error) {
335  if (error != boost::asio::error::operation_aborted) {
336  ROS_DEBUG("Sync with device lost.");
337  stop();
338  }
339  }
340 
342  void request_topics() {
343  std::vector<uint8_t> message(0);
344  ROS_DEBUG("Sending request topics message for VER2 protocol.");
345  write_message(message, rosserial_msgs::TopicInfo::ID_PUBLISHER);
346 
347  // Set timer for future point at which to verify the subscribers and publishers
348  // created by the client against the expected set given in the parameters.
350  require_check_timer_.async_wait(boost::bind(&Session::required_topics_check, this,
351  boost::asio::placeholders::error));
352  }
353 
354  void required_topics_check(const boost::system::error_code& error) {
355  if (error != boost::asio::error::operation_aborted) {
357  if (!check_set(require_param_name_ + "/publishers", publishers_) ||
358  !check_set(require_param_name_ + "/subscribers", subscribers_)) {
359  ROS_WARN("Connected client failed to establish the publishers and subscribers dictated by require parameter. Re-requesting topics.");
360  request_topics();
361  }
362  }
363  }
364  }
365 
366  template<typename M>
367  bool check_set(std::string param_name, M map) {
368  XmlRpc::XmlRpcValue param_list;
369  ros::param::get(param_name, param_list);
371  for (int i = 0; i < param_list.size(); ++i) {
372  ROS_ASSERT(param_list[i].getType() == XmlRpc::XmlRpcValue::TypeString);
373  std::string required_topic((std::string(param_list[i])));
374  // Iterate through map of registered topics, to ensure that this one is present.
375  bool found = false;
376  for (typename M::iterator j = map.begin(); j != map.end(); ++j) {
377  if (nh_.resolveName(j->second->get_topic()) ==
378  nh_.resolveName(required_topic)) {
379  found = true;
380  ROS_INFO_STREAM("Verified connection to topic " << required_topic << ", given in parameter " << param_name);
381  break;
382  }
383  }
384  if (!found) {
385  ROS_WARN_STREAM("Missing connection to topic " << required_topic << ", required by parameter " << param_name);
386  return false;
387  }
388  }
389  return true;
390  }
391 
392  static uint8_t checksum(ros::serialization::IStream& stream) {
393  uint8_t sum = 0;
394  for (uint16_t i = 0; i < stream.getLength(); ++i) {
395  sum += stream.getData()[i];
396  }
397  return sum;
398  }
399 
400  static uint8_t checksum(uint16_t val) {
401  return (val >> 8) + val;
402  }
403 
405 
407  rosserial_msgs::TopicInfo topic_info;
409 
410  PublisherPtr pub(new Publisher(nh_, topic_info));
411  callbacks_[topic_info.topic_id] = boost::bind(&Publisher::handle, pub, _1);
412  publishers_[topic_info.topic_id] = pub;
413 
415  }
416 
418  rosserial_msgs::TopicInfo topic_info;
420 
421  SubscriberPtr sub(new Subscriber(nh_, topic_info,
422  boost::bind(&Session::write_message, this, _1, topic_info.topic_id)));
423  subscribers_[topic_info.topic_id] = sub;
424 
426  }
427 
428  // When the rosserial client creates a ServiceClient object (and/or when it registers that object with the NodeHandle)
429  // it creates a publisher (to publish the service request message to us) and a subscriber (to receive the response)
430  // the service client callback is attached to the *subscriber*, so when we receive the service response
431  // and wish to send it over the socket to the client,
432  // we must attach the topicId that came from the service client subscriber message
433 
435  rosserial_msgs::TopicInfo topic_info;
437 
438  if (!services_.count(topic_info.topic_name)) {
439  ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str());
441  nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2)));
442  services_[topic_info.topic_name] = srv;
443  callbacks_[topic_info.topic_id] = boost::bind(&ServiceClient::handle, srv, _1);
444  }
445  if (services_[topic_info.topic_name]->getRequestMessageMD5() != topic_info.md5sum) {
446  ROS_WARN("Service client setup: Request message MD5 mismatch between rosserial client and ROS");
447  } else {
448  ROS_DEBUG("Service client %s: request message MD5 successfully validated as %s",
449  topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
450  }
452  }
453 
455  rosserial_msgs::TopicInfo topic_info;
457 
458  if (!services_.count(topic_info.topic_name)) {
459  ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str());
461  nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2)));
462  services_[topic_info.topic_name] = srv;
463  callbacks_[topic_info.topic_id] = boost::bind(&ServiceClient::handle, srv, _1);
464  }
465  // see above comment regarding the service client callback for why we set topic_id here
466  services_[topic_info.topic_name]->setTopicId(topic_info.topic_id);
467  if (services_[topic_info.topic_name]->getResponseMessageMD5() != topic_info.md5sum) {
468  ROS_WARN("Service client setup: Response message MD5 mismatch between rosserial client and ROS");
469  } else {
470  ROS_DEBUG("Service client %s: response message MD5 successfully validated as %s",
471  topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
472  }
474  }
475 
477  rosserial_msgs::Log l;
479  if(l.level == rosserial_msgs::Log::ROSDEBUG) ROS_DEBUG("%s", l.msg.c_str());
480  else if(l.level == rosserial_msgs::Log::INFO) ROS_INFO("%s", l.msg.c_str());
481  else if(l.level == rosserial_msgs::Log::WARN) ROS_WARN("%s", l.msg.c_str());
482  else if(l.level == rosserial_msgs::Log::ERROR) ROS_ERROR("%s", l.msg.c_str());
483  else if(l.level == rosserial_msgs::Log::FATAL) ROS_FATAL("%s", l.msg.c_str());
484  }
485 
487  std_msgs::Time time;
488  time.data = ros::Time::now();
489 
490  size_t length = ros::serialization::serializationLength(time);
491  std::vector<uint8_t> message(length);
492 
493  ros::serialization::OStream ostream(&message[0], length);
495 
496  write_message(message, rosserial_msgs::TopicInfo::ID_TIME);
497 
498  // The MCU requesting the time from the server is the sync notification. This
499  // call moves the timeout forward.
501  }
502 
503  boost::asio::io_service& io_service_;
504  Socket socket_;
506  enum { buffer_max = 1023 };
507  bool active_;
508 
511 
512  boost::posix_time::time_duration timeout_interval_;
513  boost::posix_time::time_duration attempt_interval_;
514  boost::posix_time::time_duration require_check_interval_;
515  boost::posix_time::time_duration ros_spin_interval_;
516  boost::asio::deadline_timer sync_timer_;
517  boost::asio::deadline_timer require_check_timer_;
518  boost::asio::deadline_timer ros_spin_timer_;
519  std::string require_param_name_;
520 
521  std::map<uint16_t, boost::function<void(ros::serialization::IStream&)> > callbacks_;
522  std::map<uint16_t, PublisherPtr> publishers_;
523  std::map<uint16_t, SubscriberPtr> subscribers_;
524  std::map<std::string, ServiceClientPtr> services_;
525 };
526 
527 } // namespace
528 
529 #endif // ROSSERIAL_SERVER_SESSION_H
static void write(Stream &stream, typename boost::call_traits< T >::param_type t)
boost::asio::deadline_timer require_check_timer_
Definition: session.h:517
void set_require_param(std::string param_name)
Definition: session.h:159
void setup_subscriber(ros::serialization::IStream &stream)
Definition: session.h:417
#define ROS_FATAL(...)
#define ROS_WARN_STREAM_THROTTLE(period, args)
void write_message(Buffer &message, const uint16_t topic_id)
Definition: session.h:278
void read_body(ros::serialization::IStream &stream, uint16_t topic_id)
Definition: session.h:233
boost::asio::deadline_timer ros_spin_timer_
Definition: session.h:518
void ros_spin_timeout(const boost::system::error_code &error)
Definition: session.h:169
Helper object for successive reads from a ReadStream.
static uint8_t checksum(uint16_t val)
Definition: session.h:400
void read_sync_second(ros::serialization::IStream &stream)
Definition: session.h:202
std::vector< uint8_t > Buffer
Definition: session.h:56
bool check_set(std::string param_name, M map)
Definition: session.h:367
void set_sync_timeout(const boost::posix_time::time_duration &interval)
Definition: session.h:320
std::map< uint16_t, boost::function< void(ros::serialization::IStream &)> > callbacks_
Definition: session.h:521
static uint8_t checksum(ros::serialization::IStream &stream)
Definition: session.h:392
boost::posix_time::time_duration require_check_interval_
Definition: session.h:514
Session(boost::asio::io_service &io_service)
Definition: session.h:63
boost::posix_time::time_duration attempt_interval_
Definition: session.h:513
#define ROS_WARN(...)
AsyncReadBuffer< Socket > async_read_buffer_
Definition: session.h:505
void read_failed(const boost::system::error_code &error)
Definition: session.h:263
boost::posix_time::time_duration timeout_interval_
Definition: session.h:512
boost::asio::io_service & io_service_
Definition: session.h:503
void setup_publisher(ros::serialization::IStream &stream)
Definition: session.h:406
std::string require_param_name_
Definition: session.h:519
std::map< uint16_t, PublisherPtr > publishers_
Definition: session.h:522
#define ROS_DEBUG_NAMED(name,...)
void handle_time(ros::serialization::IStream &stream)
Definition: session.h:486
Type const & getType() const
boost::asio::deadline_timer sync_timer_
Definition: session.h:516
#define ROS_WARN_THROTTLE(period,...)
ROSCPP_DECL bool get(const std::string &key, std::string &s)
#define ROS_INFO(...)
#define ROS_ERROR_ONCE(...)
void setCallbackQueue(CallbackQueueInterface *queue)
Classes which manage the Publish and Subscribe relationships that a Session has with its client...
ROSCPP_DECL bool ok()
ros::CallbackQueue ros_callback_queue_
Definition: session.h:510
static void read(Stream &stream, typename boost::call_traits< T >::reference t)
void required_topics_check(const boost::system::error_code &error)
Definition: session.h:354
std::string resolveName(const std::string &name, bool remap=true) const
void setup_service_client_publisher(ros::serialization::IStream &stream)
Definition: session.h:434
std::map< uint16_t, SubscriberPtr > subscribers_
Definition: session.h:523
ROSCPP_DECL bool has(const std::string &key)
#define ROS_WARN_STREAM(args)
void read_sync_first(ros::serialization::IStream &stream)
Definition: session.h:192
ros::NodeHandle nh_
Definition: session.h:509
void handle(ros::serialization::IStream stream)
void write_completion_cb(const boost::system::error_code &error, BufferPtr buffer_ptr)
Definition: session.h:299
void sync_timeout(const boost::system::error_code &error)
Definition: session.h:334
uint32_t serializationLength(const T &t)
#define ROS_INFO_STREAM(args)
void read(size_t requested_bytes, boost::function< void(ros::serialization::IStream &)> callback)
Commands a fixed number of bytes from the buffer. This may be fulfilled from existing buffer content...
boost::posix_time::time_duration ros_spin_interval_
Definition: session.h:515
boost::shared_ptr< Buffer > BufferPtr
Definition: session.h:57
void read_id_length(ros::serialization::IStream &stream)
Definition: session.h:212
static Time now()
#define ROS_ASSERT(cond)
#define ROS_ERROR(...)
void handle_log(ros::serialization::IStream &stream)
Definition: session.h:476
void handle(ros::serialization::IStream stream)
void setup_service_client_subscriber(ros::serialization::IStream &stream)
Definition: session.h:454
std::map< std::string, ServiceClientPtr > services_
Definition: session.h:524
#define ROS_DEBUG(...)


rosserial_server
Author(s): Mike Purvis
autogenerated on Mon Feb 28 2022 23:35:31