server.cpp
Go to the documentation of this file.
2 
3 #include <ros/ros.h>
4 #include <ros/callback_queue.h>
5 #include <ros/serialization.h>
6 
7 #include <ros/network.h>
10 
11 #include <blob/shape_shifter.h>
12 
13 namespace topic_proxy
14 {
15 
16 using blob::ShapeShifter;
17 
18 class Server
19 {
20 private:
24 
26 
28  std::string topic;
31  MessageEvent event;
33  };
35  std::map<std::string, SubscriptionInfoPtr> subscriptions_;
36 
37  struct PublicationInfo {
38  std::string topic;
40  };
42  std::map<std::string, PublicationInfoPtr> publications_;
43 
44 public:
46  {
47  get_message_server_ = nh_.advertiseService(g_get_message_service, &Server::handleGetMessage, this);
48  publish_message_server_ = nh_.advertiseService(g_publish_message_service, &Server::handlePublishMessage, this);
49  }
50 
52  {
55  }
56 
57  const std::string& getHost() const
58  {
59  return ros::network::getHost();
60  }
61 
62  uint16_t getTCPPort() const
63  {
64  return ros::ConnectionManager::instance()->getTCPPort();
65  }
66 
67 protected:
68  const SubscriptionInfoPtr& getSubscription(const std::string& topic)
69  {
70  if (subscriptions_.count(topic)) return subscriptions_.at(topic);
71  SubscriptionInfoPtr subscription(new SubscriptionInfo());
72  return subscriptions_.insert(std::pair<std::string, SubscriptionInfoPtr>(topic, subscription)).first->second;
73  }
74 
75  const PublicationInfoPtr& getPublication(const std::string& topic)
76  {
77  if (publications_.count(topic)) return publications_.at(topic);
78  PublicationInfoPtr publication(new PublicationInfo());
79  return publications_.insert(std::pair<std::string, PublicationInfoPtr>(topic, publication)).first->second;
80  }
81 
82  bool handleGetMessage(GetMessage::Request& request, GetMessage::Response& response)
83  {
84  SubscriptionInfoPtr subscription = getSubscription(request.topic);
85 
86  if (!subscription->subscriber) {
87  ROS_INFO("Subscribing to topic %s", request.topic.c_str());
88  ros::SubscribeOptions ops = ros::SubscribeOptions::create<ShapeShifter>(request.topic, 1, boost::bind(&Server::subscriberCallback, this, subscription, _1), ros::VoidConstPtr(), &(subscription->callback_queue));
89  subscription->subscriber = nh_.subscribe(ops);
90  subscription->topic = subscription->subscriber.getTopic();
91  }
92 
93  // wait for exactly one callback and reset event pointer
94  ros::WallDuration timeout(request.timeout.sec, request.timeout.nsec);
95  if (timeout > ros::WallDuration()) {
96  // clear callback queue and ignore all messages received if a timeout was specified
97  subscription->callback_queue.clear();
98  }
99  subscription->event = MessageEvent();
100  subscription->callback_queue.callOne(timeout);
101 
102  ShapeShifter::ConstPtr instance;
103  try {
104  instance = subscription->event.getConstMessage();
105 
106  } catch(ros::Exception& e) {
107  ROS_ERROR("Catched exception while handling a request for topic %s: %s", request.topic.c_str(), e.what());
108  return false;
109  }
110 
111  // any message message has been received?
112  if (instance) {
113  // fill response
114  response.message.topic = subscription->topic;
115  response.message.md5sum = instance->getMD5Sum();
116  response.message.type = instance->getDataType();
117  response.message.message_definition = instance->getMessageDefinition();
118  // response.message.latching = subscription->event.getConnectionHeader()["latching"];
119  response.message.blob = instance->blob();
120  response.message.blob.setCompressed(request.compressed);
121 
122  subscription->last_message = instance;
123 
124  } else {
125  // fill response from last message (but without data)
126  response.message.topic = subscription->topic;
127  if (subscription->last_message) {
128  response.message.md5sum = subscription->last_message->getMD5Sum();
129  response.message.type = subscription->last_message->getDataType();
130  response.message.message_definition = subscription->last_message->getMessageDefinition();
131  }
132  }
133 
134  return true;
135  }
136 
137  void subscriberCallback(const SubscriptionInfoPtr& subscription, const MessageEvent& event)
138  {
139  subscription->event = event;
140  }
141 
142  bool handlePublishMessage(PublishMessage::Request& request, PublishMessage::Response& response)
143  {
144  PublicationInfoPtr publication = getPublication(request.message.topic);
145 
146  if (!publication->publisher) {
147  ROS_INFO("Publishing topic %s (%s)", request.message.topic.c_str(), request.message.type.c_str());
148  ros::AdvertiseOptions ops(request.message.topic, 10, request.message.md5sum, request.message.type, request.message.message_definition,
149  boost::bind(&Server::connectCallback, this, publication, _1), boost::bind(&Server::disconnectCallback, this, publication, _1));
150  ops.latch = request.latch;
151  publication->publisher = nh_.advertise(ops);
152  publication->topic = publication->publisher.getTopic();
153  }
154 
155  publication->publisher.publish(
156  request.message.blob.asMessage().morph(request.message.md5sum, request.message.type, request.message.message_definition)
157  );
158  return true;
159  }
160 
162  for (std::map<std::string, SubscriptionInfoPtr>::iterator it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
163  it->second->subscriber.shutdown();
164  }
165  subscriptions_.clear();
166  }
167 
169  for (std::map<std::string, PublicationInfoPtr>::iterator it = publications_.begin(); it != publications_.end(); ++it) {
170  it->second->publisher.shutdown();
171  }
172  publications_.clear();
173  }
174 
175 protected:
176  void connectCallback(const PublicationInfoPtr&, const ros::SingleSubscriberPublisher&)
177  {}
178 
179  void disconnectCallback(const PublicationInfoPtr&, const ros::SingleSubscriberPublisher&)
180  {}
181 };
182 
183 } // namespace topic_proxy
184 
185 int main(int argc, char **argv)
186 {
187  // add __tcpros_server_port to remappings
188  std::string tcpros_server_port_argv = "__tcpros_server_port:=" + boost::lexical_cast<std::string>(topic_proxy::g_default_port);
189  std::vector<char *> new_argv;
190  new_argv.reserve(argc + 1);
191  new_argv.assign(&argv[0], &argv[argc]);
192  new_argv.push_back(const_cast<char *>(tcpros_server_port_argv.c_str()));
193  argc = new_argv.size();
194  argv = new_argv.data();
195 
196  ros::init(argc, argv, "topic_proxy_server");
197  topic_proxy::Server server;
198  ROS_INFO("Created topic_proxy server listening on %s:%u", server.getHost().c_str(), server.getTCPPort());
199  ros::spin();
200  return 0;
201 }
ros::CallbackQueue callback_queue
Definition: server.cpp:30
ros::ServiceServer get_message_server_
Definition: server.cpp:22
boost::shared_ptr< void const > VoidConstPtr
ros::NodeHandle nh_
Definition: server.cpp:21
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
int main(int argc, char **argv)
Definition: server.cpp:185
ShapeShifter::ConstPtr last_message
Definition: server.cpp:32
void disconnectCallback(const PublicationInfoPtr &, const ros::SingleSubscriberPublisher &)
Definition: server.cpp:179
ros::MessageEvent< const ShapeShifter > MessageEvent
Definition: server.cpp:25
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
boost::shared_ptr< SubscriptionInfo > SubscriptionInfoPtr
Definition: server.cpp:34
void clearSubscriptions()
Definition: server.cpp:161
ServiceServer advertiseService(const std::string &service, bool(T::*srv_func)(MReq &, MRes &), T *obj)
bool handlePublishMessage(PublishMessage::Request &request, PublishMessage::Response &response)
Definition: server.cpp:142
uint16_t getTCPPort() const
Definition: server.cpp:62
ROSCPP_DECL void spin(Spinner &spinner)
const uint32_t g_default_port
Definition: topic_proxy.cpp:18
void subscriberCallback(const SubscriptionInfoPtr &subscription, const MessageEvent &event)
Definition: server.cpp:137
std::map< std::string, PublicationInfoPtr > publications_
Definition: server.cpp:42
ROSCPP_DECL const std::string & getHost()
static const ConnectionManagerPtr & instance()
const std::string g_get_message_service
Definition: topic_proxy.cpp:16
#define ROS_INFO(...)
const std::string g_publish_message_service
Definition: topic_proxy.cpp:17
ros::ServiceServer publish_message_server_
Definition: server.cpp:23
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
const std::string & getHost() const
Definition: server.cpp:57
const PublicationInfoPtr & getPublication(const std::string &topic)
Definition: server.cpp:75
void clearPublications()
Definition: server.cpp:168
boost::shared_ptr< PublicationInfo > PublicationInfoPtr
Definition: server.cpp:41
std::map< std::string, SubscriptionInfoPtr > subscriptions_
Definition: server.cpp:35
#define ROS_ERROR(...)
bool handleGetMessage(GetMessage::Request &request, GetMessage::Response &response)
Definition: server.cpp:82
void connectCallback(const PublicationInfoPtr &, const ros::SingleSubscriberPublisher &)
Definition: server.cpp:176
const SubscriptionInfoPtr & getSubscription(const std::string &topic)
Definition: server.cpp:68


topic_proxy
Author(s): Johannes Meyer
autogenerated on Sat Jul 27 2019 03:35:25