session.h
Go to the documentation of this file.
00001 
00035 #ifndef ROSSERIAL_SERVER_SESSION_H
00036 #define ROSSERIAL_SERVER_SESSION_H
00037 
00038 #include <map>
00039 #include <boost/bind.hpp>
00040 #include <boost/asio.hpp>
00041 #include <boost/function.hpp>
00042 
00043 #include <ros/callback_queue.h>
00044 #include <ros/ros.h>
00045 #include <rosserial_msgs/TopicInfo.h>
00046 #include <rosserial_msgs/Log.h>
00047 #include <topic_tools/shape_shifter.h>
00048 #include <std_msgs/Time.h>
00049 
00050 #include "rosserial_server/async_read_buffer.h"
00051 #include "rosserial_server/topic_handlers.h"
00052 
00053 namespace rosserial_server
00054 {
00055 
00056 typedef std::vector<uint8_t> Buffer;
00057 typedef boost::shared_ptr<Buffer> BufferPtr;
00058 
00059 template<typename Socket>
00060 class Session : boost::noncopyable
00061 {
00062 public:
00063   Session(boost::asio::io_service& io_service)
00064     : socket_(io_service),
00065       sync_timer_(io_service),
00066       require_check_timer_(io_service),
00067       ros_spin_timer_(io_service),
00068       async_read_buffer_(socket_, buffer_max,
00069                          boost::bind(&Session::read_failed, this,
00070                                      boost::asio::placeholders::error))
00071   {
00072     active_ = false;
00073 
00074     timeout_interval_ = boost::posix_time::milliseconds(5000);
00075     attempt_interval_ = boost::posix_time::milliseconds(1000);
00076     require_check_interval_ = boost::posix_time::milliseconds(1000);
00077     ros_spin_interval_ = boost::posix_time::milliseconds(10);
00078     require_param_name_ = "~require";
00079 
00080     nh_.setCallbackQueue(&ros_callback_queue_);
00081 
00082     // Intermittent callback to service ROS callbacks. To avoid polling like this,
00083     // CallbackQueue could in the future be extended with a scheme to monitor for
00084     // callbacks on another thread, and then queue them up to be executed on this one.
00085     ros_spin_timer_.expires_from_now(ros_spin_interval_);
00086     ros_spin_timer_.async_wait(boost::bind(&Session::ros_spin_timeout, this,
00087                                            boost::asio::placeholders::error));
00088   }
00089 
00090   Socket& socket()
00091   {
00092     return socket_;
00093   }
00094 
00095   void start()
00096   {
00097     ROS_DEBUG("Starting session.");
00098 
00099     callbacks_[rosserial_msgs::TopicInfo::ID_PUBLISHER]
00100         = boost::bind(&Session::setup_publisher, this, _1);
00101     callbacks_[rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
00102         = boost::bind(&Session::setup_subscriber, this, _1);
00103     callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_PUBLISHER]
00104         = boost::bind(&Session::setup_service_client_publisher, this, _1);
00105     callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
00106         = boost::bind(&Session::setup_service_client_subscriber, this, _1);
00107     callbacks_[rosserial_msgs::TopicInfo::ID_LOG]
00108         = boost::bind(&Session::handle_log, this, _1);
00109     callbacks_[rosserial_msgs::TopicInfo::ID_TIME]
00110         = boost::bind(&Session::handle_time, this, _1);
00111 
00112     active_ = true;
00113     attempt_sync();
00114     read_sync_header();
00115   }
00116 
00117   void stop()
00118   {
00119     // Abort any pending ROS callbacks.
00120     ros_callback_queue_.clear();
00121 
00122     // Abort active session timer callbacks, if present.
00123     sync_timer_.cancel();
00124     require_check_timer_.cancel();
00125 
00126     // Reset the state of the session, dropping any publishers or subscribers
00127     // we currently know about from this client.
00128     callbacks_.clear();
00129     subscribers_.clear();
00130     publishers_.clear();
00131     services_.clear();
00132 
00133     // Close the socket.
00134     socket_.close();
00135     active_ = false;
00136   }
00137 
00138   bool is_active()
00139   {
00140     return active_;
00141   }
00142 
00149   void set_require_param(std::string param_name)
00150   {
00151     require_param_name_ = param_name;
00152   }
00153 
00154 private:
00159   void ros_spin_timeout(const boost::system::error_code& error) {
00160     ros_callback_queue_.callAvailable();
00161 
00162     if (ros::ok())
00163     {
00164       // Call again next interval.
00165       ros_spin_timer_.expires_from_now(ros_spin_interval_);
00166       ros_spin_timer_.async_wait(boost::bind(&Session::ros_spin_timeout, this,
00167                                              boost::asio::placeholders::error));
00168     }
00169   }
00170 
00172   // TODO: Total message timeout, implement primarily in ReadBuffer.
00173 
00174   void read_sync_header() {
00175     async_read_buffer_.read(1, boost::bind(&Session::read_sync_first, this, _1));
00176   }
00177 
00178   void read_sync_first(ros::serialization::IStream& stream) {
00179     uint8_t sync;
00180     stream >> sync;
00181     if (sync == 0xff) {
00182       async_read_buffer_.read(1, boost::bind(&Session::read_sync_second, this, _1));
00183     } else {
00184       read_sync_header();
00185     }
00186   }
00187 
00188   void read_sync_second(ros::serialization::IStream& stream) {
00189     uint8_t sync;
00190     stream >> sync;
00191     if (sync == 0xfe) {
00192       async_read_buffer_.read(5, boost::bind(&Session::read_id_length, this, _1));
00193     } else {
00194       read_sync_header();
00195     }
00196   }
00197 
00198   void read_id_length(ros::serialization::IStream& stream) {
00199     uint16_t topic_id, length;
00200     uint8_t length_checksum;
00201 
00202     // Check header checksum byte for length field.
00203     stream >> length >> length_checksum;
00204     if (length_checksum + checksum(length) != 0xff) {
00205       uint8_t csl = checksum(length);
00206       ROS_WARN("Bad message header length checksum. Dropping message from client. T%d L%d C%d %d", topic_id, length, length_checksum, csl);
00207       read_sync_header();
00208       return;
00209     } else {
00210       stream >> topic_id;
00211     }
00212     ROS_DEBUG("Received message header with length %d and topic_id=%d", length, topic_id);
00213 
00214     // Read message length + checksum byte.
00215     async_read_buffer_.read(length + 1, boost::bind(&Session::read_body, this,
00216                                                     _1, topic_id));
00217   }
00218 
00219   void read_body(ros::serialization::IStream& stream, uint16_t topic_id) {
00220     ROS_DEBUG("Received body of length %d for message on topic %d.", stream.getLength(), topic_id);
00221 
00222     ros::serialization::IStream checksum_stream(stream.getData(), stream.getLength());
00223     uint8_t msg_checksum = checksum(checksum_stream) + checksum(topic_id);
00224 
00225     if (msg_checksum != 0xff) {
00226       ROS_WARN("Rejecting message on topicId=%d, length=%d with bad checksum.", topic_id, stream.getLength());
00227     } else {
00228       if (callbacks_.count(topic_id) == 1) {
00229         try {
00230           callbacks_[topic_id](stream);
00231         } catch(ros::serialization::StreamOverrunException e) {
00232           if (topic_id < 100) {
00233             ROS_ERROR("Buffer overrun when attempting to parse setup message.");
00234             ROS_ERROR_ONCE("Is this firmware from a pre-Groovy rosserial?");
00235           } else {
00236             ROS_WARN("Buffer overrun when attempting to parse user message.");
00237           }
00238         }
00239       } else {
00240         ROS_WARN("Received message with unrecognized topicId (%d).", topic_id);
00241         // TODO: Resynchronize on multiples?
00242       }
00243     }
00244 
00245     // Kickoff next message read.
00246     read_sync_header();
00247   }
00248 
00249   void read_failed(const boost::system::error_code& error) {
00250     if (error == boost::system::errc::no_buffer_space) {
00251       // No worries. Begin syncing on a new message.
00252       ROS_WARN("Overrun on receive buffer. Attempting to regain rx sync.");
00253       read_sync_header();
00254     } else if (error) {
00255       // When some other read error has occurred, stop the session, which destroys
00256       // all known publishers and subscribers.
00257       ROS_WARN_STREAM("Socket asio error, closing socket: " << error);
00258       stop();
00259     }
00260   }
00261 
00263 
00264   void write_message(Buffer& message, const uint16_t topic_id) {
00265     uint8_t overhead_bytes = 8;
00266     uint16_t length = overhead_bytes + message.size();
00267     BufferPtr buffer_ptr(new Buffer(length));
00268 
00269     uint8_t msg_checksum;
00270     ros::serialization::IStream checksum_stream(message.size() > 0 ? &message[0] : NULL, message.size());
00271 
00272     ros::serialization::OStream stream(&buffer_ptr->at(0), buffer_ptr->size());
00273     uint8_t msg_len_checksum = 255 - checksum(message.size());
00274     stream << (uint16_t)0xfeff << (uint16_t)message.size() << msg_len_checksum << topic_id;
00275     msg_checksum = 255 - (checksum(checksum_stream) + checksum(topic_id));
00276 
00277     memcpy(stream.advance(message.size()), &message[0], message.size());
00278     stream << msg_checksum;
00279 
00280     ROS_DEBUG_NAMED("async_write", "Sending buffer of %d bytes to client.", length);
00281     boost::asio::async_write(socket_, boost::asio::buffer(*buffer_ptr),
00282           boost::bind(&Session::write_completion_cb, this, boost::asio::placeholders::error, buffer_ptr));
00283   }
00284 
00285   void write_completion_cb(const boost::system::error_code& error,
00286                            BufferPtr buffer_ptr) {
00287     if (error) {
00288       if (error == boost::system::errc::io_error) {
00289         ROS_WARN_THROTTLE(1, "Socket write operation returned IO error.");
00290       } else if (error == boost::system::errc::no_such_device) {
00291         ROS_WARN_THROTTLE(1, "Socket write operation returned no device.");
00292       } else {
00293         ROS_WARN_STREAM_THROTTLE(1, "Unknown error returned during write operation: " << error);
00294       }
00295       stop();
00296     }
00297     // Buffer is destructed when this function exits and buffer_ptr goes out of scope.
00298   }
00299 
00301   void attempt_sync() {
00302     request_topics();
00303     set_sync_timeout(attempt_interval_);
00304   }
00305 
00306   void set_sync_timeout(const boost::posix_time::time_duration& interval) {
00307     if (ros::ok())
00308     {
00309       sync_timer_.cancel();
00310       sync_timer_.expires_from_now(interval);
00311       sync_timer_.async_wait(boost::bind(&Session::sync_timeout, this,
00312             boost::asio::placeholders::error));
00313     }
00314   }
00315 
00316   void sync_timeout(const boost::system::error_code& error) {
00317     if (error != boost::asio::error::operation_aborted) {
00318       ROS_DEBUG("Sync with device lost.");
00319       stop();
00320     }
00321   }
00322 
00324   void request_topics() {
00325     std::vector<uint8_t> message(0);
00326     ROS_DEBUG("Sending request topics message for VER2 protocol.");
00327     write_message(message, rosserial_msgs::TopicInfo::ID_PUBLISHER);
00328 
00329     // Set timer for future point at which to verify the subscribers and publishers
00330     // created by the client against the expected set given in the parameters.
00331     require_check_timer_.expires_from_now(require_check_interval_);
00332     require_check_timer_.async_wait(boost::bind(&Session::required_topics_check, this,
00333           boost::asio::placeholders::error));
00334   }
00335 
00336   void required_topics_check(const boost::system::error_code& error) {
00337     if (error != boost::asio::error::operation_aborted) {
00338       if (ros::param::has(require_param_name_)) {
00339         if (!check_set(require_param_name_ + "/publishers", publishers_) ||
00340             !check_set(require_param_name_ + "/subscribers", subscribers_)) {
00341           ROS_WARN("Connected client failed to establish the publishers and subscribers dictated by require parameter. Re-requesting topics.");
00342           request_topics();
00343         }
00344       }
00345     }
00346   }
00347 
00348   template<typename M>
00349   bool check_set(std::string param_name, M map) {
00350     XmlRpc::XmlRpcValue param_list;
00351     ros::param::get(param_name, param_list);
00352     ROS_ASSERT(param_list.getType() == XmlRpc::XmlRpcValue::TypeArray);
00353     for (int i = 0; i < param_list.size(); ++i) {
00354       ROS_ASSERT(param_list[i].getType() == XmlRpc::XmlRpcValue::TypeString);
00355       std::string required_topic((std::string(param_list[i])));
00356       // Iterate through map of registered topics, to ensure that this one is present.
00357       bool found = false;
00358       for (typename M::iterator j = map.begin(); j != map.end(); ++j) {
00359         if (nh_.resolveName(j->second->get_topic()) ==
00360             nh_.resolveName(required_topic)) {
00361           found = true;
00362           ROS_INFO_STREAM("Verified connection to topic " << required_topic << ", given in parameter " << param_name);
00363           break;
00364         }
00365       }
00366       if (!found) {
00367         ROS_WARN_STREAM("Missing connection to topic " << required_topic << ", required by parameter " << param_name);
00368         return false;
00369       }
00370     }
00371     return true;
00372   }
00373 
00374   static uint8_t checksum(ros::serialization::IStream& stream) {
00375     uint8_t sum = 0;
00376     for (uint16_t i = 0; i < stream.getLength(); ++i) {
00377       sum += stream.getData()[i];
00378     }
00379     return sum;
00380   }
00381 
00382   static uint8_t checksum(uint16_t val) {
00383     return (val >> 8) + val;
00384   }
00385 
00387 
00388   void setup_publisher(ros::serialization::IStream& stream) {
00389     rosserial_msgs::TopicInfo topic_info;
00390     ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00391 
00392     PublisherPtr pub(new Publisher(nh_, topic_info));
00393     callbacks_[topic_info.topic_id] = boost::bind(&Publisher::handle, pub, _1);
00394     publishers_[topic_info.topic_id] = pub;
00395 
00396     set_sync_timeout(timeout_interval_);
00397   }
00398 
00399   void setup_subscriber(ros::serialization::IStream& stream) {
00400     rosserial_msgs::TopicInfo topic_info;
00401     ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00402 
00403     SubscriberPtr sub(new Subscriber(nh_, topic_info,
00404         boost::bind(&Session::write_message, this, _1, topic_info.topic_id)));
00405     subscribers_[topic_info.topic_id] = sub;
00406 
00407     set_sync_timeout(timeout_interval_);
00408   }
00409 
00410   // When the rosserial client creates a ServiceClient object (and/or when it registers that object with the NodeHandle)
00411   // it creates a publisher (to publish the service request message to us) and a subscriber (to receive the response)
00412   // the service client callback is attached to the *subscriber*, so when we receive the service response
00413   // and wish to send it over the socket to the client,
00414   // we must attach the topicId that came from the service client subscriber message
00415 
00416   void setup_service_client_publisher(ros::serialization::IStream& stream) {
00417     rosserial_msgs::TopicInfo topic_info;
00418     ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00419 
00420     if (!services_.count(topic_info.topic_name)) {
00421       ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str());
00422       ServiceClientPtr srv(new ServiceClient(
00423         nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2)));
00424       services_[topic_info.topic_name] = srv;
00425       callbacks_[topic_info.topic_id] = boost::bind(&ServiceClient::handle, srv, _1);
00426     }
00427     if (services_[topic_info.topic_name]->getRequestMessageMD5() != topic_info.md5sum) {
00428       ROS_WARN("Service client setup: Request message MD5 mismatch between rosserial client and ROS");
00429     } else {
00430       ROS_DEBUG("Service client %s: request message MD5 successfully validated as %s",
00431         topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
00432     }
00433     set_sync_timeout(timeout_interval_);
00434   }
00435 
00436   void setup_service_client_subscriber(ros::serialization::IStream& stream) {
00437     rosserial_msgs::TopicInfo topic_info;
00438     ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00439 
00440     if (!services_.count(topic_info.topic_name)) {
00441       ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str());
00442       ServiceClientPtr srv(new ServiceClient(
00443         nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2)));
00444       services_[topic_info.topic_name] = srv;
00445       callbacks_[topic_info.topic_id] = boost::bind(&ServiceClient::handle, srv, _1);
00446     }
00447     // see above comment regarding the service client callback for why we set topic_id here
00448     services_[topic_info.topic_name]->setTopicId(topic_info.topic_id);
00449     if (services_[topic_info.topic_name]->getResponseMessageMD5() != topic_info.md5sum) {
00450       ROS_WARN("Service client setup: Response message MD5 mismatch between rosserial client and ROS");
00451     } else {
00452       ROS_DEBUG("Service client %s: response message MD5 successfully validated as %s",
00453         topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
00454     }
00455     set_sync_timeout(timeout_interval_);
00456   }
00457 
00458   void handle_log(ros::serialization::IStream& stream) {
00459     rosserial_msgs::Log l;
00460     ros::serialization::Serializer<rosserial_msgs::Log>::read(stream, l);
00461     if(l.level == rosserial_msgs::Log::ROSDEBUG) ROS_DEBUG("%s", l.msg.c_str());
00462     else if(l.level == rosserial_msgs::Log::INFO) ROS_INFO("%s", l.msg.c_str());
00463     else if(l.level == rosserial_msgs::Log::WARN) ROS_WARN("%s", l.msg.c_str());
00464     else if(l.level == rosserial_msgs::Log::ERROR) ROS_ERROR("%s", l.msg.c_str());
00465     else if(l.level == rosserial_msgs::Log::FATAL) ROS_FATAL("%s", l.msg.c_str());
00466   }
00467 
00468   void handle_time(ros::serialization::IStream& stream) {
00469     std_msgs::Time time;
00470     time.data = ros::Time::now();
00471 
00472     size_t length = ros::serialization::serializationLength(time);
00473     std::vector<uint8_t> message(length);
00474 
00475     ros::serialization::OStream ostream(&message[0], length);
00476     ros::serialization::Serializer<std_msgs::Time>::write(ostream, time);
00477 
00478     write_message(message, rosserial_msgs::TopicInfo::ID_TIME);
00479 
00480     // The MCU requesting the time from the server is the sync notification. This
00481     // call moves the timeout forward.
00482     set_sync_timeout(timeout_interval_);
00483   }
00484 
00485   Socket socket_;
00486   AsyncReadBuffer<Socket> async_read_buffer_;
00487   enum { buffer_max = 1023 };
00488   bool active_;
00489 
00490   ros::NodeHandle nh_;
00491   ros::CallbackQueue ros_callback_queue_;
00492 
00493   boost::posix_time::time_duration timeout_interval_;
00494   boost::posix_time::time_duration attempt_interval_;
00495   boost::posix_time::time_duration require_check_interval_;
00496   boost::posix_time::time_duration ros_spin_interval_;
00497   boost::asio::deadline_timer sync_timer_;
00498   boost::asio::deadline_timer require_check_timer_;
00499   boost::asio::deadline_timer ros_spin_timer_;
00500   std::string require_param_name_;
00501 
00502   std::map<uint16_t, boost::function<void(ros::serialization::IStream&)> > callbacks_;
00503   std::map<uint16_t, PublisherPtr> publishers_;
00504   std::map<uint16_t, SubscriberPtr> subscribers_;
00505   std::map<std::string, ServiceClientPtr> services_;
00506 };
00507 
00508 }  // namespace
00509 
00510 #endif  // ROSSERIAL_SERVER_SESSION_H


rosserial_server
Author(s): Mike Purvis
autogenerated on Sat Oct 7 2017 03:08:51