00001
00035 #include <map>
00036 #include <boost/bind.hpp>
00037 #include <boost/asio.hpp>
00038 #include <boost/function.hpp>
00039
00040 #include <ros/ros.h>
00041 #include <rosserial_msgs/TopicInfo.h>
00042 #include <topic_tools/shape_shifter.h>
00043 #include <std_msgs/Time.h>
00044
00045 #include "AsyncReadBuffer.h"
00046 #include "topic_handlers.h"
00047
00048 typedef std::vector<uint8_t> Buffer;
00049 typedef boost::shared_ptr<Buffer> BufferPtr;
00050
00051 template<typename Socket>
00052 class Session
00053 {
00054 public:
00055 Session(boost::asio::io_service& io_service)
00056 : socket_(io_service), client_version(PROTOCOL_UNKNOWN),
00057 client_version_try(PROTOCOL_VER2),
00058 timeout_interval_(boost::posix_time::milliseconds(5000)),
00059 attempt_interval_(boost::posix_time::milliseconds(1000)),
00060 require_check_interval_(boost::posix_time::milliseconds(1000)),
00061 sync_timer_(io_service),
00062 require_check_timer_(io_service),
00063 async_read_buffer_(socket_, buffer_max,
00064 boost::bind(&Session::read_failed, this,
00065 boost::asio::placeholders::error))
00066 {
00067 callbacks_[rosserial_msgs::TopicInfo::ID_PUBLISHER]
00068 = boost::bind(&Session::setup_publisher, this, _1);
00069 callbacks_[rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
00070 = boost::bind(&Session::setup_subscriber, this, _1);
00071 callbacks_[rosserial_msgs::TopicInfo::ID_TIME]
00072 = boost::bind(&Session::handle_time, this, _1);
00073 }
00074
00075 virtual ~Session()
00076 {
00077 ROS_INFO("Ending session.");
00078 }
00079
00080 Socket& socket()
00081 {
00082 return socket_;
00083 }
00084
00085 void start()
00086 {
00087 ROS_INFO("Starting session.");
00088
00089 attempt_sync();
00090 read_sync_header();
00091 }
00092
00093 enum Version {
00094 PROTOCOL_UNKNOWN = 0,
00095 PROTOCOL_VER1 = 1,
00096 PROTOCOL_VER2 = 2,
00097 PROTOCOL_MAX
00098 };
00099
00100 private:
00102
00103
00104 void read_sync_header() {
00105 async_read_buffer_.read(1, boost::bind(&Session::read_sync_first, this, _1));
00106 }
00107
00108 void read_sync_first(ros::serialization::IStream& stream) {
00109 uint8_t sync;
00110 stream >> sync;
00111 if (sync == 0xff) {
00112 async_read_buffer_.read(1, boost::bind(&Session::read_sync_second, this, _1));
00113 } else {
00114 read_sync_header();
00115 }
00116 }
00117
00118 void read_sync_second(ros::serialization::IStream& stream) {
00119 uint8_t sync;
00120 stream >> sync;
00121 if (client_version == PROTOCOL_UNKNOWN) {
00122 if (sync == 0xff) {
00123 ROS_WARN("Attached client is using protocol VER1 (groovy)");
00124 client_version = PROTOCOL_VER1;
00125 } else if (sync == 0xfe) {
00126 ROS_INFO("Attached client is using protocol VER2 (hydro)");
00127 client_version = PROTOCOL_VER2;
00128 }
00129 }
00130 if (sync == 0xff && client_version == PROTOCOL_VER1) {
00131 async_read_buffer_.read(4, boost::bind(&Session::read_id_length, this, _1));
00132 } else if (sync == 0xfe && client_version == PROTOCOL_VER2) {
00133 async_read_buffer_.read(5, boost::bind(&Session::read_id_length, this, _1));
00134 } else {
00135 read_sync_header();
00136 }
00137 }
00138
00139 void read_id_length(ros::serialization::IStream& stream) {
00140 uint16_t topic_id, length;
00141 uint8_t length_checksum;
00142 if (client_version == PROTOCOL_VER2) {
00143
00144 stream >> length >> length_checksum;
00145 if (length_checksum + checksum(length) != 0xff) {
00146 uint8_t csl = checksum(length);
00147 ROS_WARN("Bad message header length checksum. Dropping message from client. T%d L%d C%d %d", topic_id, length, length_checksum, csl);
00148 read_sync_header();
00149 return;
00150 } else {
00151 stream >> topic_id;
00152 }
00153 } else if (client_version == PROTOCOL_VER1) {
00154
00155 stream >> topic_id >> length;
00156 }
00157 ROS_DEBUG("Received message header with length %d and topic_id=%d", length, topic_id);
00158
00159
00160 async_read_buffer_.read(length + 1, boost::bind(&Session::read_body, this,
00161 _1, topic_id));
00162 }
00163
00164 void read_body(ros::serialization::IStream& stream, uint16_t topic_id) {
00165 ROS_DEBUG("Received body of length %d for message on topic %d.", stream.getLength(), topic_id);
00166
00167 ros::serialization::IStream checksum_stream(stream.getData(), stream.getLength());
00168
00169 uint8_t msg_checksum = checksum(checksum_stream) + checksum(topic_id);
00170 if (client_version == PROTOCOL_VER1) {
00171 msg_checksum += checksum(stream.getLength() - 1);
00172 }
00173
00174 if (msg_checksum != 0xff) {
00175 ROS_WARN("Rejecting message on topicId=%d, length=%d with bad checksum.", topic_id, stream.getLength());
00176 } else {
00177 if (callbacks_.count(topic_id) == 1) {
00178 try {
00179 callbacks_[topic_id](stream);
00180 } catch(ros::serialization::StreamOverrunException e) {
00181 if (topic_id < 100) {
00182 ROS_ERROR("Buffer overrun when attempting to parse setup message.");
00183 ROS_ERROR_ONCE("Is this firmware from a pre-Groovy rosserial?");
00184 } else {
00185 ROS_WARN("Buffer overrun when attempting to parse user message.");
00186 }
00187 }
00188 } else {
00189 ROS_WARN("Received message with unrecognized topicId (%d).", topic_id);
00190
00191 }
00192 }
00193
00194
00195 read_sync_header();
00196 }
00197
00198 void read_failed(const boost::system::error_code& error) {
00199 if (error == boost::system::errc::no_buffer_space) {
00200
00201 ROS_WARN("Overrun on receive buffer. Attempting to regain rx sync.");
00202 read_sync_header();
00203 } else if (error) {
00204
00205
00206 socket_.cancel();
00207 ROS_DEBUG_STREAM("Socket asio error: " << error);
00208 ROS_WARN("Stopping session due to read error.");
00209 delete this;
00210 }
00211 }
00212
00214
00215 void write_message(Buffer& message,
00216 const uint16_t topic_id,
00217 Session::Version version) {
00218 uint8_t overhead_bytes = 0;
00219 switch(version) {
00220 case PROTOCOL_VER2: overhead_bytes = 8; break;
00221 case PROTOCOL_VER1: overhead_bytes = 7; break;
00222 default:
00223 ROS_WARN("Aborting write_message: protocol unspecified.");
00224 }
00225
00226 uint16_t length = overhead_bytes + message.size();
00227 BufferPtr buffer_ptr(new Buffer(length));
00228
00229 uint8_t msg_checksum;
00230 ros::serialization::IStream checksum_stream(message.size() > 0 ? &message[0] : NULL, message.size());
00231
00232 ros::serialization::OStream stream(&buffer_ptr->at(0), buffer_ptr->size());
00233 if (version == PROTOCOL_VER2) {
00234 uint8_t msg_len_checksum = 255 - checksum(message.size());
00235 stream << (uint16_t)0xfeff << (uint16_t)message.size() << msg_len_checksum << topic_id;
00236 msg_checksum = 255 - (checksum(checksum_stream) + checksum(topic_id));
00237 } else if (version == PROTOCOL_VER1) {
00238 stream << (uint16_t)0xffff << topic_id << (uint16_t)message.size();
00239 msg_checksum = 255 - (checksum(checksum_stream) + checksum(topic_id) + checksum(message.size()));
00240 }
00241 memcpy(stream.advance(message.size()), &message[0], message.size());
00242 stream << msg_checksum;
00243
00244
00245
00246 socket_.get_io_service().dispatch(
00247 boost::bind(&Session::write_buffer, this, buffer_ptr));
00248 }
00249
00250
00251
00252 void write_buffer(BufferPtr buffer_ptr) {
00253 boost::asio::async_write(socket_, boost::asio::buffer(*buffer_ptr),
00254 boost::bind(&Session::write_cb, this, boost::asio::placeholders::error, buffer_ptr));
00255 }
00256
00257 void write_cb(const boost::system::error_code& error,
00258 BufferPtr buffer_ptr) {
00259 if (error) {
00260 ROS_DEBUG_STREAM("Socket asio error: " << error);
00261 ROS_WARN("Stopping session due to write error.");
00262 delete this;
00263 }
00264
00265 }
00266
00268 void attempt_sync() {
00269 request_topics();
00270 set_sync_timeout(attempt_interval_);
00271 }
00272
00273 void set_sync_timeout(const boost::posix_time::time_duration& interval) {
00274 sync_timer_.cancel();
00275 sync_timer_.expires_from_now(interval);
00276 sync_timer_.async_wait(boost::bind(&Session::sync_timeout, this,
00277 boost::asio::placeholders::error));
00278 }
00279
00280 void sync_timeout(const boost::system::error_code& error) {
00281 if (error == boost::asio::error::operation_aborted) {
00282 return;
00283 }
00284 ROS_WARN("Sync with device lost.");
00285 attempt_sync();
00286 }
00287
00289 void request_topics() {
00290
00291
00292 if (client_version != PROTOCOL_UNKNOWN) client_version_try = client_version;
00293
00294 std::vector<uint8_t> message(0);
00295 if (client_version_try == PROTOCOL_VER2) {
00296 ROS_DEBUG("Sending request topics message for VER2 protocol.");
00297 write_message(message, rosserial_msgs::TopicInfo::ID_PUBLISHER, PROTOCOL_VER2);
00298 client_version_try = PROTOCOL_VER1;
00299 } else if (client_version_try == PROTOCOL_VER1) {
00300 ROS_DEBUG("Sending request topics message for VER1 protocol.");
00301 write_message(message, rosserial_msgs::TopicInfo::ID_PUBLISHER, PROTOCOL_VER1);
00302 client_version_try = PROTOCOL_VER2;
00303 } else {
00304 client_version_try = PROTOCOL_VER2;
00305 }
00306
00307
00308
00309 require_check_timer_.expires_from_now(require_check_interval_);
00310 require_check_timer_.async_wait(boost::bind(&Session::required_topics_check, this,
00311 boost::asio::placeholders::error));
00312 }
00313
00314 void required_topics_check(const boost::system::error_code& error) {
00315 if (ros::param::has("~require")) {
00316 if (!check_set("~require/publishers", publishers_) ||
00317 !check_set("~require/subscribers", subscribers_)) {
00318 ROS_WARN("Connected client failed to establish the publishers and subscribers dictated by require parameter. Re-requesting topics.");
00319 request_topics();
00320 }
00321 }
00322 }
00323
00324 template<typename M>
00325 bool check_set(std::string param_name, M map) {
00326 XmlRpc::XmlRpcValue param_list;
00327 ros::param::get(param_name, param_list);
00328 ROS_ASSERT(param_list.getType() == XmlRpc::XmlRpcValue::TypeArray);
00329 for (int i = 0; i < param_list.size(); ++i) {
00330 ROS_ASSERT(param_list[i].getType() == XmlRpc::XmlRpcValue::TypeString);
00331 std::string required_topic((std::string(param_list[i])));
00332
00333 bool found = false;
00334 for (typename M::iterator j = map.begin(); j != map.end(); ++j) {
00335 if (nh_.resolveName(j->second->get_topic()) ==
00336 nh_.resolveName(required_topic)) {
00337 found = true;
00338 break;
00339 }
00340 }
00341 if (!found) return false;
00342 }
00343 return true;
00344 }
00345
00346 static uint8_t checksum(ros::serialization::IStream& stream) {
00347 uint8_t sum = 0;
00348 for (uint16_t i = 0; i < stream.getLength(); ++i) {
00349 sum += stream.getData()[i];
00350 }
00351 return sum;
00352 }
00353
00354 static uint8_t checksum(uint16_t val) {
00355 return (val >> 8) | val;
00356 }
00357
00359
00360 void setup_publisher(ros::serialization::IStream& stream) {
00361 rosserial_msgs::TopicInfo topic_info;
00362 ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00363
00364 PublisherPtr pub(new Publisher(nh_, topic_info));
00365 publishers_[topic_info.topic_id] = pub;
00366 callbacks_[topic_info.topic_id] = boost::bind(&Publisher::handle, pub, _1);
00367
00368 set_sync_timeout(timeout_interval_);
00369 }
00370
00371 void setup_subscriber(ros::serialization::IStream& stream) {
00372 rosserial_msgs::TopicInfo topic_info;
00373 ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00374
00375 SubscriberPtr sub(new Subscriber(nh_, topic_info,
00376 boost::bind(&Session::write_message, this, _1, topic_info.topic_id, client_version)));
00377 subscribers_[topic_info.topic_id] = sub;
00378
00379 set_sync_timeout(timeout_interval_);
00380 }
00381
00382 void handle_time(ros::serialization::IStream& stream) {
00383 std_msgs::Time time;
00384 time.data = ros::Time::now();
00385
00386 size_t length = ros::serialization::serializationLength(time);
00387 std::vector<uint8_t> message(length);
00388
00389 ros::serialization::OStream ostream(&message[0], length);
00390 ros::serialization::Serializer<std_msgs::Time>::write(ostream, time);
00391
00392 write_message(message, rosserial_msgs::TopicInfo::ID_TIME, client_version);
00393
00394
00395
00396 set_sync_timeout(timeout_interval_);
00397 }
00398
00399 Socket socket_;
00400 AsyncReadBuffer<Socket> async_read_buffer_;
00401 enum { buffer_max = 1023 };
00402 ros::NodeHandle nh_;
00403
00404 Session::Version client_version;
00405 Session::Version client_version_try;
00406 boost::posix_time::time_duration timeout_interval_;
00407 boost::posix_time::time_duration attempt_interval_;
00408 boost::posix_time::time_duration require_check_interval_;
00409 boost::asio::deadline_timer sync_timer_;
00410 boost::asio::deadline_timer require_check_timer_;
00411
00412 std::map< uint16_t, boost::function<void(ros::serialization::IStream)> > callbacks_;
00413 std::map<uint16_t, PublisherPtr> publishers_;
00414 std::map<uint16_t, SubscriberPtr> subscribers_;
00415 };
00416
00417