35 #ifndef ROSSERIAL_SERVER_SESSION_H 36 #define ROSSERIAL_SERVER_SESSION_H 39 #include <boost/bind.hpp> 40 #include <boost/asio.hpp> 41 #include <boost/function.hpp> 45 #include <rosserial_msgs/TopicInfo.h> 46 #include <rosserial_msgs/Log.h> 48 #include <std_msgs/Time.h> 59 template<
typename Socket>
63 Session(boost::asio::io_service& io_service)
70 boost::asio::placeholders::error))
87 boost::asio::placeholders::error));
99 callbacks_[rosserial_msgs::TopicInfo::ID_PUBLISHER]
101 callbacks_[rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
103 callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_PUBLISHER]
105 callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
109 callbacks_[rosserial_msgs::TopicInfo::ID_TIME]
167 boost::asio::placeholders::error));
199 uint16_t topic_id, length;
200 uint8_t length_checksum;
203 stream >> length >> length_checksum;
204 if (length_checksum +
checksum(length) != 0xff) {
206 ROS_WARN(
"Bad message header length checksum. Dropping message from client. T%d L%d C%d %d", topic_id, length, length_checksum, csl);
212 ROS_DEBUG(
"Received message header with length %d and topic_id=%d", length, topic_id);
220 ROS_DEBUG(
"Received body of length %d for message on topic %d.", stream.
getLength(), topic_id);
225 if (msg_checksum != 0xff) {
226 ROS_WARN(
"Rejecting message on topicId=%d, length=%d with bad checksum.", topic_id, stream.
getLength());
232 if (topic_id < 100) {
233 ROS_ERROR(
"Buffer overrun when attempting to parse setup message.");
236 ROS_WARN(
"Buffer overrun when attempting to parse user message.");
240 ROS_WARN(
"Received message with unrecognized topicId (%d).", topic_id);
250 if (error == boost::system::errc::no_buffer_space) {
252 ROS_WARN(
"Overrun on receive buffer. Attempting to regain rx sync.");
265 uint8_t overhead_bytes = 8;
266 uint16_t length = overhead_bytes + message.size();
267 BufferPtr buffer_ptr(
new Buffer(length));
269 uint8_t msg_checksum;
273 uint8_t msg_len_checksum = 255 -
checksum(message.size());
274 stream << (uint16_t)0xfeff << (uint16_t)message.size() << msg_len_checksum << topic_id;
277 memcpy(stream.advance(message.size()), &message[0], message.size());
278 stream << msg_checksum;
280 ROS_DEBUG_NAMED(
"async_write",
"Sending buffer of %d bytes to client.", length);
281 boost::asio::async_write(
socket_, boost::asio::buffer(*buffer_ptr),
286 BufferPtr buffer_ptr) {
288 if (error == boost::system::errc::io_error) {
290 }
else if (error == boost::system::errc::no_such_device) {
312 boost::asio::placeholders::error));
317 if (error != boost::asio::error::operation_aborted) {
325 std::vector<uint8_t> message(0);
326 ROS_DEBUG(
"Sending request topics message for VER2 protocol.");
327 write_message(message, rosserial_msgs::TopicInfo::ID_PUBLISHER);
333 boost::asio::placeholders::error));
337 if (error != boost::asio::error::operation_aborted) {
341 ROS_WARN(
"Connected client failed to establish the publishers and subscribers dictated by require parameter. Re-requesting topics.");
353 for (
int i = 0; i < param_list.
size(); ++i) {
355 std::string required_topic((std::string(param_list[i])));
358 for (
typename M::iterator j = map.begin(); j != map.end(); ++j) {
362 ROS_INFO_STREAM(
"Verified connection to topic " << required_topic <<
", given in parameter " << param_name);
367 ROS_WARN_STREAM(
"Missing connection to topic " << required_topic <<
", required by parameter " << param_name);
376 for (uint16_t i = 0; i < stream.
getLength(); ++i) {
383 return (val >> 8) + val;
389 rosserial_msgs::TopicInfo topic_info;
400 rosserial_msgs::TopicInfo topic_info;
417 rosserial_msgs::TopicInfo topic_info;
420 if (!
services_.count(topic_info.topic_name)) {
421 ROS_DEBUG(
"Creating service client for topic %s",topic_info.topic_name.c_str());
427 if (
services_[topic_info.topic_name]->getRequestMessageMD5() != topic_info.md5sum) {
428 ROS_WARN(
"Service client setup: Request message MD5 mismatch between rosserial client and ROS");
430 ROS_DEBUG(
"Service client %s: request message MD5 successfully validated as %s",
431 topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
437 rosserial_msgs::TopicInfo topic_info;
440 if (!
services_.count(topic_info.topic_name)) {
441 ROS_DEBUG(
"Creating service client for topic %s",topic_info.topic_name.c_str());
448 services_[topic_info.topic_name]->setTopicId(topic_info.topic_id);
449 if (
services_[topic_info.topic_name]->getResponseMessageMD5() != topic_info.md5sum) {
450 ROS_WARN(
"Service client setup: Response message MD5 mismatch between rosserial client and ROS");
452 ROS_DEBUG(
"Service client %s: response message MD5 successfully validated as %s",
453 topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
459 rosserial_msgs::Log l;
461 if(l.level == rosserial_msgs::Log::ROSDEBUG)
ROS_DEBUG(
"%s", l.msg.c_str());
462 else if(l.level == rosserial_msgs::Log::INFO)
ROS_INFO(
"%s", l.msg.c_str());
463 else if(l.level == rosserial_msgs::Log::WARN)
ROS_WARN(
"%s", l.msg.c_str());
464 else if(l.level == rosserial_msgs::Log::ERROR)
ROS_ERROR(
"%s", l.msg.c_str());
465 else if(l.level == rosserial_msgs::Log::FATAL)
ROS_FATAL(
"%s", l.msg.c_str());
473 std::vector<uint8_t> message(length);
502 std::map<uint16_t, boost::function<void(ros::serialization::IStream&)> >
callbacks_;
510 #endif // ROSSERIAL_SERVER_SESSION_H static void write(Stream &stream, typename boost::call_traits< T >::param_type t)
boost::asio::deadline_timer require_check_timer_
void set_require_param(std::string param_name)
void setup_subscriber(ros::serialization::IStream &stream)
#define ROS_WARN_THROTTLE(rate,...)
void write_message(Buffer &message, const uint16_t topic_id)
void read_body(ros::serialization::IStream &stream, uint16_t topic_id)
boost::asio::deadline_timer ros_spin_timer_
void ros_spin_timeout(const boost::system::error_code &error)
Helper object for successive reads from a ReadStream.
static uint8_t checksum(uint16_t val)
void read_sync_second(ros::serialization::IStream &stream)
std::string resolveName(const std::string &name, bool remap=true) const
std::vector< uint8_t > Buffer
bool check_set(std::string param_name, M map)
void set_sync_timeout(const boost::posix_time::time_duration &interval)
std::map< uint16_t, boost::function< void(ros::serialization::IStream &)> > callbacks_
static uint8_t checksum(ros::serialization::IStream &stream)
boost::posix_time::time_duration require_check_interval_
Session(boost::asio::io_service &io_service)
boost::posix_time::time_duration attempt_interval_
Type const & getType() const
AsyncReadBuffer< Socket > async_read_buffer_
void read_failed(const boost::system::error_code &error)
boost::posix_time::time_duration timeout_interval_
void setup_publisher(ros::serialization::IStream &stream)
std::string require_param_name_
std::map< uint16_t, PublisherPtr > publishers_
#define ROS_DEBUG_NAMED(name,...)
void handle_time(ros::serialization::IStream &stream)
boost::asio::deadline_timer sync_timer_
ROSCPP_DECL bool get(const std::string &key, std::string &s)
#define ROS_ERROR_ONCE(...)
void setCallbackQueue(CallbackQueueInterface *queue)
Classes which manage the Publish and Subscribe relationships that a Session has with its client...
ros::CallbackQueue ros_callback_queue_
static void read(Stream &stream, typename boost::call_traits< T >::reference t)
void required_topics_check(const boost::system::error_code &error)
void setup_service_client_publisher(ros::serialization::IStream &stream)
std::map< uint16_t, SubscriberPtr > subscribers_
ROSCPP_DECL bool has(const std::string &key)
#define ROS_WARN_STREAM(args)
void read_sync_first(ros::serialization::IStream &stream)
void handle(ros::serialization::IStream stream)
#define ROS_WARN_STREAM_THROTTLE(rate, args)
void write_completion_cb(const boost::system::error_code &error, BufferPtr buffer_ptr)
void sync_timeout(const boost::system::error_code &error)
uint32_t serializationLength(const T &t)
#define ROS_INFO_STREAM(args)
void read(size_t requested_bytes, boost::function< void(ros::serialization::IStream &)> callback)
Commands a fixed number of bytes from the buffer. This may be fulfilled from existing buffer content...
boost::posix_time::time_duration ros_spin_interval_
boost::shared_ptr< Buffer > BufferPtr
void read_id_length(ros::serialization::IStream &stream)
void handle_log(ros::serialization::IStream &stream)
void handle(ros::serialization::IStream stream)
void setup_service_client_subscriber(ros::serialization::IStream &stream)
std::map< std::string, ServiceClientPtr > services_