client.cpp
Go to the documentation of this file.
1 #include <ros/ros.h>
3 #include <blob/shape_shifter.h>
4 
5 #include <topic_proxy/RequestMessage.h>
6 #include <topic_proxy/AddPublisher.h>
7 
8 namespace topic_proxy
9 {
10 
11 using blob::ShapeShifter;
12 
13 class Client : public TopicProxy
14 {
15 private:
19 
21  {
22  std::string local_topic;
23  std::string datatype;
24  std::string md5sum;
25  std::string message_definition;
26 
28  bool latch;
30  GetMessage::Request request;
31  };
33  std::map<std::string, SubscriptionInfoPtr> subscriptions_;
34 
36  {
37  std::string remote_topic;
39  bool latch;
40  bool compressed;
41  };
43  std::map<std::string, PublicationInfoPtr> publications_;
44 
45  std::string topic_prefix_;
46 
47 public:
48  Client(const std::string& ns)
49  : TopicProxy()
50  , nh_(ns)
51  {
52  init();
53  }
54 
55  Client(const std::string& host, uint32_t port, const std::string& ns = std::string())
56  : TopicProxy(host, port)
57  , nh_(ns)
58  {
59  init();
60  }
61 
62  bool init() {
63  if (connect()) {
64  if (!getHost().empty()) {
65  ROS_INFO("Connected to topic_proxy server at %s:%u", getHost().c_str(), getTCPPort());
66  } else {
67  ROS_INFO("Connected to topic_proxy server at local master %s", ros::master::getURI().c_str());
68  }
69  } else {
70  if (!getHost().empty()) {
71  ROS_WARN("Could not connect to topic_proxy server at %s:%u", getHost().c_str(), getTCPPort());
72  } else {
73  ROS_WARN("Could not connect to topic_proxy server at local master %s", ros::master::getURI().c_str());
74  }
75  return false;
76  }
77 
78  ros::param::get("~topic_prefix", topic_prefix_);
79  request_message_service_ = nh_.advertiseService("request_message", &Client::handleRequestMessage, this);
80  add_publisher_service_ = nh_.advertiseService("add_publisher", &Client::handleAddPublisher, this);
81 
82  XmlRpc::XmlRpcValue subscriptions;
83  ros::param::get("~subscriptions", subscriptions);
84  if (subscriptions.getType() == XmlRpc::XmlRpcValue::TypeArray) {
85  for(int i = 0; i < subscriptions.size(); ++i) {
86  XmlRpc::XmlRpcValue p = subscriptions[i];
87  std::string topic;
89  topic = static_cast<std::string>(p);
90  } else if (p.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
91  if (p.hasMember("topic")) topic = static_cast<std::string>(p["topic"]);
92  }
93  if (topic.empty()) continue;
94 
95  SubscriptionInfoPtr subscription = getSubscription(topic);
96  subscription->local_topic = topic;
97  subscription->request.topic = topic;
98  ros::Duration interval;
100  if (p.hasMember("remote_topic")) subscription->request.topic = static_cast<std::string>(p["remote_topic"]);
101  if (p.hasMember("timeout")) subscription->request.timeout = ros::Duration(static_cast<double>(p["timeout"]));
102  if (p.hasMember("compressed")) subscription->request.compressed = static_cast<bool>(p["compressed"]);
103  if (p.hasMember("interval")) interval = ros::Duration(static_cast<double>(p["interval"]));
104  if (p.hasMember("latch")) subscription->latch = static_cast<bool>(p["latch"]);
105  }
106 
107  if (!interval.isZero()) {
108  subscription->timer = nh_.createTimer(interval, boost::bind(&Client::timerCallback, this, subscription, _1));
109  }
110  }
111  }
112 
113  XmlRpc::XmlRpcValue publications;
114  ros::param::get("~publications", publications);
115  if (publications.getType() == XmlRpc::XmlRpcValue::TypeArray) {
116  for(int i = 0; i < publications.size(); ++i) {
117  XmlRpc::XmlRpcValue p = publications[i];
118  std::string topic;
120  topic = static_cast<std::string>(p);
121  } else if (p.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
122  if (p.hasMember("topic")) topic = static_cast<std::string>(p["topic"]);
123  }
124  if (topic.empty()) continue;
125 
126  AddPublisher::Request request;
127  AddPublisher::Response response;
128  request.topic = topic;
130  if (p.hasMember("remote_topic")) request.remote_topic = static_cast<std::string>(p["remote_topic"]);
131  if (p.hasMember("compressed")) request.compressed = static_cast<bool>(p["compressed"]);
132  if (p.hasMember("latch")) request.latch = static_cast<bool>(p["latch"]);
133  }
134 
135  handleAddPublisher(request, response);
136  }
137  }
138 
139  return true;
140  }
141 
143  {
146  }
147 
148  bool republish(GetMessage::Request& request, bool latch = false)
149  {
150  republish(request, request.topic, latch);
151  }
152 
153  bool republish(GetMessage::Request& request, const std::string& topic, bool latch = false)
154  {
155  MessageInstanceConstPtr instance = send(request);
156  if (!instance) {
157  ROS_ERROR("GetMessage request for topic %s failed", request.topic.c_str());
158  return false;
159  }
160 
161  // advertise locally
162  SubscriptionInfoPtr subscription = getSubscription(topic);
163  if (!subscription->publisher
164  && !instance->type.empty() && !instance->md5sum.empty()) {
165  if (subscription->local_topic.empty()) subscription->local_topic = topic;
166  std::string advertised_topic = nh_.resolveName(topic_prefix_ + subscription->local_topic);
167  if (!getHost().empty()) {
168  ROS_INFO("Advertising topic %s from host %s as %s", request.topic.c_str(), getHost().c_str(), advertised_topic.c_str());
169  } else {
170  ROS_INFO("Advertising topic %s as %s", request.topic.c_str(), advertised_topic.c_str());
171  }
172 
173  ros::AdvertiseOptions ops = ros::AdvertiseOptions(advertised_topic, 10, instance->md5sum, instance->type, instance->message_definition,
174  boost::bind(&Client::connectCallback, this, subscription, _1), boost::bind(&Client::disconnectCallback, this, subscription, _1));
175  ops.latch = latch;
176  subscription->publisher = nh_.advertise(ops);
177  }
178 
179  if (instance->blob.empty()) {
180  ROS_DEBUG("No message received on topic %s", request.topic.c_str());
181  return false;
182  }
183 
184  subscription->publisher.publish(
185  instance->blob.asMessage().morph(instance->md5sum, instance->type, instance->message_definition)
186  );
187  return true;
188  }
189 
190 protected:
191  const SubscriptionInfoPtr& getSubscription(const std::string& topic)
192  {
193  if (subscriptions_.count(topic)) return subscriptions_.at(topic);
194  SubscriptionInfoPtr subscription(new SubscriptionInfo());
195  return subscriptions_.insert(std::pair<std::string, SubscriptionInfoPtr>(topic, subscription)).first->second;
196  }
197 
198  const PublicationInfoPtr& getPublication(const std::string& topic)
199  {
200  if (publications_.count(topic)) return publications_.at(topic);
201  PublicationInfoPtr publication(new PublicationInfo());
202  return publications_.insert(std::pair<std::string, PublicationInfoPtr>(topic, publication)).first->second;
203  }
204 
205  void publishCallback(const PublicationInfoPtr& publication, const blob::ShapeShifterConstPtr& message)
206  {
207  PublishMessage::Request request;
208  request.message.topic = publication->remote_topic;
209  request.message.type = message->getDataType();
210  request.message.md5sum = message->getMD5Sum();
211  request.message.message_definition = message->getMessageDefinition();
212  request.message.blob = message->blob();
213  request.message.blob.setCompressed(publication->compressed);
214  request.latch = publication->latch;
215 
216  if (!send(request)) {
217  ROS_ERROR("PublishMessage request for topic %s failed", request.message.topic.c_str());
218  }
219  }
220 
221  void timerCallback(const SubscriptionInfoPtr& subscription, const ros::TimerEvent& event)
222  {
223  republish(subscription->request, subscription->local_topic, subscription->latch);
224  }
225 
226  bool handleRequestMessage(RequestMessage::Request& request, RequestMessage::Response& response)
227  {
228  SubscriptionInfoPtr subscription = getSubscription(request.topic);
229  subscription->request.topic = request.remote_topic;
230  subscription->request.compressed = request.compressed;
231  subscription->request.timeout = request.timeout;
232  subscription->latch = request.latch;
233  subscription->local_topic = request.topic;
234 
235  if (request.interval > ros::Duration()) {
236  // add new timer
237  subscription->timer = nh_.createTimer(request.interval, boost::bind(&Client::timerCallback, this, subscription, _1));
238  } else {
239  // stop otimer
240  subscription->timer.stop();
241  }
242 
243  // request once
244  if (request.interval.isZero()) {
245  return republish(subscription->request, request.topic, request.latch);
246  }
247 
248  return true;
249  }
250 
251  bool handleAddPublisher(AddPublisher::Request& request, AddPublisher::Response& response)
252  {
253  PublicationInfoPtr publication = getPublication(request.topic);
254 
255  if (!publication->subscriber) {
256  std::string subscribed_topic = nh_.resolveName(topic_prefix_ + request.topic);
257  if (!getHost().empty()) {
258  ROS_INFO("Subscribing to topic %s for host %s as %s", request.topic.c_str(), getHost().c_str(), subscribed_topic.c_str());
259  } else {
260  ROS_INFO("Subscribing to topic %s as %s", request.topic.c_str(), subscribed_topic.c_str());
261  }
262 
263  publication->subscriber = nh_.subscribe<blob::ShapeShifter>(subscribed_topic, 10, boost::bind(&Client::publishCallback, this, publication, _1));
264  publication->remote_topic = request.topic;
265  if (!request.remote_topic.empty()) publication->remote_topic = request.remote_topic;
266  publication->latch = request.latch;
267  publication->compressed = request.compressed;
268  }
269 
270  return true;
271  }
272 
274  {
275  for (std::map<std::string, SubscriptionInfoPtr>::iterator it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
276  it->second->publisher.shutdown();
277  }
278  subscriptions_.clear();
279  }
280 
282  {
283  for (std::map<std::string, PublicationInfoPtr>::iterator it = publications_.begin(); it != publications_.end(); ++it) {
284  it->second->subscriber.shutdown();
285  }
286  publications_.clear();
287  }
288 
289 protected:
290  void connectCallback(const SubscriptionInfoPtr&, const ros::SingleSubscriberPublisher&)
291  {
292  }
293 
294  void disconnectCallback(const SubscriptionInfoPtr&, const ros::SingleSubscriberPublisher&)
295  {
296  }
297 };
298 
299 }
300 
301 int main(int argc, char **argv)
302 {
303  ros::init(argc, argv, "topic_proxy_client");
304 
305  std::string host;
306  int port = 0;
307 
308  // get host/port from parameter server
309  ros::param::get("~host", host);
310  ros::param::get("~port", port);
311 
312  // get host/port from command line
313  if (argc > 1) host = argv[1];
314  if (argc > 2) port = boost::lexical_cast<uint32_t>(std::string(argv[2]));
315 
316  topic_proxy::Client client(host, port);
317  ros::spin();
318  return 0;
319 }
ROSCPP_DECL const std::string & getURI()
boost::shared_ptr< SubscriptionInfo > SubscriptionInfoPtr
Definition: client.cpp:32
void connectCallback(const SubscriptionInfoPtr &, const ros::SingleSubscriberPublisher &)
Definition: client.cpp:290
std::string topic_prefix_
Definition: client.cpp:45
void publishCallback(const PublicationInfoPtr &publication, const blob::ShapeShifterConstPtr &message)
Definition: client.cpp:205
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
int size() const
std::string resolveName(const std::string &name, bool remap=true) const
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
void clearSubscriptions()
Definition: client.cpp:273
int main(int argc, char **argv)
Definition: client.cpp:301
ServiceServer advertiseService(const std::string &service, bool(T::*srv_func)(MReq &, MRes &), T *obj)
Type const & getType() const
#define ROS_WARN(...)
std::map< std::string, PublicationInfoPtr > publications_
Definition: client.cpp:43
ROSCPP_DECL void spin(Spinner &spinner)
std::map< std::string, SubscriptionInfoPtr > subscriptions_
Definition: client.cpp:33
void disconnectCallback(const SubscriptionInfoPtr &, const ros::SingleSubscriberPublisher &)
Definition: client.cpp:294
Client(const std::string &ns)
Definition: client.cpp:48
ROSCPP_DECL bool get(const std::string &key, std::string &s)
Client(const std::string &host, uint32_t port, const std::string &ns=std::string())
Definition: client.cpp:55
#define ROS_INFO(...)
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
uint16_t getTCPPort() const
Definition: topic_proxy.h:59
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
bool handleRequestMessage(RequestMessage::Request &request, RequestMessage::Response &response)
Definition: client.cpp:226
bool handleAddPublisher(AddPublisher::Request &request, AddPublisher::Response &response)
Definition: client.cpp:251
const SubscriptionInfoPtr & getSubscription(const std::string &topic)
Definition: client.cpp:191
bool republish(GetMessage::Request &request, bool latch=false)
Definition: client.cpp:148
const std::string & getHost() const
Definition: topic_proxy.h:58
ros::ServiceServer request_message_service_
Definition: client.cpp:17
bool hasMember(const std::string &name) const
MessageInstanceConstPtr send(GetMessage::Request &)
Definition: topic_proxy.cpp:47
void clearPublications()
Definition: client.cpp:281
ros::NodeHandle nh_
Definition: client.cpp:16
bool republish(GetMessage::Request &request, const std::string &topic, bool latch=false)
Definition: client.cpp:153
const PublicationInfoPtr & getPublication(const std::string &topic)
Definition: client.cpp:198
#define ROS_ERROR(...)
ros::ServiceServer add_publisher_service_
Definition: client.cpp:18
void timerCallback(const SubscriptionInfoPtr &subscription, const ros::TimerEvent &event)
Definition: client.cpp:221
boost::shared_ptr< PublicationInfo > PublicationInfoPtr
Definition: client.cpp:42
#define ROS_DEBUG(...)


topic_proxy
Author(s): Johannes Meyer
autogenerated on Sat Jul 27 2019 03:35:25