topic_handlers.h
Go to the documentation of this file.
1 
35 #ifndef ROSSERIAL_SERVER_TOPIC_HANDLERS_H
36 #define ROSSERIAL_SERVER_TOPIC_HANDLERS_H
37 
38 #include <ros/ros.h>
39 #include <rosserial_msgs/TopicInfo.h>
40 #include <rosserial_msgs/RequestMessageInfo.h>
41 #include <rosserial_msgs/RequestServiceInfo.h>
43 
44 namespace rosserial_server
45 {
46 
47 class Publisher {
48 public:
49  Publisher(ros::NodeHandle& nh, const rosserial_msgs::TopicInfo& topic_info) {
50  if (!message_service_.isValid()) {
51  // lazy-initialize the service caller.
52  message_service_ = nh.serviceClient<rosserial_msgs::RequestMessageInfo>("message_info");
54  ROS_WARN("Timed out waiting for message_info service to become available.");
55  }
56  }
57 
58  rosserial_msgs::RequestMessageInfo info;
59  info.request.type = topic_info.message_type;
60  if (message_service_.call(info)) {
61  if (info.response.md5 != topic_info.md5sum) {
62  ROS_WARN_STREAM("Message" << topic_info.message_type << "MD5 sum from client does not match that in system. Will avoid using system's message definition.");
63  info.response.definition = "";
64  }
65  } else {
66  ROS_WARN("Failed to call message_info service. Proceeding without full message definition.");
67  }
68 
69  message_.morph(topic_info.md5sum, topic_info.message_type, info.response.definition, "false");
70  publisher_ = message_.advertise(nh, topic_info.topic_name, 1);
71  }
72 
76  }
77 
78  std::string get_topic() {
79  return publisher_.getTopic();
80  }
81 
82 private:
85 
87 };
88 
91 
92 
93 class Subscriber {
94 public:
95  Subscriber(ros::NodeHandle& nh, rosserial_msgs::TopicInfo& topic_info,
96  boost::function<void(std::vector<uint8_t>& buffer)> write_fn)
97  : write_fn_(write_fn) {
100  topic_info.topic_name, 1, boost::bind(&Subscriber::handle, this, _1));
101  opts.md5sum = topic_info.md5sum;
102  opts.datatype = topic_info.message_type;
103  subscriber_ = nh.subscribe(opts);
104  }
105 
106  std::string get_topic() {
107  return subscriber_.getTopic();
108  }
109 
110 private:
112  size_t length = ros::serialization::serializationLength(*msg);
113  std::vector<uint8_t> buffer(length);
114 
115  ros::serialization::OStream ostream(&buffer[0], length);
117 
118  write_fn_(buffer);
119  }
120 
122  boost::function<void(std::vector<uint8_t>& buffer)> write_fn_;
123 };
124 
126 
128 public:
129  ServiceClient(ros::NodeHandle& nh, rosserial_msgs::TopicInfo& topic_info,
130  boost::function<void(std::vector<uint8_t>& buffer, const uint16_t topic_id)> write_fn)
131  : write_fn_(write_fn) {
132  topic_id_ = -1;
133  if (!service_info_service_.isValid()) {
134  // lazy-initialize the service caller.
135  service_info_service_ = nh.serviceClient<rosserial_msgs::RequestServiceInfo>("service_info");
136  if (!service_info_service_.waitForExistence(ros::Duration(5.0))) {
137  ROS_WARN("Timed out waiting for service_info service to become available.");
138  }
139  }
140 
141  rosserial_msgs::RequestServiceInfo info;
142  info.request.service = topic_info.message_type;
143  ROS_DEBUG("Calling service_info service for topic name %s",topic_info.topic_name.c_str());
144  if (service_info_service_.call(info)) {
145  request_message_md5_ = info.response.request_md5;
146  response_message_md5_ = info.response.response_md5;
147  } else {
148  ROS_WARN("Failed to call service_info service. The service client will be created with blank md5sum.");
149  }
151  opts.service = topic_info.topic_name;
152  opts.md5sum = service_md5_ = info.response.service_md5;
153  opts.persistent = false; // always false for now
154  service_client_ = nh.serviceClient(opts);
155  }
156  void setTopicId(uint16_t topic_id) {
157  topic_id_ = topic_id;
158  }
159  std::string getRequestMessageMD5() {
160  return request_message_md5_;
161  }
162  std::string getResponseMessageMD5() {
163  return response_message_md5_;
164  }
165 
167  // deserialize request message
169 
170  // perform service call
171  // note that at present, at least for rosserial-windows a service call returns nothing,
172  // so we discard the return value of this call() invocation.
173  service_client_.call(request_message_, response_message_, service_md5_);
174 
175  // write service response over the wire
176  size_t length = ros::serialization::serializationLength(response_message_);
177  std::vector<uint8_t> buffer(length);
178  ros::serialization::OStream ostream(&buffer[0], length);
180  write_fn_(buffer,topic_id_);
181  }
182 
183 private:
188  boost::function<void(std::vector<uint8_t>& buffer, const uint16_t topic_id)> write_fn_;
189  std::string service_md5_;
190  std::string request_message_md5_;
192  uint16_t topic_id_;
193 };
194 
197 
198 } // namespace
199 
200 #endif // ROSSERIAL_SERVER_TOPIC_HANDLERS_H
ServiceClient serviceClient(const std::string &service_name, bool persistent=false, const M_string &header_values=M_string())
Subscriber(ros::NodeHandle &nh, rosserial_msgs::TopicInfo &topic_info, boost::function< void(std::vector< uint8_t > &buffer)> write_fn)
static void write(Stream &stream, typename boost::call_traits< T >::param_type t)
void publish(const boost::shared_ptr< M > &message) const
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
void handle(const boost::shared_ptr< topic_tools::ShapeShifter const > &msg)
void morph(const std::string &md5sum, const std::string &datatype, const std::string &msg_def, const std::string &latching)
boost::shared_ptr< Publisher > PublisherPtr
bool call(MReq &req, MRes &res)
ServiceClient(ros::NodeHandle &nh, rosserial_msgs::TopicInfo &topic_info, boost::function< void(std::vector< uint8_t > &buffer, const uint16_t topic_id)> write_fn)
#define ROS_WARN(...)
ros::ServiceClient service_client_
boost::shared_ptr< Subscriber > SubscriberPtr
Publisher(ros::NodeHandle &nh, const rosserial_msgs::TopicInfo &topic_info)
bool waitForExistence(ros::Duration timeout=ros::Duration(-1))
topic_tools::ShapeShifter request_message_
topic_tools::ShapeShifter response_message_
static void read(Stream &stream, typename boost::call_traits< T >::reference t)
static ros::ServiceClient message_service_
void setTopicId(uint16_t topic_id)
bool isValid() const
#define ROS_WARN_STREAM(args)
boost::shared_ptr< ServiceClient > ServiceClientPtr
void handle(ros::serialization::IStream stream)
topic_tools::ShapeShifter message_
static ros::ServiceClient service_info_service_
uint32_t serializationLength(const T &t)
void init(const std::string &_topic, uint32_t _queue_size, const boost::function< void(const boost::shared_ptr< M const > &)> &_callback, const boost::function< boost::shared_ptr< M >(void)> &factory_fn=DefaultMessageCreator< M >())
std::string getTopic() const
void handle(ros::serialization::IStream stream)
ros::Publisher advertise(ros::NodeHandle &nh, const std::string &topic, uint32_t queue_size_, bool latch=false, const ros::SubscriberStatusCallback &connect_cb=ros::SubscriberStatusCallback()) const
#define ROS_DEBUG(...)


rosserial_server
Author(s): Mike Purvis
autogenerated on Mon Jun 10 2019 14:53:36