00001
00035 #ifndef ROSSERIAL_SERVER_SESSION_H
00036 #define ROSSERIAL_SERVER_SESSION_H
00037
00038 #include <map>
00039 #include <boost/bind.hpp>
00040 #include <boost/asio.hpp>
00041 #include <boost/function.hpp>
00042
00043 #include <ros/ros.h>
00044 #include <rosserial_msgs/TopicInfo.h>
00045 #include <topic_tools/shape_shifter.h>
00046 #include <std_msgs/Time.h>
00047
00048 #include "rosserial_server/async_read_buffer.h"
00049 #include "rosserial_server/topic_handlers.h"
00050
00051 namespace rosserial_server
00052 {
00053
00054 typedef std::vector<uint8_t> Buffer;
00055 typedef boost::shared_ptr<Buffer> BufferPtr;
00056
00057 template<typename Socket>
00058 class Session
00059 {
00060 public:
00061 Session(boost::asio::io_service& io_service)
00062 : socket_(io_service), client_version(PROTOCOL_UNKNOWN),
00063 client_version_try(PROTOCOL_VER2),
00064 timeout_interval_(boost::posix_time::milliseconds(5000)),
00065 attempt_interval_(boost::posix_time::milliseconds(1000)),
00066 require_check_interval_(boost::posix_time::milliseconds(1000)),
00067 sync_timer_(io_service),
00068 require_check_timer_(io_service),
00069 async_read_buffer_(socket_, buffer_max,
00070 boost::bind(&Session::read_failed, this,
00071 boost::asio::placeholders::error))
00072 {
00073 callbacks_[rosserial_msgs::TopicInfo::ID_PUBLISHER]
00074 = boost::bind(&Session::setup_publisher, this, _1);
00075 callbacks_[rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
00076 = boost::bind(&Session::setup_subscriber, this, _1);
00077 callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_PUBLISHER]
00078 = boost::bind(&Session::setup_service_client_publisher, this, _1);
00079 callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
00080 = boost::bind(&Session::setup_service_client_subscriber, this, _1);
00081 callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_SERVER+rosserial_msgs::TopicInfo::ID_PUBLISHER]
00082 = boost::bind(&Session::setup_service_server_publisher, this, _1);
00083 callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_SERVER+rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
00084 = boost::bind(&Session::setup_service_server_subscriber, this, _1);
00085 callbacks_[rosserial_msgs::TopicInfo::ID_TIME]
00086 = boost::bind(&Session::handle_time, this, _1);
00087 }
00088
00089 virtual ~Session()
00090 {
00091 abort_servers();
00092 ROS_INFO("Ending session.");
00093 }
00094
00095 Socket& socket()
00096 {
00097 return socket_;
00098 }
00099
00100 void start()
00101 {
00102 ROS_INFO("Starting session.");
00103
00104 attempt_sync();
00105 read_sync_header();
00106 }
00107
00108 enum Version {
00109 PROTOCOL_UNKNOWN = 0,
00110 PROTOCOL_VER1 = 1,
00111 PROTOCOL_VER2 = 2,
00112 PROTOCOL_MAX
00113 };
00114
00115 private:
00117
00118
00119 void read_sync_header() {
00120 async_read_buffer_.read(1, boost::bind(&Session::read_sync_first, this, _1));
00121 }
00122
00123 void read_sync_first(ros::serialization::IStream& stream) {
00124 uint8_t sync;
00125 stream >> sync;
00126 if (sync == 0xff) {
00127 async_read_buffer_.read(1, boost::bind(&Session::read_sync_second, this, _1));
00128 } else {
00129 read_sync_header();
00130 }
00131 }
00132
00133 void read_sync_second(ros::serialization::IStream& stream) {
00134 uint8_t sync;
00135 stream >> sync;
00136 if (client_version == PROTOCOL_UNKNOWN) {
00137 if (sync == 0xff) {
00138 ROS_WARN("Attached client is using protocol VER1 (groovy)");
00139 client_version = PROTOCOL_VER1;
00140 } else if (sync == 0xfe) {
00141 ROS_INFO("Attached client is using protocol VER2 (hydro)");
00142 client_version = PROTOCOL_VER2;
00143 }
00144 }
00145 if (sync == 0xff && client_version == PROTOCOL_VER1) {
00146 async_read_buffer_.read(4, boost::bind(&Session::read_id_length, this, _1));
00147 } else if (sync == 0xfe && client_version == PROTOCOL_VER2) {
00148 async_read_buffer_.read(5, boost::bind(&Session::read_id_length, this, _1));
00149 } else {
00150 read_sync_header();
00151 }
00152 }
00153
00154 void read_id_length(ros::serialization::IStream& stream) {
00155 uint16_t topic_id, length;
00156 uint8_t length_checksum;
00157 if (client_version == PROTOCOL_VER2) {
00158
00159 stream >> length >> length_checksum;
00160 if (length_checksum + checksum(length) != 0xff) {
00161 uint8_t csl = checksum(length);
00162 ROS_WARN("Bad message header length checksum. Dropping message from client. T%d L%d C%d %d", topic_id, length, length_checksum, csl);
00163 read_sync_header();
00164 return;
00165 } else {
00166 stream >> topic_id;
00167 }
00168 } else if (client_version == PROTOCOL_VER1) {
00169
00170 stream >> topic_id >> length;
00171 }
00172 ROS_DEBUG("Received message header with length %d and topic_id=%d", length, topic_id);
00173
00174
00175 async_read_buffer_.read(length + 1, boost::bind(&Session::read_body, this,
00176 _1, topic_id));
00177 }
00178
00179 void read_body(ros::serialization::IStream& stream, uint16_t topic_id) {
00180 ROS_DEBUG("Received body of length %d for message on topic %d.", stream.getLength(), topic_id);
00181
00182 ros::serialization::IStream checksum_stream(stream.getData(), stream.getLength());
00183
00184 uint8_t msg_checksum = checksum(checksum_stream) + checksum(topic_id);
00185 if (client_version == PROTOCOL_VER1) {
00186 msg_checksum += checksum(stream.getLength() - 1);
00187 }
00188
00189 if (msg_checksum != 0xff) {
00190 ROS_WARN("Rejecting message on topicId=%d, length=%d with bad checksum.", topic_id, stream.getLength());
00191 } else {
00192 if (callbacks_.count(topic_id) == 1) {
00193 try {
00194 callbacks_[topic_id](stream);
00195 } catch(ros::serialization::StreamOverrunException e) {
00196 if (topic_id < 100) {
00197 ROS_ERROR("Buffer overrun when attempting to parse setup message.");
00198 ROS_ERROR_ONCE("Is this firmware from a pre-Groovy rosserial?");
00199 } else {
00200 ROS_WARN("Buffer overrun when attempting to parse user message.");
00201 }
00202 }
00203 } else {
00204 ROS_WARN("Received message with unrecognized topicId (%d).", topic_id);
00205
00206 }
00207 }
00208
00209
00210 read_sync_header();
00211 }
00212
00213 void read_failed(const boost::system::error_code& error) {
00214 if (error == boost::system::errc::no_buffer_space) {
00215
00216 ROS_WARN("Overrun on receive buffer. Attempting to regain rx sync.");
00217 read_sync_header();
00218 } else if (error) {
00219
00220
00221 socket_.cancel();
00222 ROS_DEBUG_STREAM("Socket asio error: " << error);
00223 ROS_WARN("Stopping session due to read error.");
00224 delete this;
00225 }
00226 }
00227
00229
00230 void write_message(Buffer& message,
00231 const uint16_t topic_id,
00232 Session::Version version) {
00233 uint8_t overhead_bytes = 0;
00234 switch(version) {
00235 case PROTOCOL_VER2: overhead_bytes = 8; break;
00236 case PROTOCOL_VER1: overhead_bytes = 7; break;
00237 default:
00238 ROS_WARN("Aborting write_message: protocol unspecified.");
00239 }
00240
00241 uint16_t length = overhead_bytes + message.size();
00242 BufferPtr buffer_ptr(new Buffer(length));
00243
00244 uint8_t msg_checksum;
00245 ros::serialization::IStream checksum_stream(message.size() > 0 ? &message[0] : NULL, message.size());
00246
00247 ros::serialization::OStream stream(&buffer_ptr->at(0), buffer_ptr->size());
00248 if (version == PROTOCOL_VER2) {
00249 uint8_t msg_len_checksum = 255 - checksum(message.size());
00250 stream << (uint16_t)0xfeff << (uint16_t)message.size() << msg_len_checksum << topic_id;
00251 msg_checksum = 255 - (checksum(checksum_stream) + checksum(topic_id));
00252 } else if (version == PROTOCOL_VER1) {
00253 stream << (uint16_t)0xffff << topic_id << (uint16_t)message.size();
00254 msg_checksum = 255 - (checksum(checksum_stream) + checksum(topic_id) + checksum(message.size()));
00255 }
00256 memcpy(stream.advance(message.size()), &message[0], message.size());
00257 stream << msg_checksum;
00258
00259
00260
00261 socket_.get_io_service().dispatch(
00262 boost::bind(&Session::write_buffer, this, buffer_ptr));
00263 }
00264
00265
00266
00267 void write_buffer(BufferPtr buffer_ptr) {
00268 boost::asio::async_write(socket_, boost::asio::buffer(*buffer_ptr),
00269 boost::bind(&Session::write_cb, this, boost::asio::placeholders::error, buffer_ptr));
00270 }
00271
00272 void write_cb(const boost::system::error_code& error,
00273 BufferPtr buffer_ptr) {
00274 if (error) {
00275 if (error == boost::system::errc::io_error) {
00276 ROS_WARN_THROTTLE(1, "Socket write operation returned IO error.");
00277 } else if (error == boost::system::errc::no_such_device) {
00278 ROS_WARN_THROTTLE(1, "Socket write operation returned no device.");
00279 } else {
00280 socket_.cancel();
00281 ROS_WARN_STREAM_THROTTLE(1, "Unknown error returned during write operation: " << error);
00282 ROS_WARN("Destroying session.");
00283 delete this;
00284 }
00285 }
00286
00287 }
00288
00290 void attempt_sync() {
00291 request_topics();
00292 set_sync_timeout(attempt_interval_);
00293 }
00294
00295 void set_sync_timeout(const boost::posix_time::time_duration& interval) {
00296 sync_timer_.cancel();
00297 sync_timer_.expires_from_now(interval);
00298 sync_timer_.async_wait(boost::bind(&Session::sync_timeout, this,
00299 boost::asio::placeholders::error));
00300 }
00301
00302 void abort_servers() {
00303 for(std::map<std::string, ServiceServerPtr>::iterator it = service_servers_.begin(); it != service_servers_.end(); ++it)
00304 it->second->abort();
00305 }
00306
00307 void sync_timeout(const boost::system::error_code& error) {
00308 if (error == boost::asio::error::operation_aborted) {
00309 return;
00310 }
00311 ROS_WARN("Sync with device lost.");
00312 abort_servers();
00313 attempt_sync();
00314 }
00315
00317 void request_topics() {
00318
00319
00320 if (client_version != PROTOCOL_UNKNOWN) client_version_try = client_version;
00321
00322 std::vector<uint8_t> message(0);
00323 if (client_version_try == PROTOCOL_VER2) {
00324 ROS_DEBUG("Sending request topics message for VER2 protocol.");
00325 write_message(message, rosserial_msgs::TopicInfo::ID_PUBLISHER, PROTOCOL_VER2);
00326 client_version_try = PROTOCOL_VER1;
00327 } else if (client_version_try == PROTOCOL_VER1) {
00328 ROS_DEBUG("Sending request topics message for VER1 protocol.");
00329 write_message(message, rosserial_msgs::TopicInfo::ID_PUBLISHER, PROTOCOL_VER1);
00330 client_version_try = PROTOCOL_VER2;
00331 } else {
00332 client_version_try = PROTOCOL_VER2;
00333 }
00334
00335
00336
00337 require_check_timer_.expires_from_now(require_check_interval_);
00338 require_check_timer_.async_wait(boost::bind(&Session::required_topics_check, this,
00339 boost::asio::placeholders::error));
00340 }
00341
00342 void required_topics_check(const boost::system::error_code& error) {
00343 if (ros::param::has("~require")) {
00344 if (!check_set("~require/publishers", publishers_) ||
00345 !check_set("~require/subscribers", subscribers_) ||
00346 !check_set("~require/service_clients", service_clients_) ||
00347 !check_set("~require/service_servers", service_servers_)) {
00348 ROS_WARN("Connected client failed to establish the interfaces dictated by require parameter. Re-requesting.");
00349 request_topics();
00350 }
00351 }
00352 }
00353
00354 template<typename M>
00355 bool check_set(std::string param_name, M map) {
00356 XmlRpc::XmlRpcValue param_list;
00357 if(!ros::param::get(param_name, param_list)) return true;
00358 ROS_ASSERT(param_list.getType() == XmlRpc::XmlRpcValue::TypeArray);
00359 for (int i = 0; i < param_list.size(); ++i) {
00360 ROS_ASSERT(param_list[i].getType() == XmlRpc::XmlRpcValue::TypeString);
00361 std::string required_name((std::string(param_list[i])));
00362
00363 bool found = false;
00364 for (typename M::iterator j = map.begin(); j != map.end(); ++j) {
00365 if (nh_.resolveName(j->second->get_name()) ==
00366 nh_.resolveName(required_name)) {
00367 found = true;
00368 break;
00369 }
00370 }
00371 if (!found) return false;
00372 }
00373 return true;
00374 }
00375
00376 static uint8_t checksum(ros::serialization::IStream& stream) {
00377 uint8_t sum = 0;
00378 for (uint16_t i = 0; i < stream.getLength(); ++i) {
00379 sum += stream.getData()[i];
00380 }
00381 return sum;
00382 }
00383
00384 static uint8_t checksum(uint16_t val) {
00385 return (val >> 8) + val;
00386 }
00387
00389
00390 void setup_publisher(ros::serialization::IStream& stream) {
00391 rosserial_msgs::TopicInfo topic_info;
00392 ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00393
00394 PublisherPtr pub(new Publisher(nh_, topic_info));
00395 publishers_[topic_info.topic_id] = pub;
00396 callbacks_[topic_info.topic_id] = boost::bind(&Publisher::handle, pub, _1);
00397
00398 set_sync_timeout(timeout_interval_);
00399 }
00400
00401 void setup_subscriber(ros::serialization::IStream& stream) {
00402 rosserial_msgs::TopicInfo topic_info;
00403 ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00404
00405 SubscriberPtr sub(new Subscriber(nh_, topic_info,
00406 boost::bind(&Session::write_message, this, _1, topic_info.topic_id, client_version)));
00407 subscribers_[topic_info.topic_id] = sub;
00408
00409 set_sync_timeout(timeout_interval_);
00410 }
00411
00412
00413
00414
00415
00416
00417
00418 void setup_service_client_publisher(ros::serialization::IStream& stream) {
00419 rosserial_msgs::TopicInfo topic_info;
00420 ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00421
00422 if (!service_clients_.count(topic_info.topic_name)) {
00423 ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str());
00424 ServiceClientPtr srv(new ServiceClient(
00425 nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2, client_version)));
00426 service_clients_[topic_info.topic_name] = srv;
00427 callbacks_[topic_info.topic_id] = boost::bind(&ServiceClient::handle, srv, _1);
00428 }
00429 if (service_clients_[topic_info.topic_name]->getRequestMessageMD5() != topic_info.md5sum) {
00430 ROS_WARN("Service client setup: Request message MD5 mismatch between rosserial client and ROS");
00431 } else {
00432 ROS_DEBUG("Service client %s: request message MD5 successfully validated as %s",
00433 topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
00434 }
00435 set_sync_timeout(timeout_interval_);
00436 }
00437
00438 void setup_service_client_subscriber(ros::serialization::IStream& stream) {
00439 rosserial_msgs::TopicInfo topic_info;
00440 ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00441
00442 if (!service_clients_.count(topic_info.topic_name)) {
00443 ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str());
00444 ServiceClientPtr srv(new ServiceClient(
00445 nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2, client_version)));
00446 service_clients_[topic_info.topic_name] = srv;
00447 callbacks_[topic_info.topic_id] = boost::bind(&ServiceClient::handle, srv, _1);
00448 }
00449
00450 service_clients_[topic_info.topic_name]->setTopicId(topic_info.topic_id);
00451 if (service_clients_[topic_info.topic_name]->getResponseMessageMD5() != topic_info.md5sum) {
00452 ROS_WARN("Service client setup: Response message MD5 mismatch between rosserial client and ROS");
00453 } else {
00454 ROS_DEBUG("Service client %s: response message MD5 successfully validated as %s",
00455 topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
00456 }
00457 set_sync_timeout(timeout_interval_);
00458 }
00459
00460 void setup_service_server_publisher(ros::serialization::IStream& stream) {
00461 rosserial_msgs::TopicInfo topic_info;
00462 ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00463
00464 if (!service_servers_.count(topic_info.topic_name)) {
00465 ROS_DEBUG("Creating service server for topic %s",topic_info.topic_name.c_str());
00466 ServiceServerPtr srv(new ServiceServer(
00467 nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2, client_version)));
00468 service_servers_[topic_info.topic_name] = srv;
00469 callbacks_[topic_info.topic_id] = boost::bind(&ServiceServer::handle, srv, _1);
00470 }
00471 if (service_servers_[topic_info.topic_name]->getResponseMessageMD5() != topic_info.md5sum) {
00472 ROS_WARN("Service client setup: Response message MD5 mismatch between rosserial client and ROS");
00473 } else {
00474 ROS_DEBUG("Service client %s: request message MD5 successfully validated as %s",
00475 topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
00476 }
00477 set_sync_timeout(timeout_interval_);
00478 }
00479
00480 void setup_service_server_subscriber(ros::serialization::IStream& stream) {
00481 rosserial_msgs::TopicInfo topic_info;
00482 ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00483
00484 if (!service_servers_.count(topic_info.topic_name)) {
00485 ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str());
00486 ServiceServerPtr srv(new ServiceServer(
00487 nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2, client_version)));
00488 service_servers_[topic_info.topic_name] = srv;
00489 callbacks_[topic_info.topic_id] = boost::bind(&ServiceServer::handle, srv, _1);
00490 }
00491 service_servers_[topic_info.topic_name]->setTopicId(topic_info.topic_id);
00492 if (service_servers_[topic_info.topic_name]->getRequestMessageMD5() != topic_info.md5sum) {
00493 ROS_WARN("Service client setup: Request message MD5 mismatch between rosserial client and ROS");
00494 } else {
00495 ROS_DEBUG("Service client %s: request message MD5 successfully validated as %s",
00496 topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
00497 }
00498 set_sync_timeout(timeout_interval_);
00499 }
00500
00501
00502 void handle_time(ros::serialization::IStream& stream) {
00503 std_msgs::Time time;
00504 time.data = ros::Time::now();
00505
00506 size_t length = ros::serialization::serializationLength(time);
00507 std::vector<uint8_t> message(length);
00508
00509 ros::serialization::OStream ostream(&message[0], length);
00510 ros::serialization::Serializer<std_msgs::Time>::write(ostream, time);
00511
00512 write_message(message, rosserial_msgs::TopicInfo::ID_TIME, client_version);
00513
00514
00515
00516 set_sync_timeout(timeout_interval_);
00517 }
00518
00519 Socket socket_;
00520 AsyncReadBuffer<Socket> async_read_buffer_;
00521 enum { buffer_max = 1023 };
00522 ros::NodeHandle nh_;
00523
00524 Session::Version client_version;
00525 Session::Version client_version_try;
00526 boost::posix_time::time_duration timeout_interval_;
00527 boost::posix_time::time_duration attempt_interval_;
00528 boost::posix_time::time_duration require_check_interval_;
00529 boost::asio::deadline_timer sync_timer_;
00530 boost::asio::deadline_timer require_check_timer_;
00531
00532 std::map< uint16_t, boost::function<void(ros::serialization::IStream)> > callbacks_;
00533 std::map<uint16_t, PublisherPtr> publishers_;
00534 std::map<uint16_t, SubscriberPtr> subscribers_;
00535 std::map<std::string, ServiceClientPtr> service_clients_;
00536 std::map<std::string, ServiceServerPtr> service_servers_;
00537 };
00538
00539 }
00540
00541 #endif // ROSSERIAL_SERVER_SESSION_H