Session.h
Go to the documentation of this file.
00001 
00035 #include <map>
00036 #include <boost/bind.hpp>
00037 #include <boost/asio.hpp>
00038 #include <boost/function.hpp>
00039 
00040 #include <ros/ros.h>
00041 #include <rosserial_msgs/TopicInfo.h>
00042 #include <topic_tools/shape_shifter.h>
00043 #include <std_msgs/Time.h>
00044 
00045 #include "AsyncReadBuffer.h"
00046 #include "topic_handlers.h"
00047 
00048 typedef std::vector<uint8_t> Buffer;
00049 typedef boost::shared_ptr<Buffer> BufferPtr;
00050 
00051 template<typename Socket>
00052 class Session
00053 {
00054 public:
00055   Session(boost::asio::io_service& io_service)
00056     : socket_(io_service), client_version(PROTOCOL_UNKNOWN),
00057       client_version_try(PROTOCOL_VER2),
00058       timeout_interval_(boost::posix_time::milliseconds(5000)),
00059       attempt_interval_(boost::posix_time::milliseconds(1000)),
00060       require_check_interval_(boost::posix_time::milliseconds(1000)),
00061       sync_timer_(io_service), 
00062       require_check_timer_(io_service), 
00063       async_read_buffer_(socket_, buffer_max, 
00064                          boost::bind(&Session::read_failed, this,
00065                                      boost::asio::placeholders::error))
00066   {
00067     callbacks_[rosserial_msgs::TopicInfo::ID_PUBLISHER]
00068         = boost::bind(&Session::setup_publisher, this, _1);
00069     callbacks_[rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
00070         = boost::bind(&Session::setup_subscriber, this, _1);
00071     callbacks_[rosserial_msgs::TopicInfo::ID_TIME]
00072         = boost::bind(&Session::handle_time, this, _1);
00073   }
00074 
00075   virtual ~Session()
00076   {
00077     ROS_INFO("Ending session.");  
00078   }
00079 
00080   Socket& socket()
00081   {
00082     return socket_;
00083   }
00084 
00085   void start()
00086   {
00087     ROS_INFO("Starting session.");
00088 
00089     attempt_sync();
00090     read_sync_header();
00091   }
00092 
00093   enum Version {
00094     PROTOCOL_UNKNOWN = 0,
00095     PROTOCOL_VER1 = 1,
00096     PROTOCOL_VER2 = 2,
00097     PROTOCOL_MAX
00098   };
00099 
00100 private:
00102   // TODO: Total message timeout, implement primarily in ReadBuffer.
00103 
00104   void read_sync_header() {
00105     async_read_buffer_.read(1, boost::bind(&Session::read_sync_first, this, _1));
00106   }
00107   
00108   void read_sync_first(ros::serialization::IStream& stream) {
00109     uint8_t sync;
00110     stream >> sync;
00111     if (sync == 0xff) {
00112       async_read_buffer_.read(1, boost::bind(&Session::read_sync_second, this, _1));
00113     } else {
00114       read_sync_header();
00115     }
00116   }
00117   
00118   void read_sync_second(ros::serialization::IStream& stream) {
00119     uint8_t sync;
00120     stream >> sync;
00121     if (client_version == PROTOCOL_UNKNOWN) {
00122       if (sync == 0xff) {
00123         ROS_WARN("Attached client is using protocol VER1 (groovy)");
00124         client_version = PROTOCOL_VER1;
00125       } else if (sync == 0xfe) {
00126         ROS_INFO("Attached client is using protocol VER2 (hydro)");
00127         client_version = PROTOCOL_VER2;
00128       }
00129     }
00130     if (sync == 0xff && client_version == PROTOCOL_VER1) {
00131       async_read_buffer_.read(4, boost::bind(&Session::read_id_length, this, _1));
00132     } else if (sync == 0xfe && client_version == PROTOCOL_VER2) {
00133       async_read_buffer_.read(5, boost::bind(&Session::read_id_length, this, _1)); 
00134     } else {
00135       read_sync_header();
00136     }
00137   }
00138 
00139   void read_id_length(ros::serialization::IStream& stream) {
00140     uint16_t topic_id, length;
00141     uint8_t length_checksum;
00142     if (client_version == PROTOCOL_VER2) {
00143       // Complex header with checksum byte for length field.
00144       stream >> length >> length_checksum;
00145       if (length_checksum + checksum(length) != 0xff) {
00146         uint8_t csl = checksum(length);
00147         ROS_WARN("Bad message header length checksum. Dropping message from client. T%d L%d C%d %d", topic_id, length, length_checksum, csl);
00148         read_sync_header();
00149         return;
00150       } else {
00151         stream >> topic_id;
00152       }
00153     } else if (client_version == PROTOCOL_VER1) {
00154       // Simple header in VER1 protocol.
00155       stream >> topic_id >> length;
00156     }
00157     ROS_DEBUG("Received message header with length %d and topic_id=%d", length, topic_id);
00158 
00159     // Read message length + checksum byte.
00160     async_read_buffer_.read(length + 1, boost::bind(&Session::read_body, this,
00161                                                     _1, topic_id));
00162   }
00163 
00164   void read_body(ros::serialization::IStream& stream, uint16_t topic_id) {
00165     ROS_DEBUG("Received body of length %d for message on topic %d.", stream.getLength(), topic_id);
00166     
00167     ros::serialization::IStream checksum_stream(stream.getData(), stream.getLength());
00168 
00169     uint8_t msg_checksum = checksum(checksum_stream) + checksum(topic_id);
00170     if (client_version == PROTOCOL_VER1) {
00171       msg_checksum += checksum(stream.getLength() - 1);
00172     }
00173 
00174     if (msg_checksum != 0xff) {
00175       ROS_WARN("Rejecting message on topicId=%d, length=%d with bad checksum.", topic_id, stream.getLength());
00176     } else {
00177       if (callbacks_.count(topic_id) == 1) {
00178         try {
00179           callbacks_[topic_id](stream);
00180         } catch(ros::serialization::StreamOverrunException e) {
00181           if (topic_id < 100) {
00182             ROS_ERROR("Buffer overrun when attempting to parse setup message.");
00183             ROS_ERROR_ONCE("Is this firmware from a pre-Groovy rosserial?");
00184           } else {
00185             ROS_WARN("Buffer overrun when attempting to parse user message.");
00186           }
00187         }
00188       } else {
00189         ROS_WARN("Received message with unrecognized topicId (%d).", topic_id);
00190         // TODO: Resynchronize on multiples?
00191       }
00192     }
00193 
00194     // Kickoff next message read.
00195     read_sync_header();
00196   }
00197 
00198   void read_failed(const boost::system::error_code& error) {
00199     if (error == boost::system::errc::no_buffer_space) {
00200       // No worries. Begin syncing on a new message.
00201       ROS_WARN("Overrun on receive buffer. Attempting to regain rx sync.");
00202       read_sync_header();
00203     } else if (error) {
00204       // When some other read error has occurred, delete the whole session, which destroys
00205       // all publishers and subscribers.
00206       socket_.cancel();
00207       ROS_DEBUG_STREAM("Socket asio error: " << error);
00208       ROS_WARN("Stopping session due to read error.");
00209       delete this;
00210     }
00211   }
00212 
00214 
00215   void write_message(Buffer& message,
00216                      const uint16_t topic_id, 
00217                      Session::Version version) {
00218     uint8_t overhead_bytes = 0;
00219     switch(version) {
00220       case PROTOCOL_VER2: overhead_bytes = 8; break;
00221       case PROTOCOL_VER1: overhead_bytes = 7; break;
00222       default:
00223         ROS_WARN("Aborting write_message: protocol unspecified.");
00224     }
00225 
00226     uint16_t length = overhead_bytes + message.size();
00227     BufferPtr buffer_ptr(new Buffer(length));
00228 
00229     uint8_t msg_checksum;
00230     ros::serialization::IStream checksum_stream(message.size() > 0 ? &message[0] : NULL, message.size());
00231    
00232     ros::serialization::OStream stream(&buffer_ptr->at(0), buffer_ptr->size()); 
00233     if (version == PROTOCOL_VER2) {
00234       uint8_t msg_len_checksum = 255 - checksum(message.size());
00235       stream << (uint16_t)0xfeff << (uint16_t)message.size() << msg_len_checksum << topic_id;
00236       msg_checksum = 255 - (checksum(checksum_stream) + checksum(topic_id));
00237     } else if (version == PROTOCOL_VER1) {
00238       stream << (uint16_t)0xffff << topic_id << (uint16_t)message.size();
00239       msg_checksum = 255 - (checksum(checksum_stream) + checksum(topic_id) + checksum(message.size()));
00240     }
00241     memcpy(stream.advance(message.size()), &message[0], message.size());
00242     stream << msg_checksum;
00243 
00244     // Will call immediately if we are already on the io_service thread. Otherwise,
00245     // the request is queued up and executed on that thread.
00246     socket_.get_io_service().dispatch(
00247         boost::bind(&Session::write_buffer, this, buffer_ptr));
00248   }
00249 
00250   // Function which is dispatched onto the io_service thread by write_message, so that 
00251   // write_message may be safely called directly from the ROS background spinning thread.
00252   void write_buffer(BufferPtr buffer_ptr) {
00253     boost::asio::async_write(socket_, boost::asio::buffer(*buffer_ptr),
00254           boost::bind(&Session::write_cb, this, boost::asio::placeholders::error, buffer_ptr));
00255   }
00256 
00257   void write_cb(const boost::system::error_code& error,
00258                 BufferPtr buffer_ptr) { 
00259     if (error) {
00260       ROS_DEBUG_STREAM("Socket asio error: " << error);
00261       ROS_WARN("Stopping session due to write error.");
00262       delete this;
00263     }
00264     // Buffer is destructed when this function exits.
00265   }
00266 
00268   void attempt_sync() {
00269     request_topics();
00270     set_sync_timeout(attempt_interval_);
00271   }
00272 
00273   void set_sync_timeout(const boost::posix_time::time_duration& interval) {
00274     sync_timer_.cancel();
00275     sync_timer_.expires_from_now(interval);
00276     sync_timer_.async_wait(boost::bind(&Session::sync_timeout, this,
00277           boost::asio::placeholders::error));
00278   }
00279 
00280   void sync_timeout(const boost::system::error_code& error) {
00281     if (error == boost::asio::error::operation_aborted) {
00282       return;
00283     }
00284     ROS_WARN("Sync with device lost.");
00285     attempt_sync(); 
00286   }
00287 
00289   void request_topics() {
00290     // Once this session has previously connected using a given protocol version,
00291     // always attempt that one. If not, though, cycle between available options.
00292     if (client_version != PROTOCOL_UNKNOWN) client_version_try = client_version;
00293 
00294     std::vector<uint8_t> message(0);
00295     if (client_version_try == PROTOCOL_VER2) {
00296         ROS_DEBUG("Sending request topics message for VER2 protocol.");
00297         write_message(message, rosserial_msgs::TopicInfo::ID_PUBLISHER, PROTOCOL_VER2);
00298         client_version_try = PROTOCOL_VER1;
00299     } else if (client_version_try == PROTOCOL_VER1) {
00300         ROS_DEBUG("Sending request topics message for VER1 protocol.");
00301         write_message(message, rosserial_msgs::TopicInfo::ID_PUBLISHER, PROTOCOL_VER1);
00302         client_version_try = PROTOCOL_VER2;
00303     } else {
00304         client_version_try = PROTOCOL_VER2;
00305     }
00306 
00307     // Set timer for future point at which to verify the subscribers and publishers
00308     // created by the client against the expected set given in the parameters.
00309     require_check_timer_.expires_from_now(require_check_interval_);
00310     require_check_timer_.async_wait(boost::bind(&Session::required_topics_check, this,
00311           boost::asio::placeholders::error)); 
00312   }
00313 
00314   void required_topics_check(const boost::system::error_code& error) {
00315     if (ros::param::has("~require")) {
00316       if (!check_set("~require/publishers", publishers_) ||
00317           !check_set("~require/subscribers", subscribers_)) {
00318         ROS_WARN("Connected client failed to establish the publishers and subscribers dictated by require parameter. Re-requesting topics.");
00319         request_topics();
00320       }
00321     }
00322   }
00323   
00324   template<typename M>
00325   bool check_set(std::string param_name, M map) {
00326     XmlRpc::XmlRpcValue param_list;
00327     ros::param::get(param_name, param_list);
00328     ROS_ASSERT(param_list.getType() == XmlRpc::XmlRpcValue::TypeArray);
00329     for (int i = 0; i < param_list.size(); ++i) {
00330       ROS_ASSERT(param_list[i].getType() == XmlRpc::XmlRpcValue::TypeString);
00331       std::string required_topic((std::string(param_list[i])));
00332       // Iterate through map of registered topics, to ensure that this one is present.
00333       bool found = false;
00334       for (typename M::iterator j = map.begin(); j != map.end(); ++j) {
00335         if (nh_.resolveName(j->second->get_topic()) ==
00336             nh_.resolveName(required_topic)) {
00337           found = true;
00338           break;
00339         }
00340       }
00341       if (!found) return false;
00342     }  
00343     return true;
00344   }
00345 
00346   static uint8_t checksum(ros::serialization::IStream& stream) {
00347     uint8_t sum = 0;
00348     for (uint16_t i = 0; i < stream.getLength(); ++i) {
00349       sum += stream.getData()[i];
00350     }
00351     return sum;
00352   }
00353 
00354   static uint8_t checksum(uint16_t val) {
00355     return (val >> 8) | val;
00356   }
00357 
00359 
00360   void setup_publisher(ros::serialization::IStream& stream) {
00361     rosserial_msgs::TopicInfo topic_info;
00362     ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00363 
00364     PublisherPtr pub(new Publisher(nh_, topic_info));
00365     publishers_[topic_info.topic_id] = pub;
00366     callbacks_[topic_info.topic_id] = boost::bind(&Publisher::handle, pub, _1);
00367 
00368     set_sync_timeout(timeout_interval_);
00369   }
00370   
00371   void setup_subscriber(ros::serialization::IStream& stream) {
00372     rosserial_msgs::TopicInfo topic_info;
00373     ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00374 
00375     SubscriberPtr sub(new Subscriber(nh_, topic_info,
00376         boost::bind(&Session::write_message, this, _1, topic_info.topic_id, client_version)));
00377     subscribers_[topic_info.topic_id] = sub;
00378 
00379     set_sync_timeout(timeout_interval_);
00380   }
00381 
00382   void handle_time(ros::serialization::IStream& stream) {
00383     std_msgs::Time time;
00384     time.data = ros::Time::now();
00385 
00386     size_t length = ros::serialization::serializationLength(time);
00387     std::vector<uint8_t> message(length);
00388 
00389     ros::serialization::OStream ostream(&message[0], length);
00390     ros::serialization::Serializer<std_msgs::Time>::write(ostream, time); 
00391  
00392     write_message(message, rosserial_msgs::TopicInfo::ID_TIME, client_version);
00393 
00394     // The MCU requesting the time from the server is the sync notification. This
00395     // call moves the timeout forward.
00396     set_sync_timeout(timeout_interval_);
00397   }
00398 
00399   Socket socket_;
00400   AsyncReadBuffer<Socket> async_read_buffer_;
00401   enum { buffer_max = 1023 };
00402   ros::NodeHandle nh_;
00403 
00404   Session::Version client_version;
00405   Session::Version client_version_try;
00406   boost::posix_time::time_duration timeout_interval_;
00407   boost::posix_time::time_duration attempt_interval_;
00408   boost::posix_time::time_duration require_check_interval_;
00409   boost::asio::deadline_timer sync_timer_;
00410   boost::asio::deadline_timer require_check_timer_;
00411 
00412   std::map< uint16_t, boost::function<void(ros::serialization::IStream)> > callbacks_;
00413   std::map<uint16_t, PublisherPtr> publishers_;
00414   std::map<uint16_t, SubscriberPtr> subscribers_;
00415 };
00416 
00417 


rosserial_server
Author(s): Mike Purvis
autogenerated on Fri Aug 28 2015 12:44:56