mux.cpp
Go to the documentation of this file.
1 // demux is a generic ROS topic demultiplexer: one input topic is fanned out
3 // to 1 of N output topics. A service is provided to select between the outputs
4 //
5 // Copyright (C) 2009, Morgan Quigley
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions are met:
9 // * Redistributions of source code must retain the above copyright notice,
10 // this list of conditions and the following disclaimer.
11 // * Redistributions in binary form must reproduce the above copyright
12 // notice, this list of conditions and the following disclaimer in the
13 // documentation and/or other materials provided with the distribution.
14 // * Neither the name of Stanford University nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 // POSSIBILITY OF SUCH DAMAGE.
30 
31 
32 #include <cstdio>
33 #include <vector>
34 #include <list>
35 #include "ros/console.h"
36 #include "std_msgs/String.h"
37 #include "topic_tools/MuxSelect.h"
38 #include "topic_tools/MuxAdd.h"
39 #include "topic_tools/MuxList.h"
40 #include "topic_tools/MuxDelete.h"
42 #include "topic_tools/parse.h"
43 
44 using std::string;
45 using std::vector;
46 using std::list;
47 using namespace topic_tools;
48 
49 const static string g_none_topic = "__none";
50 
51 static ros::NodeHandle *g_node = NULL;
52 static bool g_lazy = false;
53 static bool g_advertised = false;
54 static string g_output_topic;
57 
58 struct sub_info_t
59 {
60  std::string topic_name;
63 };
64 
66 
67 static list<struct sub_info_t> g_subs;
68 static list<struct sub_info_t>::iterator g_selected = g_subs.end();
69 
71 {
72  // If we're in lazy subscribe mode, and the first subscriber just
73  // connected, then subscribe
74  if(g_lazy && g_selected != g_subs.end() && !g_selected->sub)
75  {
76  ROS_DEBUG("lazy mode; resubscribing to %s", g_selected->topic_name.c_str());
77  g_selected->sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_selected->topic_name, 10, boost::bind(in_cb, _1, g_selected->msg)));
78  }
79 }
80 
81 bool sel_srv_cb( topic_tools::MuxSelect::Request &req,
82  topic_tools::MuxSelect::Response &res )
83 {
84  bool ret = false;
85  if (g_selected != g_subs.end()) {
86  res.prev_topic = g_selected->topic_name;
87 
88  // Unsubscribe to old topic if lazy
89  if (g_lazy) {
90  ROS_DEBUG("Unsubscribing to %s, lazy", res.prev_topic.c_str());
91  if (g_selected->sub)
92  g_selected->sub->shutdown();
93  delete g_selected->sub;
94  g_selected->sub = NULL;
95  }
96  }
97  else
98  res.prev_topic = string("");
99 
100  // see if it's the magical '__none' topic, in which case we open the circuit
101  if (req.topic == g_none_topic)
102  {
103  ROS_INFO("mux selected to no input.");
104 
105  g_selected = g_subs.end();
106  ret = true;
107  }
108  else
109  {
110  ROS_INFO("trying to switch mux to %s", req.topic.c_str());
111  // spin through our vector of inputs and find this guy
112  for (list<struct sub_info_t>::iterator it = g_subs.begin();
113  it != g_subs.end();
114  ++it)
115  {
116  if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
117  {
118  g_selected = it;
119  ROS_INFO("mux selected input: [%s]", it->topic_name.c_str());
120  ret = true;
121 
122  if (!g_selected->sub && (!g_advertised || (g_advertised && g_pub.getNumSubscribers()))) {
123  g_selected->sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_selected->topic_name, 10, boost::bind(in_cb, _1, g_selected->msg)));
124  }
125  }
126  }
127  }
128 
129  if(ret)
130  {
131  std_msgs::String t;
132  t.data = req.topic;
133  g_pub_selected.publish(t);
134  }
135 
136  return ret;
137 }
138 
139 bool sel_srv_cb_dep( topic_tools::MuxSelect::Request &req,
140  topic_tools::MuxSelect::Response &res )
141 {
142  ROS_WARN("the <topic>_select service is deprecated; use mux/select instead");
143  return sel_srv_cb(req,res);
144 }
145 
146 
148  ShapeShifter* s)
149 {
150  if (!g_advertised)
151  {
152  ROS_INFO("advertising");
153  g_pub = msg->advertise(*g_node, g_output_topic, 10, false, conn_cb);
154  g_advertised = true;
155 
156  // If lazy, unregister from all but the selected topic
157  if (g_lazy) {
158  for (static list<struct sub_info_t>::iterator it = g_subs.begin(); it != g_subs.end(); ++it) {
159  if (it != g_selected) {
160  ROS_INFO("Unregistering from %s", it->topic_name.c_str());
161  if (it->sub)
162  it->sub->shutdown();
163  delete it->sub;
164  it->sub = NULL;
165  }
166  }
167  }
168  }
169 
170  if (s != g_selected->msg)
171  return;
172 
173  // If we're in lazy subscribe mode, and nobody's listening, then unsubscribe
174  if (g_lazy && !g_pub.getNumSubscribers() && g_selected != g_subs.end()) {
175  ROS_INFO("lazy mode; unsubscribing");
176  g_selected->sub->shutdown();
177  delete g_selected->sub;
178  g_selected->sub = NULL;
179  }
180  else
181  g_pub.publish(msg);
182 }
183 
184 bool list_topic_cb(topic_tools::MuxList::Request& req,
185  topic_tools::MuxList::Response& res)
186 {
187  (void)req;
188  for (list<struct sub_info_t>::iterator it = g_subs.begin();
189  it != g_subs.end();
190  ++it)
191  {
192  res.topics.push_back(it->topic_name);
193  }
194 
195  return true;
196 }
197 
198 bool add_topic_cb(topic_tools::MuxAdd::Request& req,
199  topic_tools::MuxAdd::Response& res)
200 {
201  (void)res;
202  // Check that it's not already in our list
203  ROS_INFO("trying to add %s to mux", req.topic.c_str());
204 
205  // Can't add the __none topic
206  if(req.topic == g_none_topic)
207  {
208  ROS_WARN("failed to add topic %s to mux, because it's reserved for special use",
209  req.topic.c_str());
210  return false;
211  }
212 
213  // spin through our vector of inputs and find this guy
214  for (list<struct sub_info_t>::iterator it = g_subs.begin();
215  it != g_subs.end();
216  ++it)
217  {
218  if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
219  {
220  ROS_WARN("tried to add a topic that mux was already listening to: [%s]",
221  it->topic_name.c_str());
222  return false;
223  }
224  }
225 
226  struct sub_info_t sub_info;
227  sub_info.msg = new ShapeShifter;
228  sub_info.topic_name = ros::names::resolve(req.topic);
229  try
230  {
231  if (g_lazy)
232  sub_info.sub = NULL;
233  else
234  sub_info.sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(sub_info.topic_name, 10, boost::bind(in_cb, _1, sub_info.msg)));
235  }
236  catch(ros::InvalidNameException& e)
237  {
238  ROS_WARN("failed to add topic %s to mux, because it's an invalid name: %s",
239  req.topic.c_str(), e.what());
240  delete sub_info.msg;
241  return false;
242  }
243  g_subs.push_back(sub_info);
244 
245  ROS_INFO("added %s to mux", req.topic.c_str());
246 
247  return true;
248 }
249 
250 bool del_topic_cb(topic_tools::MuxDelete::Request& req,
251  topic_tools::MuxDelete::Response& res)
252 {
253  (void)res;
254  // Check that it's in our list
255  ROS_INFO("trying to delete %s from mux", req.topic.c_str());
256  // spin through our vector of inputs and find this guy
257  for (list<struct sub_info_t>::iterator it = g_subs.begin();
258  it != g_subs.end();
259  ++it)
260  {
261  if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
262  {
263  // Can't delete the currently selected input, #2863
264  if(it == g_selected)
265  {
266  ROS_WARN("tried to delete currently selected topic %s from mux", req.topic.c_str());
267  return false;
268  }
269  if (it->sub)
270  it->sub->shutdown();
271  delete it->sub;
272  delete it->msg;
273  g_subs.erase(it);
274  ROS_INFO("deleted topic %s from mux", req.topic.c_str());
275  return true;
276  }
277  }
278 
279  ROS_WARN("tried to delete non-subscribed topic %s from mux", req.topic.c_str());
280  return false;
281 }
282 
283 int main(int argc, char **argv)
284 {
285  vector<string> args;
286  ros::removeROSArgs(argc, (const char**)argv, args);
287 
288  if (args.size() < 3)
289  {
290  printf("\nusage: mux OUT_TOPIC IN_TOPIC1 [IN_TOPIC2 [...]]\n\n");
291  return 1;
292  }
293  std::string topic_name;
294  if(!getBaseName(args[1], topic_name))
295  return 1;
296  ros::init(argc, argv, topic_name + string("_mux"),
298  vector<string> topics;
299  for (unsigned int i = 2; i < args.size(); i++)
300  topics.push_back(args[i]);
301  ros::NodeHandle n;
302  g_node = &n;
303  g_output_topic = args[1];
304  // Put our API into the "mux" namespace, which the user should usually remap
305  ros::NodeHandle mux_nh("mux"), pnh("~");
306  pnh.getParam("lazy", g_lazy);
307 
308  // Latched publisher for selected input topic name
309  g_pub_selected = mux_nh.advertise<std_msgs::String>(string("selected"), 1, true);
310 
311  for (size_t i = 0; i < topics.size(); i++)
312  {
313  struct sub_info_t sub_info;
314  sub_info.msg = new ShapeShifter;
315  sub_info.topic_name = ros::names::resolve(topics[i]);
316  sub_info.sub = new ros::Subscriber(n.subscribe<ShapeShifter>(sub_info.topic_name, 10, boost::bind(in_cb, _1, sub_info.msg)));
317 
318  g_subs.push_back(sub_info);
319  }
320 
321  // Set initial input topic from optional param, defaults to second argument
322  std::string initial_topic;
323  pnh.getParam("initial_topic", initial_topic);
324  std_msgs::String t;
325  if (initial_topic.empty()) // If param is not set, default to first in list
326  {
327  g_selected = g_subs.begin();
328  t.data = g_selected->topic_name;
329  }
330  else if (initial_topic == g_none_topic) // Set no initial input if param was __none
331  {
332  ROS_INFO("mux selected to no input.");
333  g_selected = g_subs.end();
334  t.data = g_none_topic;
335  }
336  else // Attempt to set initial topic if it is in the list
337  {
338  ROS_INFO("trying to switch mux to %s", initial_topic.c_str());
339  // spin through our vector of inputs and find this guy
340  for (list<struct sub_info_t>::iterator it = g_subs.begin();
341  it != g_subs.end();
342  ++it)
343  {
344  if (ros::names::resolve(it->topic_name) == ros::names::resolve(initial_topic))
345  {
346  g_selected = it;
347  t.data = initial_topic;
348  ROS_INFO("mux selected input: [%s]", it->topic_name.c_str());
349  break;
350  }
351  }
352  if (t.data.empty()) // If it wasn't in the list, default to no input. Or should we crash here?
353  {
354  g_selected = g_subs.end();
355  t.data = g_none_topic;
356  }
357  }
358  g_pub_selected.publish(t);
359 
360  // Backward compatibility
362  // New service
363  ros::ServiceServer ss_select = mux_nh.advertiseService(string("select"), sel_srv_cb);
364  ros::ServiceServer ss_add = mux_nh.advertiseService(string("add"), add_topic_cb);
365  ros::ServiceServer ss_list = mux_nh.advertiseService(string("list"), list_topic_cb);
366  ros::ServiceServer ss_del = mux_nh.advertiseService(string("delete"), del_topic_cb);
367  ros::spin();
368  for (list<struct sub_info_t>::iterator it = g_subs.begin();
369  it != g_subs.end();
370  ++it)
371  {
372  if (it->sub)
373  it->sub->shutdown();
374  delete it->sub;
375  delete it->msg;
376  }
377 
378  g_subs.clear();
379  return 0;
380 }
381 
void in_cb(const boost::shared_ptr< ShapeShifter const > &msg, ShapeShifter *s)
Definition: mux.cpp:147
int main(int argc, char **argv)
Definition: mux.cpp:283
Definition: args.py:1
bool add_topic_cb(topic_tools::MuxAdd::Request &req, topic_tools::MuxAdd::Response &res)
Definition: mux.cpp:198
void publish(const boost::shared_ptr< M > &message) const
static bool g_lazy
Definition: mux.cpp:52
static list< struct sub_info_t > g_subs
Definition: mux.cpp:67
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
bool del_topic_cb(topic_tools::MuxDelete::Request &req, topic_tools::MuxDelete::Response &res)
Definition: mux.cpp:250
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
bool list_topic_cb(topic_tools::MuxList::Request &req, topic_tools::MuxList::Response &res)
Definition: mux.cpp:184
ROSCPP_DECL std::string resolve(const std::string &name, bool remap=true)
ServiceServer advertiseService(const std::string &service, bool(T::*srv_func)(MReq &, MRes &), T *obj)
#define ROS_WARN(...)
static ros::Publisher g_pub_selected
Definition: mux.cpp:56
static bool g_advertised
Definition: mux.cpp:53
static ros::NodeHandle * g_node
Definition: mux.cpp:51
static list< struct sub_info_t >::iterator g_selected
Definition: mux.cpp:68
TOPIC_TOOLS_DECL bool getBaseName(const std::string &full_name, std::string &base_name)
Definition: parse.cpp:40
#define ROS_INFO(...)
bool sel_srv_cb_dep(topic_tools::MuxSelect::Request &req, topic_tools::MuxSelect::Response &res)
Definition: mux.cpp:139
ROSCPP_DECL void spin()
std::string topic_name
Definition: mux.cpp:60
ShapeShifter * msg
Definition: mux.cpp:62
static const string g_none_topic
Definition: mux.cpp:49
uint32_t getNumSubscribers() const
bool getParam(const std::string &key, std::string &s) const
void conn_cb(const ros::SingleSubscriberPublisher &)
Definition: mux.cpp:70
ROSCPP_DECL void removeROSArgs(int argc, const char *const *argv, V_string &args_out)
bool sel_srv_cb(topic_tools::MuxSelect::Request &req, topic_tools::MuxSelect::Response &res)
Definition: mux.cpp:81
static string g_output_topic
Definition: mux.cpp:54
ros::Subscriber * sub
Definition: mux.cpp:61
static ros::Publisher g_pub
Definition: mux.cpp:55
#define ROS_DEBUG(...)


topic_tools
Author(s): Morgan Quigley, Brian Gerkey
autogenerated on Sun Feb 3 2019 03:30:24