session.h
Go to the documentation of this file.
00001 
00035 #ifndef ROSSERIAL_SERVER_SESSION_H
00036 #define ROSSERIAL_SERVER_SESSION_H
00037 
00038 #include <map>
00039 #include <boost/bind.hpp>
00040 #include <boost/asio.hpp>
00041 #include <boost/function.hpp>
00042 
00043 #include <ros/ros.h>
00044 #include <rosserial_msgs/TopicInfo.h>
00045 #include <topic_tools/shape_shifter.h>
00046 #include <std_msgs/Time.h>
00047 
00048 #include "rosserial_server/async_read_buffer.h"
00049 #include "rosserial_server/topic_handlers.h"
00050 
00051 namespace rosserial_server
00052 {
00053 
00054 typedef std::vector<uint8_t> Buffer;
00055 typedef boost::shared_ptr<Buffer> BufferPtr;
00056 
00057 template<typename Socket>
00058 class Session
00059 {
00060 public:
00061   Session(boost::asio::io_service& io_service)
00062     : socket_(io_service), client_version(PROTOCOL_UNKNOWN),
00063       client_version_try(PROTOCOL_VER2),
00064       timeout_interval_(boost::posix_time::milliseconds(5000)),
00065       attempt_interval_(boost::posix_time::milliseconds(1000)),
00066       require_check_interval_(boost::posix_time::milliseconds(1000)),
00067       sync_timer_(io_service),
00068       require_check_timer_(io_service),
00069       async_read_buffer_(socket_, buffer_max,
00070                          boost::bind(&Session::read_failed, this,
00071                                      boost::asio::placeholders::error))
00072   {
00073     callbacks_[rosserial_msgs::TopicInfo::ID_PUBLISHER]
00074         = boost::bind(&Session::setup_publisher, this, _1);
00075     callbacks_[rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
00076         = boost::bind(&Session::setup_subscriber, this, _1);
00077     callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_PUBLISHER]
00078         = boost::bind(&Session::setup_service_client_publisher, this, _1);
00079     callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
00080         = boost::bind(&Session::setup_service_client_subscriber, this, _1);
00081     callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_SERVER+rosserial_msgs::TopicInfo::ID_PUBLISHER]
00082         = boost::bind(&Session::setup_service_server_publisher, this, _1);
00083     callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_SERVER+rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
00084         = boost::bind(&Session::setup_service_server_subscriber, this, _1);
00085     callbacks_[rosserial_msgs::TopicInfo::ID_TIME]
00086         = boost::bind(&Session::handle_time, this, _1);
00087   }
00088 
00089   virtual ~Session()
00090   {
00091     abort_servers();
00092     ROS_INFO("Ending session.");
00093   }
00094 
00095   Socket& socket()
00096   {
00097     return socket_;
00098   }
00099 
00100   void start()
00101   {
00102     ROS_INFO("Starting session.");
00103 
00104     attempt_sync();
00105     read_sync_header();
00106   }
00107 
00108   enum Version {
00109     PROTOCOL_UNKNOWN = 0,
00110     PROTOCOL_VER1 = 1,
00111     PROTOCOL_VER2 = 2,
00112     PROTOCOL_MAX
00113   };
00114 
00115 private:
00117   // TODO: Total message timeout, implement primarily in ReadBuffer.
00118 
00119   void read_sync_header() {
00120     async_read_buffer_.read(1, boost::bind(&Session::read_sync_first, this, _1));
00121   }
00122 
00123   void read_sync_first(ros::serialization::IStream& stream) {
00124     uint8_t sync;
00125     stream >> sync;
00126     if (sync == 0xff) {
00127       async_read_buffer_.read(1, boost::bind(&Session::read_sync_second, this, _1));
00128     } else {
00129       read_sync_header();
00130     }
00131   }
00132 
00133   void read_sync_second(ros::serialization::IStream& stream) {
00134     uint8_t sync;
00135     stream >> sync;
00136     if (client_version == PROTOCOL_UNKNOWN) {
00137       if (sync == 0xff) {
00138         ROS_WARN("Attached client is using protocol VER1 (groovy)");
00139         client_version = PROTOCOL_VER1;
00140       } else if (sync == 0xfe) {
00141         ROS_INFO("Attached client is using protocol VER2 (hydro)");
00142         client_version = PROTOCOL_VER2;
00143       }
00144     }
00145     if (sync == 0xff && client_version == PROTOCOL_VER1) {
00146       async_read_buffer_.read(4, boost::bind(&Session::read_id_length, this, _1));
00147     } else if (sync == 0xfe && client_version == PROTOCOL_VER2) {
00148       async_read_buffer_.read(5, boost::bind(&Session::read_id_length, this, _1));
00149     } else {
00150       read_sync_header();
00151     }
00152   }
00153 
00154   void read_id_length(ros::serialization::IStream& stream) {
00155     uint16_t topic_id, length;
00156     uint8_t length_checksum;
00157     if (client_version == PROTOCOL_VER2) {
00158       // Complex header with checksum byte for length field.
00159       stream >> length >> length_checksum;
00160       if (length_checksum + checksum(length) != 0xff) {
00161         uint8_t csl = checksum(length);
00162         ROS_WARN("Bad message header length checksum. Dropping message from client. T%d L%d C%d %d", topic_id, length, length_checksum, csl);
00163         read_sync_header();
00164         return;
00165       } else {
00166         stream >> topic_id;
00167       }
00168     } else if (client_version == PROTOCOL_VER1) {
00169       // Simple header in VER1 protocol.
00170       stream >> topic_id >> length;
00171     }
00172     ROS_DEBUG("Received message header with length %d and topic_id=%d", length, topic_id);
00173 
00174     // Read message length + checksum byte.
00175     async_read_buffer_.read(length + 1, boost::bind(&Session::read_body, this,
00176                                                     _1, topic_id));
00177   }
00178 
00179   void read_body(ros::serialization::IStream& stream, uint16_t topic_id) {
00180     ROS_DEBUG("Received body of length %d for message on topic %d.", stream.getLength(), topic_id);
00181 
00182     ros::serialization::IStream checksum_stream(stream.getData(), stream.getLength());
00183 
00184     uint8_t msg_checksum = checksum(checksum_stream) + checksum(topic_id);
00185     if (client_version == PROTOCOL_VER1) {
00186       msg_checksum += checksum(stream.getLength() - 1);
00187     }
00188 
00189     if (msg_checksum != 0xff) {
00190       ROS_WARN("Rejecting message on topicId=%d, length=%d with bad checksum.", topic_id, stream.getLength());
00191     } else {
00192       if (callbacks_.count(topic_id) == 1) {
00193         try {
00194           callbacks_[topic_id](stream);
00195         } catch(ros::serialization::StreamOverrunException e) {
00196           if (topic_id < 100) {
00197             ROS_ERROR("Buffer overrun when attempting to parse setup message.");
00198             ROS_ERROR_ONCE("Is this firmware from a pre-Groovy rosserial?");
00199           } else {
00200             ROS_WARN("Buffer overrun when attempting to parse user message.");
00201           }
00202         }
00203       } else {
00204         ROS_WARN("Received message with unrecognized topicId (%d).", topic_id);
00205         // TODO: Resynchronize on multiples?
00206       }
00207     }
00208 
00209     // Kickoff next message read.
00210     read_sync_header();
00211   }
00212 
00213   void read_failed(const boost::system::error_code& error) {
00214     if (error == boost::system::errc::no_buffer_space) {
00215       // No worries. Begin syncing on a new message.
00216       ROS_WARN("Overrun on receive buffer. Attempting to regain rx sync.");
00217       read_sync_header();
00218     } else if (error) {
00219       // When some other read error has occurred, delete the whole session, which destroys
00220       // all publishers and subscribers.
00221       socket_.cancel();
00222       ROS_DEBUG_STREAM("Socket asio error: " << error);
00223       ROS_WARN("Stopping session due to read error.");
00224       delete this;
00225     }
00226   }
00227 
00229 
00230   void write_message(Buffer& message,
00231                      const uint16_t topic_id,
00232                      Session::Version version) {
00233     uint8_t overhead_bytes = 0;
00234     switch(version) {
00235       case PROTOCOL_VER2: overhead_bytes = 8; break;
00236       case PROTOCOL_VER1: overhead_bytes = 7; break;
00237       default:
00238         ROS_WARN("Aborting write_message: protocol unspecified.");
00239     }
00240 
00241     uint16_t length = overhead_bytes + message.size();
00242     BufferPtr buffer_ptr(new Buffer(length));
00243 
00244     uint8_t msg_checksum;
00245     ros::serialization::IStream checksum_stream(message.size() > 0 ? &message[0] : NULL, message.size());
00246 
00247     ros::serialization::OStream stream(&buffer_ptr->at(0), buffer_ptr->size());
00248     if (version == PROTOCOL_VER2) {
00249       uint8_t msg_len_checksum = 255 - checksum(message.size());
00250       stream << (uint16_t)0xfeff << (uint16_t)message.size() << msg_len_checksum << topic_id;
00251       msg_checksum = 255 - (checksum(checksum_stream) + checksum(topic_id));
00252     } else if (version == PROTOCOL_VER1) {
00253       stream << (uint16_t)0xffff << topic_id << (uint16_t)message.size();
00254       msg_checksum = 255 - (checksum(checksum_stream) + checksum(topic_id) + checksum(message.size()));
00255     }
00256     memcpy(stream.advance(message.size()), &message[0], message.size());
00257     stream << msg_checksum;
00258 
00259     // Will call immediately if we are already on the io_service thread. Otherwise,
00260     // the request is queued up and executed on that thread.
00261     socket_.get_io_service().dispatch(
00262         boost::bind(&Session::write_buffer, this, buffer_ptr));
00263   }
00264 
00265   // Function which is dispatched onto the io_service thread by write_message, so that
00266   // write_message may be safely called directly from the ROS background spinning thread.
00267   void write_buffer(BufferPtr buffer_ptr) {
00268     boost::asio::async_write(socket_, boost::asio::buffer(*buffer_ptr),
00269           boost::bind(&Session::write_cb, this, boost::asio::placeholders::error, buffer_ptr));
00270   }
00271 
00272   void write_cb(const boost::system::error_code& error,
00273                 BufferPtr buffer_ptr) {
00274     if (error) {
00275       if (error == boost::system::errc::io_error) {
00276         ROS_WARN_THROTTLE(1, "Socket write operation returned IO error.");
00277       } else if (error == boost::system::errc::no_such_device) {
00278         ROS_WARN_THROTTLE(1, "Socket write operation returned no device.");
00279       } else {
00280         socket_.cancel();
00281         ROS_WARN_STREAM_THROTTLE(1, "Unknown error returned during write operation: " << error);
00282         ROS_WARN("Destroying session.");
00283         delete this;
00284       }
00285     }
00286     // Buffer is destructed when this function exits.
00287   }
00288 
00290   void attempt_sync() {
00291     request_topics();
00292     set_sync_timeout(attempt_interval_);
00293   }
00294 
00295   void set_sync_timeout(const boost::posix_time::time_duration& interval) {
00296     sync_timer_.cancel();
00297     sync_timer_.expires_from_now(interval);
00298     sync_timer_.async_wait(boost::bind(&Session::sync_timeout, this,
00299           boost::asio::placeholders::error));
00300   }
00301 
00302   void abort_servers() {
00303     for(std::map<std::string, ServiceServerPtr>::iterator it = service_servers_.begin(); it != service_servers_.end(); ++it)
00304         it->second->abort();
00305   }
00306 
00307   void sync_timeout(const boost::system::error_code& error) {
00308     if (error == boost::asio::error::operation_aborted) {
00309       return;
00310     }
00311     ROS_WARN("Sync with device lost.");
00312     abort_servers();
00313     attempt_sync();
00314   }
00315 
00317   void request_topics() {
00318     // Once this session has previously connected using a given protocol version,
00319     // always attempt that one. If not, though, cycle between available options.
00320     if (client_version != PROTOCOL_UNKNOWN) client_version_try = client_version;
00321 
00322     std::vector<uint8_t> message(0);
00323     if (client_version_try == PROTOCOL_VER2) {
00324         ROS_DEBUG("Sending request topics message for VER2 protocol.");
00325         write_message(message, rosserial_msgs::TopicInfo::ID_PUBLISHER, PROTOCOL_VER2);
00326         client_version_try = PROTOCOL_VER1;
00327     } else if (client_version_try == PROTOCOL_VER1) {
00328         ROS_DEBUG("Sending request topics message for VER1 protocol.");
00329         write_message(message, rosserial_msgs::TopicInfo::ID_PUBLISHER, PROTOCOL_VER1);
00330         client_version_try = PROTOCOL_VER2;
00331     } else {
00332         client_version_try = PROTOCOL_VER2;
00333     }
00334 
00335     // Set timer for future point at which to verify the subscribers and publishers
00336     // created by the client against the expected set given in the parameters.
00337     require_check_timer_.expires_from_now(require_check_interval_);
00338     require_check_timer_.async_wait(boost::bind(&Session::required_topics_check, this,
00339           boost::asio::placeholders::error));
00340   }
00341 
00342   void required_topics_check(const boost::system::error_code& error) {
00343     if (ros::param::has("~require")) {
00344       if (!check_set("~require/publishers", publishers_) ||
00345           !check_set("~require/subscribers", subscribers_) ||
00346           !check_set("~require/service_clients", service_clients_) ||
00347           !check_set("~require/service_servers", service_servers_)) {
00348         ROS_WARN("Connected client failed to establish the interfaces dictated by require parameter. Re-requesting.");
00349         request_topics();
00350       }
00351     }
00352   }
00353 
00354   template<typename M>
00355   bool check_set(std::string param_name, M map) {
00356     XmlRpc::XmlRpcValue param_list;
00357     if(!ros::param::get(param_name, param_list)) return true;
00358     ROS_ASSERT(param_list.getType() == XmlRpc::XmlRpcValue::TypeArray);
00359     for (int i = 0; i < param_list.size(); ++i) {
00360       ROS_ASSERT(param_list[i].getType() == XmlRpc::XmlRpcValue::TypeString);
00361       std::string required_name((std::string(param_list[i])));
00362       // Iterate through map of registered names, to ensure that this one is present.
00363       bool found = false;
00364       for (typename M::iterator j = map.begin(); j != map.end(); ++j) {
00365         if (nh_.resolveName(j->second->get_name()) ==
00366             nh_.resolveName(required_name)) {
00367           found = true;
00368           break;
00369         }
00370       }
00371       if (!found) return false;
00372     }
00373     return true;
00374   }
00375 
00376   static uint8_t checksum(ros::serialization::IStream& stream) {
00377     uint8_t sum = 0;
00378     for (uint16_t i = 0; i < stream.getLength(); ++i) {
00379       sum += stream.getData()[i];
00380     }
00381     return sum;
00382   }
00383 
00384   static uint8_t checksum(uint16_t val) {
00385     return (val >> 8) + val;
00386   }
00387 
00389 
00390   void setup_publisher(ros::serialization::IStream& stream) {
00391     rosserial_msgs::TopicInfo topic_info;
00392     ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00393 
00394     PublisherPtr pub(new Publisher(nh_, topic_info));
00395     publishers_[topic_info.topic_id] = pub;
00396     callbacks_[topic_info.topic_id] = boost::bind(&Publisher::handle, pub, _1);
00397 
00398     set_sync_timeout(timeout_interval_);
00399   }
00400 
00401   void setup_subscriber(ros::serialization::IStream& stream) {
00402     rosserial_msgs::TopicInfo topic_info;
00403     ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00404 
00405     SubscriberPtr sub(new Subscriber(nh_, topic_info,
00406         boost::bind(&Session::write_message, this, _1, topic_info.topic_id, client_version)));
00407     subscribers_[topic_info.topic_id] = sub;
00408 
00409     set_sync_timeout(timeout_interval_);
00410   }
00411 
00412   // When the rosserial client creates a ServiceClient object (and/or when it registers that object with the NodeHandle)
00413   // it creates a publisher (to publish the service request message to us) and a subscriber (to receive the response)
00414   // the service client callback is attached to the *subscriber*, so when we receive the service response
00415   // and wish to send it over the socket to the client,
00416   // we must attach the topicId that came from the service client subscriber message
00417 
00418   void setup_service_client_publisher(ros::serialization::IStream& stream) {
00419     rosserial_msgs::TopicInfo topic_info;
00420     ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00421 
00422     if (!service_clients_.count(topic_info.topic_name)) {
00423       ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str());
00424       ServiceClientPtr srv(new ServiceClient(
00425         nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2, client_version)));
00426       service_clients_[topic_info.topic_name] = srv;
00427       callbacks_[topic_info.topic_id] = boost::bind(&ServiceClient::handle, srv, _1);
00428     }
00429     if (service_clients_[topic_info.topic_name]->getRequestMessageMD5() != topic_info.md5sum) {
00430       ROS_WARN("Service client setup: Request message MD5 mismatch between rosserial client and ROS");
00431     } else {
00432       ROS_DEBUG("Service client %s: request message MD5 successfully validated as %s",
00433         topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
00434     }
00435     set_sync_timeout(timeout_interval_);
00436   }
00437 
00438   void setup_service_client_subscriber(ros::serialization::IStream& stream) {
00439     rosserial_msgs::TopicInfo topic_info;
00440     ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00441 
00442     if (!service_clients_.count(topic_info.topic_name)) {
00443       ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str());
00444       ServiceClientPtr srv(new ServiceClient(
00445         nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2, client_version)));
00446       service_clients_[topic_info.topic_name] = srv;
00447       callbacks_[topic_info.topic_id] = boost::bind(&ServiceClient::handle, srv, _1);
00448     }
00449     // see above comment regarding the service client callback for why we set topic_id here
00450     service_clients_[topic_info.topic_name]->setTopicId(topic_info.topic_id);
00451     if (service_clients_[topic_info.topic_name]->getResponseMessageMD5() != topic_info.md5sum) {
00452       ROS_WARN("Service client setup: Response message MD5 mismatch between rosserial client and ROS");
00453     } else {
00454       ROS_DEBUG("Service client %s: response message MD5 successfully validated as %s",
00455         topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
00456     }
00457     set_sync_timeout(timeout_interval_);
00458   }
00459 
00460   void setup_service_server_publisher(ros::serialization::IStream& stream) {
00461     rosserial_msgs::TopicInfo topic_info;
00462     ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00463 
00464     if (!service_servers_.count(topic_info.topic_name)) {
00465       ROS_DEBUG("Creating service server for topic %s",topic_info.topic_name.c_str());
00466       ServiceServerPtr srv(new ServiceServer(
00467         nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2, client_version)));
00468       service_servers_[topic_info.topic_name] = srv;
00469       callbacks_[topic_info.topic_id] = boost::bind(&ServiceServer::handle, srv, _1);
00470     }
00471     if (service_servers_[topic_info.topic_name]->getResponseMessageMD5() != topic_info.md5sum) {
00472       ROS_WARN("Service client setup: Response message MD5 mismatch between rosserial client and ROS");
00473     } else {
00474       ROS_DEBUG("Service client %s: request message MD5 successfully validated as %s",
00475         topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
00476     }
00477     set_sync_timeout(timeout_interval_);
00478   }
00479 
00480   void setup_service_server_subscriber(ros::serialization::IStream& stream) {
00481     rosserial_msgs::TopicInfo topic_info;
00482     ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00483 
00484     if (!service_servers_.count(topic_info.topic_name)) {
00485       ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str());
00486       ServiceServerPtr srv(new ServiceServer(
00487         nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2, client_version)));
00488       service_servers_[topic_info.topic_name] = srv;
00489       callbacks_[topic_info.topic_id] = boost::bind(&ServiceServer::handle, srv, _1);
00490     }
00491     service_servers_[topic_info.topic_name]->setTopicId(topic_info.topic_id);
00492     if (service_servers_[topic_info.topic_name]->getRequestMessageMD5() != topic_info.md5sum) {
00493       ROS_WARN("Service client setup: Request message MD5 mismatch between rosserial client and ROS");
00494     } else {
00495       ROS_DEBUG("Service client %s: request message MD5 successfully validated as %s",
00496         topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
00497     }
00498     set_sync_timeout(timeout_interval_);
00499   }
00500 
00501 
00502   void handle_time(ros::serialization::IStream& stream) {
00503     std_msgs::Time time;
00504     time.data = ros::Time::now();
00505 
00506     size_t length = ros::serialization::serializationLength(time);
00507     std::vector<uint8_t> message(length);
00508 
00509     ros::serialization::OStream ostream(&message[0], length);
00510     ros::serialization::Serializer<std_msgs::Time>::write(ostream, time);
00511 
00512     write_message(message, rosserial_msgs::TopicInfo::ID_TIME, client_version);
00513 
00514     // The MCU requesting the time from the server is the sync notification. This
00515     // call moves the timeout forward.
00516     set_sync_timeout(timeout_interval_);
00517   }
00518 
00519   Socket socket_;
00520   AsyncReadBuffer<Socket> async_read_buffer_;
00521   enum { buffer_max = 1023 };
00522   ros::NodeHandle nh_;
00523 
00524   Session::Version client_version;
00525   Session::Version client_version_try;
00526   boost::posix_time::time_duration timeout_interval_;
00527   boost::posix_time::time_duration attempt_interval_;
00528   boost::posix_time::time_duration require_check_interval_;
00529   boost::asio::deadline_timer sync_timer_;
00530   boost::asio::deadline_timer require_check_timer_;
00531 
00532   std::map< uint16_t, boost::function<void(ros::serialization::IStream)> > callbacks_;
00533   std::map<uint16_t, PublisherPtr> publishers_;
00534   std::map<uint16_t, SubscriberPtr> subscribers_;
00535   std::map<std::string, ServiceClientPtr> service_clients_;
00536   std::map<std::string, ServiceServerPtr> service_servers_;
00537 };
00538 
00539 }  // namespace
00540 
00541 #endif  // ROSSERIAL_SERVER_SESSION_H


rosserial_server
Author(s): Mike Purvis
autogenerated on Thu Jun 6 2019 19:56:34