35 #ifndef ROSSERIAL_SERVER_SESSION_H 36 #define ROSSERIAL_SERVER_SESSION_H 39 #include <boost/bind.hpp> 40 #include <boost/asio.hpp> 41 #include <boost/function.hpp> 45 #include <rosserial_msgs/TopicInfo.h> 46 #include <rosserial_msgs/Log.h> 48 #include <std_msgs/Time.h> 59 template<
typename Socket>
63 Session(boost::asio::io_service& io_service)
71 boost::asio::placeholders::error))
88 boost::asio::placeholders::error));
100 callbacks_[rosserial_msgs::TopicInfo::ID_PUBLISHER]
102 callbacks_[rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
104 callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_PUBLISHER]
106 callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
110 callbacks_[rosserial_msgs::TopicInfo::ID_TIME]
177 boost::asio::placeholders::error));
213 uint16_t topic_id, length;
214 uint8_t length_checksum;
217 stream >> length >> length_checksum;
218 if (length_checksum +
checksum(length) != 0xff) {
220 ROS_WARN(
"Bad message header length checksum. Dropping message from client. T%d L%d C%d %d", topic_id, length, length_checksum, csl);
226 ROS_DEBUG(
"Received message header with length %d and topic_id=%d", length, topic_id);
234 ROS_DEBUG(
"Received body of length %d for message on topic %d.", stream.
getLength(), topic_id);
239 if (msg_checksum != 0xff) {
240 ROS_WARN(
"Rejecting message on topicId=%d, length=%d with bad checksum.", topic_id, stream.
getLength());
246 if (topic_id < 100) {
247 ROS_ERROR(
"Buffer overrun when attempting to parse setup message.");
250 ROS_WARN(
"Buffer overrun when attempting to parse user message.");
254 ROS_WARN(
"Received message with unrecognized topicId (%d).", topic_id);
264 if (error == boost::system::errc::no_buffer_space) {
266 ROS_WARN(
"Overrun on receive buffer. Attempting to regain rx sync.");
279 uint8_t overhead_bytes = 8;
280 uint16_t length = overhead_bytes + message.size();
281 BufferPtr buffer_ptr(
new Buffer(length));
283 uint8_t msg_checksum;
287 uint8_t msg_len_checksum = 255 -
checksum(message.size());
288 stream << (uint16_t)0xfeff << (uint16_t)message.size() << msg_len_checksum << topic_id;
291 memcpy(stream.advance(message.size()), &message[0], message.size());
292 stream << msg_checksum;
294 ROS_DEBUG_NAMED(
"async_write",
"Sending buffer of %d bytes to client.", length);
295 boost::asio::async_write(
socket_, boost::asio::buffer(*buffer_ptr),
300 BufferPtr buffer_ptr) {
302 if (error == boost::system::errc::io_error) {
304 }
else if (error == boost::system::errc::no_such_device) {
326 boost::asio::placeholders::error));
335 if (error != boost::asio::error::operation_aborted) {
343 std::vector<uint8_t> message(0);
344 ROS_DEBUG(
"Sending request topics message for VER2 protocol.");
345 write_message(message, rosserial_msgs::TopicInfo::ID_PUBLISHER);
351 boost::asio::placeholders::error));
355 if (error != boost::asio::error::operation_aborted) {
359 ROS_WARN(
"Connected client failed to establish the publishers and subscribers dictated by require parameter. Re-requesting topics.");
371 for (
int i = 0; i < param_list.
size(); ++i) {
373 std::string required_topic((std::string(param_list[i])));
376 for (
typename M::iterator j = map.begin(); j != map.end(); ++j) {
380 ROS_INFO_STREAM(
"Verified connection to topic " << required_topic <<
", given in parameter " << param_name);
385 ROS_WARN_STREAM(
"Missing connection to topic " << required_topic <<
", required by parameter " << param_name);
394 for (uint16_t i = 0; i < stream.
getLength(); ++i) {
401 return (val >> 8) + val;
407 rosserial_msgs::TopicInfo topic_info;
418 rosserial_msgs::TopicInfo topic_info;
435 rosserial_msgs::TopicInfo topic_info;
438 if (!
services_.count(topic_info.topic_name)) {
439 ROS_DEBUG(
"Creating service client for topic %s",topic_info.topic_name.c_str());
445 if (
services_[topic_info.topic_name]->getRequestMessageMD5() != topic_info.md5sum) {
446 ROS_WARN(
"Service client setup: Request message MD5 mismatch between rosserial client and ROS");
448 ROS_DEBUG(
"Service client %s: request message MD5 successfully validated as %s",
449 topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
455 rosserial_msgs::TopicInfo topic_info;
458 if (!
services_.count(topic_info.topic_name)) {
459 ROS_DEBUG(
"Creating service client for topic %s",topic_info.topic_name.c_str());
466 services_[topic_info.topic_name]->setTopicId(topic_info.topic_id);
467 if (
services_[topic_info.topic_name]->getResponseMessageMD5() != topic_info.md5sum) {
468 ROS_WARN(
"Service client setup: Response message MD5 mismatch between rosserial client and ROS");
470 ROS_DEBUG(
"Service client %s: response message MD5 successfully validated as %s",
471 topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
477 rosserial_msgs::Log l;
479 if(l.level == rosserial_msgs::Log::ROSDEBUG)
ROS_DEBUG(
"%s", l.msg.c_str());
480 else if(l.level == rosserial_msgs::Log::INFO)
ROS_INFO(
"%s", l.msg.c_str());
481 else if(l.level == rosserial_msgs::Log::WARN)
ROS_WARN(
"%s", l.msg.c_str());
482 else if(l.level == rosserial_msgs::Log::ERROR)
ROS_ERROR(
"%s", l.msg.c_str());
483 else if(l.level == rosserial_msgs::Log::FATAL)
ROS_FATAL(
"%s", l.msg.c_str());
491 std::vector<uint8_t> message(length);
521 std::map<uint16_t, boost::function<void(ros::serialization::IStream&)> >
callbacks_;
529 #endif // ROSSERIAL_SERVER_SESSION_H static void write(Stream &stream, typename boost::call_traits< T >::param_type t)
boost::asio::deadline_timer require_check_timer_
void set_require_param(std::string param_name)
void setup_subscriber(ros::serialization::IStream &stream)
#define ROS_WARN_STREAM_THROTTLE(period, args)
void write_message(Buffer &message, const uint16_t topic_id)
void read_body(ros::serialization::IStream &stream, uint16_t topic_id)
boost::asio::deadline_timer ros_spin_timer_
void ros_spin_timeout(const boost::system::error_code &error)
Helper object for successive reads from a ReadStream.
static uint8_t checksum(uint16_t val)
void read_sync_second(ros::serialization::IStream &stream)
std::vector< uint8_t > Buffer
bool check_set(std::string param_name, M map)
void set_sync_timeout(const boost::posix_time::time_duration &interval)
std::map< uint16_t, boost::function< void(ros::serialization::IStream &)> > callbacks_
static uint8_t checksum(ros::serialization::IStream &stream)
boost::posix_time::time_duration require_check_interval_
Session(boost::asio::io_service &io_service)
boost::posix_time::time_duration attempt_interval_
AsyncReadBuffer< Socket > async_read_buffer_
void read_failed(const boost::system::error_code &error)
boost::posix_time::time_duration timeout_interval_
boost::asio::io_service & io_service_
void setup_publisher(ros::serialization::IStream &stream)
std::string require_param_name_
std::map< uint16_t, PublisherPtr > publishers_
#define ROS_DEBUG_NAMED(name,...)
void handle_time(ros::serialization::IStream &stream)
Type const & getType() const
boost::asio::deadline_timer sync_timer_
#define ROS_WARN_THROTTLE(period,...)
ROSCPP_DECL bool get(const std::string &key, std::string &s)
#define ROS_ERROR_ONCE(...)
void setCallbackQueue(CallbackQueueInterface *queue)
Classes which manage the Publish and Subscribe relationships that a Session has with its client...
ros::CallbackQueue ros_callback_queue_
static void read(Stream &stream, typename boost::call_traits< T >::reference t)
void required_topics_check(const boost::system::error_code &error)
std::string resolveName(const std::string &name, bool remap=true) const
void setup_service_client_publisher(ros::serialization::IStream &stream)
std::map< uint16_t, SubscriberPtr > subscribers_
ROSCPP_DECL bool has(const std::string &key)
#define ROS_WARN_STREAM(args)
void read_sync_first(ros::serialization::IStream &stream)
void handle(ros::serialization::IStream stream)
void write_completion_cb(const boost::system::error_code &error, BufferPtr buffer_ptr)
void sync_timeout(const boost::system::error_code &error)
uint32_t serializationLength(const T &t)
#define ROS_INFO_STREAM(args)
void read(size_t requested_bytes, boost::function< void(ros::serialization::IStream &)> callback)
Commands a fixed number of bytes from the buffer. This may be fulfilled from existing buffer content...
boost::posix_time::time_duration ros_spin_interval_
boost::shared_ptr< Buffer > BufferPtr
void read_id_length(ros::serialization::IStream &stream)
void handle_log(ros::serialization::IStream &stream)
void handle(ros::serialization::IStream stream)
void setup_service_client_subscriber(ros::serialization::IStream &stream)
std::map< std::string, ServiceClientPtr > services_