00001
00035 #ifndef ROSSERIAL_SERVER_SESSION_H
00036 #define ROSSERIAL_SERVER_SESSION_H
00037
00038 #include <map>
00039 #include <boost/bind.hpp>
00040 #include <boost/asio.hpp>
00041 #include <boost/function.hpp>
00042
00043 #include <ros/callback_queue.h>
00044 #include <ros/ros.h>
00045 #include <rosserial_msgs/TopicInfo.h>
00046 #include <rosserial_msgs/Log.h>
00047 #include <topic_tools/shape_shifter.h>
00048 #include <std_msgs/Time.h>
00049
00050 #include "rosserial_server/async_read_buffer.h"
00051 #include "rosserial_server/topic_handlers.h"
00052
00053 namespace rosserial_server
00054 {
00055
00056 typedef std::vector<uint8_t> Buffer;
00057 typedef boost::shared_ptr<Buffer> BufferPtr;
00058
00059 template<typename Socket>
00060 class Session : boost::noncopyable
00061 {
00062 public:
00063 Session(boost::asio::io_service& io_service)
00064 : socket_(io_service),
00065 sync_timer_(io_service),
00066 require_check_timer_(io_service),
00067 ros_spin_timer_(io_service),
00068 async_read_buffer_(socket_, buffer_max,
00069 boost::bind(&Session::read_failed, this,
00070 boost::asio::placeholders::error))
00071 {
00072 active_ = false;
00073
00074 timeout_interval_ = boost::posix_time::milliseconds(5000);
00075 attempt_interval_ = boost::posix_time::milliseconds(1000);
00076 require_check_interval_ = boost::posix_time::milliseconds(1000);
00077 ros_spin_interval_ = boost::posix_time::milliseconds(10);
00078 require_param_name_ = "~require";
00079
00080 nh_.setCallbackQueue(&ros_callback_queue_);
00081
00082
00083
00084
00085 ros_spin_timer_.expires_from_now(ros_spin_interval_);
00086 ros_spin_timer_.async_wait(boost::bind(&Session::ros_spin_timeout, this,
00087 boost::asio::placeholders::error));
00088 }
00089
00090 Socket& socket()
00091 {
00092 return socket_;
00093 }
00094
00095 void start()
00096 {
00097 ROS_DEBUG("Starting session.");
00098
00099 callbacks_[rosserial_msgs::TopicInfo::ID_PUBLISHER]
00100 = boost::bind(&Session::setup_publisher, this, _1);
00101 callbacks_[rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
00102 = boost::bind(&Session::setup_subscriber, this, _1);
00103 callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_PUBLISHER]
00104 = boost::bind(&Session::setup_service_client_publisher, this, _1);
00105 callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
00106 = boost::bind(&Session::setup_service_client_subscriber, this, _1);
00107 callbacks_[rosserial_msgs::TopicInfo::ID_LOG]
00108 = boost::bind(&Session::handle_log, this, _1);
00109 callbacks_[rosserial_msgs::TopicInfo::ID_TIME]
00110 = boost::bind(&Session::handle_time, this, _1);
00111
00112 active_ = true;
00113 attempt_sync();
00114 read_sync_header();
00115 }
00116
00117 void stop()
00118 {
00119
00120 ros_callback_queue_.clear();
00121
00122
00123 sync_timer_.cancel();
00124 require_check_timer_.cancel();
00125
00126
00127
00128 callbacks_.clear();
00129 subscribers_.clear();
00130 publishers_.clear();
00131 services_.clear();
00132
00133
00134 socket_.close();
00135 active_ = false;
00136 }
00137
00138 bool is_active()
00139 {
00140 return active_;
00141 }
00142
00149 void set_require_param(std::string param_name)
00150 {
00151 require_param_name_ = param_name;
00152 }
00153
00154 private:
00159 void ros_spin_timeout(const boost::system::error_code& error) {
00160 ros_callback_queue_.callAvailable();
00161
00162 if (ros::ok())
00163 {
00164
00165 ros_spin_timer_.expires_from_now(ros_spin_interval_);
00166 ros_spin_timer_.async_wait(boost::bind(&Session::ros_spin_timeout, this,
00167 boost::asio::placeholders::error));
00168 }
00169 }
00170
00172
00173
00174 void read_sync_header() {
00175 async_read_buffer_.read(1, boost::bind(&Session::read_sync_first, this, _1));
00176 }
00177
00178 void read_sync_first(ros::serialization::IStream& stream) {
00179 uint8_t sync;
00180 stream >> sync;
00181 if (sync == 0xff) {
00182 async_read_buffer_.read(1, boost::bind(&Session::read_sync_second, this, _1));
00183 } else {
00184 read_sync_header();
00185 }
00186 }
00187
00188 void read_sync_second(ros::serialization::IStream& stream) {
00189 uint8_t sync;
00190 stream >> sync;
00191 if (sync == 0xfe) {
00192 async_read_buffer_.read(5, boost::bind(&Session::read_id_length, this, _1));
00193 } else {
00194 read_sync_header();
00195 }
00196 }
00197
00198 void read_id_length(ros::serialization::IStream& stream) {
00199 uint16_t topic_id, length;
00200 uint8_t length_checksum;
00201
00202
00203 stream >> length >> length_checksum;
00204 if (length_checksum + checksum(length) != 0xff) {
00205 uint8_t csl = checksum(length);
00206 ROS_WARN("Bad message header length checksum. Dropping message from client. T%d L%d C%d %d", topic_id, length, length_checksum, csl);
00207 read_sync_header();
00208 return;
00209 } else {
00210 stream >> topic_id;
00211 }
00212 ROS_DEBUG("Received message header with length %d and topic_id=%d", length, topic_id);
00213
00214
00215 async_read_buffer_.read(length + 1, boost::bind(&Session::read_body, this,
00216 _1, topic_id));
00217 }
00218
00219 void read_body(ros::serialization::IStream& stream, uint16_t topic_id) {
00220 ROS_DEBUG("Received body of length %d for message on topic %d.", stream.getLength(), topic_id);
00221
00222 ros::serialization::IStream checksum_stream(stream.getData(), stream.getLength());
00223 uint8_t msg_checksum = checksum(checksum_stream) + checksum(topic_id);
00224
00225 if (msg_checksum != 0xff) {
00226 ROS_WARN("Rejecting message on topicId=%d, length=%d with bad checksum.", topic_id, stream.getLength());
00227 } else {
00228 if (callbacks_.count(topic_id) == 1) {
00229 try {
00230 callbacks_[topic_id](stream);
00231 } catch(ros::serialization::StreamOverrunException e) {
00232 if (topic_id < 100) {
00233 ROS_ERROR("Buffer overrun when attempting to parse setup message.");
00234 ROS_ERROR_ONCE("Is this firmware from a pre-Groovy rosserial?");
00235 } else {
00236 ROS_WARN("Buffer overrun when attempting to parse user message.");
00237 }
00238 }
00239 } else {
00240 ROS_WARN("Received message with unrecognized topicId (%d).", topic_id);
00241
00242 }
00243 }
00244
00245
00246 read_sync_header();
00247 }
00248
00249 void read_failed(const boost::system::error_code& error) {
00250 if (error == boost::system::errc::no_buffer_space) {
00251
00252 ROS_WARN("Overrun on receive buffer. Attempting to regain rx sync.");
00253 read_sync_header();
00254 } else if (error) {
00255
00256
00257 ROS_WARN_STREAM("Socket asio error, closing socket: " << error);
00258 stop();
00259 }
00260 }
00261
00263
00264 void write_message(Buffer& message, const uint16_t topic_id) {
00265 uint8_t overhead_bytes = 8;
00266 uint16_t length = overhead_bytes + message.size();
00267 BufferPtr buffer_ptr(new Buffer(length));
00268
00269 uint8_t msg_checksum;
00270 ros::serialization::IStream checksum_stream(message.size() > 0 ? &message[0] : NULL, message.size());
00271
00272 ros::serialization::OStream stream(&buffer_ptr->at(0), buffer_ptr->size());
00273 uint8_t msg_len_checksum = 255 - checksum(message.size());
00274 stream << (uint16_t)0xfeff << (uint16_t)message.size() << msg_len_checksum << topic_id;
00275 msg_checksum = 255 - (checksum(checksum_stream) + checksum(topic_id));
00276
00277 memcpy(stream.advance(message.size()), &message[0], message.size());
00278 stream << msg_checksum;
00279
00280 ROS_DEBUG_NAMED("async_write", "Sending buffer of %d bytes to client.", length);
00281 boost::asio::async_write(socket_, boost::asio::buffer(*buffer_ptr),
00282 boost::bind(&Session::write_completion_cb, this, boost::asio::placeholders::error, buffer_ptr));
00283 }
00284
00285 void write_completion_cb(const boost::system::error_code& error,
00286 BufferPtr buffer_ptr) {
00287 if (error) {
00288 if (error == boost::system::errc::io_error) {
00289 ROS_WARN_THROTTLE(1, "Socket write operation returned IO error.");
00290 } else if (error == boost::system::errc::no_such_device) {
00291 ROS_WARN_THROTTLE(1, "Socket write operation returned no device.");
00292 } else {
00293 ROS_WARN_STREAM_THROTTLE(1, "Unknown error returned during write operation: " << error);
00294 }
00295 stop();
00296 }
00297
00298 }
00299
00301 void attempt_sync() {
00302 request_topics();
00303 set_sync_timeout(attempt_interval_);
00304 }
00305
00306 void set_sync_timeout(const boost::posix_time::time_duration& interval) {
00307 if (ros::ok())
00308 {
00309 sync_timer_.cancel();
00310 sync_timer_.expires_from_now(interval);
00311 sync_timer_.async_wait(boost::bind(&Session::sync_timeout, this,
00312 boost::asio::placeholders::error));
00313 }
00314 }
00315
00316 void sync_timeout(const boost::system::error_code& error) {
00317 if (error != boost::asio::error::operation_aborted) {
00318 ROS_DEBUG("Sync with device lost.");
00319 stop();
00320 }
00321 }
00322
00324 void request_topics() {
00325 std::vector<uint8_t> message(0);
00326 ROS_DEBUG("Sending request topics message for VER2 protocol.");
00327 write_message(message, rosserial_msgs::TopicInfo::ID_PUBLISHER);
00328
00329
00330
00331 require_check_timer_.expires_from_now(require_check_interval_);
00332 require_check_timer_.async_wait(boost::bind(&Session::required_topics_check, this,
00333 boost::asio::placeholders::error));
00334 }
00335
00336 void required_topics_check(const boost::system::error_code& error) {
00337 if (error != boost::asio::error::operation_aborted) {
00338 if (ros::param::has(require_param_name_)) {
00339 if (!check_set(require_param_name_ + "/publishers", publishers_) ||
00340 !check_set(require_param_name_ + "/subscribers", subscribers_)) {
00341 ROS_WARN("Connected client failed to establish the publishers and subscribers dictated by require parameter. Re-requesting topics.");
00342 request_topics();
00343 }
00344 }
00345 }
00346 }
00347
00348 template<typename M>
00349 bool check_set(std::string param_name, M map) {
00350 XmlRpc::XmlRpcValue param_list;
00351 ros::param::get(param_name, param_list);
00352 ROS_ASSERT(param_list.getType() == XmlRpc::XmlRpcValue::TypeArray);
00353 for (int i = 0; i < param_list.size(); ++i) {
00354 ROS_ASSERT(param_list[i].getType() == XmlRpc::XmlRpcValue::TypeString);
00355 std::string required_topic((std::string(param_list[i])));
00356
00357 bool found = false;
00358 for (typename M::iterator j = map.begin(); j != map.end(); ++j) {
00359 if (nh_.resolveName(j->second->get_topic()) ==
00360 nh_.resolveName(required_topic)) {
00361 found = true;
00362 ROS_INFO_STREAM("Verified connection to topic " << required_topic << ", given in parameter " << param_name);
00363 break;
00364 }
00365 }
00366 if (!found) {
00367 ROS_WARN_STREAM("Missing connection to topic " << required_topic << ", required by parameter " << param_name);
00368 return false;
00369 }
00370 }
00371 return true;
00372 }
00373
00374 static uint8_t checksum(ros::serialization::IStream& stream) {
00375 uint8_t sum = 0;
00376 for (uint16_t i = 0; i < stream.getLength(); ++i) {
00377 sum += stream.getData()[i];
00378 }
00379 return sum;
00380 }
00381
00382 static uint8_t checksum(uint16_t val) {
00383 return (val >> 8) + val;
00384 }
00385
00387
00388 void setup_publisher(ros::serialization::IStream& stream) {
00389 rosserial_msgs::TopicInfo topic_info;
00390 ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00391
00392 PublisherPtr pub(new Publisher(nh_, topic_info));
00393 callbacks_[topic_info.topic_id] = boost::bind(&Publisher::handle, pub, _1);
00394 publishers_[topic_info.topic_id] = pub;
00395
00396 set_sync_timeout(timeout_interval_);
00397 }
00398
00399 void setup_subscriber(ros::serialization::IStream& stream) {
00400 rosserial_msgs::TopicInfo topic_info;
00401 ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00402
00403 SubscriberPtr sub(new Subscriber(nh_, topic_info,
00404 boost::bind(&Session::write_message, this, _1, topic_info.topic_id)));
00405 subscribers_[topic_info.topic_id] = sub;
00406
00407 set_sync_timeout(timeout_interval_);
00408 }
00409
00410
00411
00412
00413
00414
00415
00416 void setup_service_client_publisher(ros::serialization::IStream& stream) {
00417 rosserial_msgs::TopicInfo topic_info;
00418 ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00419
00420 if (!services_.count(topic_info.topic_name)) {
00421 ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str());
00422 ServiceClientPtr srv(new ServiceClient(
00423 nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2)));
00424 services_[topic_info.topic_name] = srv;
00425 callbacks_[topic_info.topic_id] = boost::bind(&ServiceClient::handle, srv, _1);
00426 }
00427 if (services_[topic_info.topic_name]->getRequestMessageMD5() != topic_info.md5sum) {
00428 ROS_WARN("Service client setup: Request message MD5 mismatch between rosserial client and ROS");
00429 } else {
00430 ROS_DEBUG("Service client %s: request message MD5 successfully validated as %s",
00431 topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
00432 }
00433 set_sync_timeout(timeout_interval_);
00434 }
00435
00436 void setup_service_client_subscriber(ros::serialization::IStream& stream) {
00437 rosserial_msgs::TopicInfo topic_info;
00438 ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);
00439
00440 if (!services_.count(topic_info.topic_name)) {
00441 ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str());
00442 ServiceClientPtr srv(new ServiceClient(
00443 nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2)));
00444 services_[topic_info.topic_name] = srv;
00445 callbacks_[topic_info.topic_id] = boost::bind(&ServiceClient::handle, srv, _1);
00446 }
00447
00448 services_[topic_info.topic_name]->setTopicId(topic_info.topic_id);
00449 if (services_[topic_info.topic_name]->getResponseMessageMD5() != topic_info.md5sum) {
00450 ROS_WARN("Service client setup: Response message MD5 mismatch between rosserial client and ROS");
00451 } else {
00452 ROS_DEBUG("Service client %s: response message MD5 successfully validated as %s",
00453 topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
00454 }
00455 set_sync_timeout(timeout_interval_);
00456 }
00457
00458 void handle_log(ros::serialization::IStream& stream) {
00459 rosserial_msgs::Log l;
00460 ros::serialization::Serializer<rosserial_msgs::Log>::read(stream, l);
00461 if(l.level == rosserial_msgs::Log::ROSDEBUG) ROS_DEBUG("%s", l.msg.c_str());
00462 else if(l.level == rosserial_msgs::Log::INFO) ROS_INFO("%s", l.msg.c_str());
00463 else if(l.level == rosserial_msgs::Log::WARN) ROS_WARN("%s", l.msg.c_str());
00464 else if(l.level == rosserial_msgs::Log::ERROR) ROS_ERROR("%s", l.msg.c_str());
00465 else if(l.level == rosserial_msgs::Log::FATAL) ROS_FATAL("%s", l.msg.c_str());
00466 }
00467
00468 void handle_time(ros::serialization::IStream& stream) {
00469 std_msgs::Time time;
00470 time.data = ros::Time::now();
00471
00472 size_t length = ros::serialization::serializationLength(time);
00473 std::vector<uint8_t> message(length);
00474
00475 ros::serialization::OStream ostream(&message[0], length);
00476 ros::serialization::Serializer<std_msgs::Time>::write(ostream, time);
00477
00478 write_message(message, rosserial_msgs::TopicInfo::ID_TIME);
00479
00480
00481
00482 set_sync_timeout(timeout_interval_);
00483 }
00484
00485 Socket socket_;
00486 AsyncReadBuffer<Socket> async_read_buffer_;
00487 enum { buffer_max = 1023 };
00488 bool active_;
00489
00490 ros::NodeHandle nh_;
00491 ros::CallbackQueue ros_callback_queue_;
00492
00493 boost::posix_time::time_duration timeout_interval_;
00494 boost::posix_time::time_duration attempt_interval_;
00495 boost::posix_time::time_duration require_check_interval_;
00496 boost::posix_time::time_duration ros_spin_interval_;
00497 boost::asio::deadline_timer sync_timer_;
00498 boost::asio::deadline_timer require_check_timer_;
00499 boost::asio::deadline_timer ros_spin_timer_;
00500 std::string require_param_name_;
00501
00502 std::map<uint16_t, boost::function<void(ros::serialization::IStream&)> > callbacks_;
00503 std::map<uint16_t, PublisherPtr> publishers_;
00504 std::map<uint16_t, SubscriberPtr> subscribers_;
00505 std::map<std::string, ServiceClientPtr> services_;
00506 };
00507
00508 }
00509
00510 #endif // ROSSERIAL_SERVER_SESSION_H