mux.cpp
Go to the documentation of this file.
1 // demux is a generic ROS topic demultiplexer: one input topic is fanned out
3 // to 1 of N output topics. A service is provided to select between the outputs
4 //
5 // Copyright (C) 2009, Morgan Quigley
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions are met:
9 // * Redistributions of source code must retain the above copyright notice,
10 // this list of conditions and the following disclaimer.
11 // * Redistributions in binary form must reproduce the above copyright
12 // notice, this list of conditions and the following disclaimer in the
13 // documentation and/or other materials provided with the distribution.
14 // * Neither the name of Stanford University nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 // POSSIBILITY OF SUCH DAMAGE.
30 
31 
32 #include <cstdio>
33 #include <vector>
34 #include <list>
35 #include "ros/console.h"
36 #include "std_msgs/String.h"
37 #include "topic_tools/MuxSelect.h"
38 #include "topic_tools/MuxAdd.h"
39 #include "topic_tools/MuxList.h"
40 #include "topic_tools/MuxDelete.h"
42 #include "topic_tools/parse.h"
43 
44 using std::string;
45 using std::vector;
46 using std::list;
47 using namespace topic_tools;
48 
49 const static string g_none_topic = "__none";
50 
51 static ros::NodeHandle *g_node = NULL;
52 static bool g_lazy = false;
53 static bool g_latch = false;
54 static bool g_advertised = false;
55 static string g_output_topic;
58 
59 struct sub_info_t
60 {
61  std::string topic_name;
64 };
65 
67 
68 static list<struct sub_info_t> g_subs;
69 static list<struct sub_info_t>::iterator g_selected = g_subs.end();
70 
72 {
73  // If we're in lazy subscribe mode, and the first subscriber just
74  // connected, then subscribe
75  if(g_lazy && g_selected != g_subs.end() && !g_selected->sub)
76  {
77  ROS_DEBUG("lazy mode; resubscribing to %s", g_selected->topic_name.c_str());
78  g_selected->sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_selected->topic_name, 10, boost::bind(in_cb, _1, g_selected->msg)));
79  }
80 }
81 
82 bool sel_srv_cb( topic_tools::MuxSelect::Request &req,
83  topic_tools::MuxSelect::Response &res )
84 {
85  bool ret = false;
86  if (g_selected != g_subs.end()) {
87  res.prev_topic = g_selected->topic_name;
88 
89  // Unsubscribe to old topic if lazy
90  if (g_lazy) {
91  ROS_DEBUG("Unsubscribing to %s, lazy", res.prev_topic.c_str());
92  if (g_selected->sub)
93  g_selected->sub->shutdown();
94  delete g_selected->sub;
95  g_selected->sub = NULL;
96  }
97  }
98  else
99  res.prev_topic = string("");
100 
101  // see if it's the magical '__none' topic, in which case we open the circuit
102  if (req.topic == g_none_topic)
103  {
104  ROS_INFO("mux selected to no input.");
105 
106  g_selected = g_subs.end();
107  ret = true;
108  }
109  else
110  {
111  ROS_INFO("trying to switch mux to %s", req.topic.c_str());
112  // spin through our vector of inputs and find this guy
113  for (list<struct sub_info_t>::iterator it = g_subs.begin();
114  it != g_subs.end();
115  ++it)
116  {
117  if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
118  {
119  g_selected = it;
120  ROS_INFO("mux selected input: [%s]", it->topic_name.c_str());
121  ret = true;
122 
123  if (!g_selected->sub && (!g_advertised || (g_advertised && g_pub.getNumSubscribers()))) {
124  g_selected->sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_selected->topic_name, 10, boost::bind(in_cb, _1, g_selected->msg)));
125  }
126  }
127  }
128  }
129 
130  if(ret)
131  {
132  std_msgs::String t;
133  t.data = req.topic;
134  g_pub_selected.publish(t);
135  }
136 
137  return ret;
138 }
139 
140 bool sel_srv_cb_dep( topic_tools::MuxSelect::Request &req,
141  topic_tools::MuxSelect::Response &res )
142 {
143  ROS_WARN("the <topic>_select service is deprecated; use mux/select instead");
144  return sel_srv_cb(req,res);
145 }
146 
147 
149  ShapeShifter* s)
150 {
151  if (!g_advertised)
152  {
153  ROS_INFO("advertising");
154  g_pub = msg->advertise(*g_node, g_output_topic, 10, g_latch, conn_cb);
155  g_advertised = true;
156 
157  // If lazy, unregister from all but the selected topic
158  if (g_lazy) {
159  for (static list<struct sub_info_t>::iterator it = g_subs.begin(); it != g_subs.end(); ++it) {
160  if (it != g_selected) {
161  ROS_INFO("Unregistering from %s", it->topic_name.c_str());
162  if (it->sub)
163  it->sub->shutdown();
164  delete it->sub;
165  it->sub = NULL;
166  }
167  }
168  }
169  }
170 
171  if (g_selected == g_subs.end() || s != g_selected->msg)
172  return;
173 
174  // If we're in lazy subscribe mode, and nobody's listening, then unsubscribe
175  if (g_lazy && !g_pub.getNumSubscribers() && g_selected != g_subs.end()) {
176  ROS_INFO("lazy mode; unsubscribing");
177  g_selected->sub->shutdown();
178  delete g_selected->sub;
179  g_selected->sub = NULL;
180  }
181  else
182  g_pub.publish(msg);
183 }
184 
185 bool list_topic_cb(topic_tools::MuxList::Request& req,
186  topic_tools::MuxList::Response& res)
187 {
188  (void)req;
189  for (list<struct sub_info_t>::iterator it = g_subs.begin();
190  it != g_subs.end();
191  ++it)
192  {
193  res.topics.push_back(it->topic_name);
194  }
195 
196  return true;
197 }
198 
199 bool add_topic_cb(topic_tools::MuxAdd::Request& req,
200  topic_tools::MuxAdd::Response& res)
201 {
202  (void)res;
203  // Check that it's not already in our list
204  ROS_INFO("trying to add %s to mux", req.topic.c_str());
205 
206  // Can't add the __none topic
207  if(req.topic == g_none_topic)
208  {
209  ROS_WARN("failed to add topic %s to mux, because it's reserved for special use",
210  req.topic.c_str());
211  return false;
212  }
213 
214  // spin through our vector of inputs and find this guy
215  for (list<struct sub_info_t>::iterator it = g_subs.begin();
216  it != g_subs.end();
217  ++it)
218  {
219  if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
220  {
221  ROS_WARN("tried to add a topic that mux was already listening to: [%s]",
222  it->topic_name.c_str());
223  return false;
224  }
225  }
226 
227  struct sub_info_t sub_info;
228  sub_info.msg = new ShapeShifter;
229  sub_info.topic_name = ros::names::resolve(req.topic);
230  try
231  {
232  if (g_lazy)
233  sub_info.sub = NULL;
234  else
235  sub_info.sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(sub_info.topic_name, 10, boost::bind(in_cb, _1, sub_info.msg)));
236  }
237  catch(ros::InvalidNameException& e)
238  {
239  ROS_WARN("failed to add topic %s to mux, because it's an invalid name: %s",
240  req.topic.c_str(), e.what());
241  delete sub_info.msg;
242  return false;
243  }
244  g_subs.push_back(sub_info);
245 
246  ROS_INFO("added %s to mux", req.topic.c_str());
247 
248  return true;
249 }
250 
251 bool del_topic_cb(topic_tools::MuxDelete::Request& req,
252  topic_tools::MuxDelete::Response& res)
253 {
254  (void)res;
255  // Check that it's in our list
256  ROS_INFO("trying to delete %s from mux", req.topic.c_str());
257  // spin through our vector of inputs and find this guy
258  for (list<struct sub_info_t>::iterator it = g_subs.begin();
259  it != g_subs.end();
260  ++it)
261  {
262  if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
263  {
264  // Can't delete the currently selected input, #2863
265  if(it == g_selected)
266  {
267  ROS_WARN("tried to delete currently selected topic %s from mux", req.topic.c_str());
268  return false;
269  }
270  if (it->sub)
271  it->sub->shutdown();
272  delete it->sub;
273  delete it->msg;
274  g_subs.erase(it);
275  ROS_INFO("deleted topic %s from mux", req.topic.c_str());
276  return true;
277  }
278  }
279 
280  ROS_WARN("tried to delete non-subscribed topic %s from mux", req.topic.c_str());
281  return false;
282 }
283 
284 int main(int argc, char **argv)
285 {
286  vector<string> args;
287  ros::removeROSArgs(argc, (const char**)argv, args);
288 
289  if (args.size() < 3)
290  {
291  printf("\nusage: mux OUT_TOPIC IN_TOPIC1 [IN_TOPIC2 [...]]\n\n");
292  return 1;
293  }
294  std::string topic_name;
295  if(!getBaseName(args[1], topic_name))
296  return 1;
297  ros::init(argc, argv, topic_name + string("_mux"),
299  vector<string> topics;
300  for (unsigned int i = 2; i < args.size(); i++)
301  topics.push_back(args[i]);
302  ros::NodeHandle n;
303  g_node = &n;
304  g_output_topic = args[1];
305  // Put our API into the "mux" namespace, which the user should usually remap
306  ros::NodeHandle mux_nh("mux"), pnh("~");
307  pnh.getParam("lazy", g_lazy);
308  pnh.getParam("latch", g_latch);
309 
310  // Latched publisher for selected input topic name
311  g_pub_selected = mux_nh.advertise<std_msgs::String>(string("selected"), 1, true);
312 
313  for (size_t i = 0; i < topics.size(); i++)
314  {
315  struct sub_info_t sub_info;
316  sub_info.msg = new ShapeShifter;
317  sub_info.topic_name = ros::names::resolve(topics[i]);
318  sub_info.sub = new ros::Subscriber(n.subscribe<ShapeShifter>(sub_info.topic_name, 10, boost::bind(in_cb, _1, sub_info.msg)));
319 
320  g_subs.push_back(sub_info);
321  }
322 
323  // Set initial input topic from optional param, defaults to second argument
324  std::string initial_topic;
325  pnh.getParam("initial_topic", initial_topic);
326  std_msgs::String t;
327  if (initial_topic.empty()) // If param is not set, default to first in list
328  {
329  g_selected = g_subs.begin();
330  t.data = g_selected->topic_name;
331  }
332  else if (initial_topic == g_none_topic) // Set no initial input if param was __none
333  {
334  ROS_INFO("mux selected to no input.");
335  g_selected = g_subs.end();
336  t.data = g_none_topic;
337  }
338  else // Attempt to set initial topic if it is in the list
339  {
340  ROS_INFO("trying to switch mux to %s", initial_topic.c_str());
341  // spin through our vector of inputs and find this guy
342  for (list<struct sub_info_t>::iterator it = g_subs.begin();
343  it != g_subs.end();
344  ++it)
345  {
346  if (ros::names::resolve(it->topic_name) == ros::names::resolve(initial_topic))
347  {
348  g_selected = it;
349  t.data = initial_topic;
350  ROS_INFO("mux selected input: [%s]", it->topic_name.c_str());
351  break;
352  }
353  }
354  if (t.data.empty()) // If it wasn't in the list, default to no input. Or should we crash here?
355  {
356  g_selected = g_subs.end();
357  t.data = g_none_topic;
358  }
359  }
360  g_pub_selected.publish(t);
361 
362  // Backward compatibility
364  // New service
365  ros::ServiceServer ss_select = mux_nh.advertiseService(string("select"), sel_srv_cb);
366  ros::ServiceServer ss_add = mux_nh.advertiseService(string("add"), add_topic_cb);
367  ros::ServiceServer ss_list = mux_nh.advertiseService(string("list"), list_topic_cb);
368  ros::ServiceServer ss_del = mux_nh.advertiseService(string("delete"), del_topic_cb);
369  ros::spin();
370  for (list<struct sub_info_t>::iterator it = g_subs.begin();
371  it != g_subs.end();
372  ++it)
373  {
374  if (it->sub)
375  it->sub->shutdown();
376  delete it->sub;
377  delete it->msg;
378  }
379 
380  g_subs.clear();
381  return 0;
382 }
383 
int main(int argc, char **argv)
Definition: mux.cpp:284
Definition: args.py:1
bool add_topic_cb(topic_tools::MuxAdd::Request &req, topic_tools::MuxAdd::Response &res)
Definition: mux.cpp:199
void in_cb(const boost::shared_ptr< ShapeShifter const > &msg, ShapeShifter *s)
Definition: mux.cpp:148
static bool g_lazy
Definition: mux.cpp:52
static list< struct sub_info_t > g_subs
Definition: mux.cpp:68
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
bool del_topic_cb(topic_tools::MuxDelete::Request &req, topic_tools::MuxDelete::Response &res)
Definition: mux.cpp:251
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
bool list_topic_cb(topic_tools::MuxList::Request &req, topic_tools::MuxList::Response &res)
Definition: mux.cpp:185
ROSCPP_DECL std::string resolve(const std::string &name, bool remap=true)
ServiceServer advertiseService(const std::string &service, bool(T::*srv_func)(MReq &, MRes &), T *obj)
#define ROS_WARN(...)
static ros::Publisher g_pub_selected
Definition: mux.cpp:57
static bool g_advertised
Definition: mux.cpp:54
static ros::NodeHandle * g_node
Definition: mux.cpp:51
static list< struct sub_info_t >::iterator g_selected
Definition: mux.cpp:69
void publish(const boost::shared_ptr< M > &message) const
TOPIC_TOOLS_DECL bool getBaseName(const std::string &full_name, std::string &base_name)
Definition: parse.cpp:40
#define ROS_INFO(...)
bool getParam(const std::string &key, std::string &s) const
bool sel_srv_cb_dep(topic_tools::MuxSelect::Request &req, topic_tools::MuxSelect::Response &res)
Definition: mux.cpp:140
ROSCPP_DECL void spin()
std::string topic_name
Definition: mux.cpp:61
static bool g_latch
Definition: mux.cpp:53
ShapeShifter * msg
Definition: mux.cpp:63
static const string g_none_topic
Definition: mux.cpp:49
void conn_cb(const ros::SingleSubscriberPublisher &)
Definition: mux.cpp:71
ROSCPP_DECL void removeROSArgs(int argc, const char *const *argv, V_string &args_out)
bool sel_srv_cb(topic_tools::MuxSelect::Request &req, topic_tools::MuxSelect::Response &res)
Definition: mux.cpp:82
static string g_output_topic
Definition: mux.cpp:55
uint32_t getNumSubscribers() const
ros::Subscriber * sub
Definition: mux.cpp:62
static ros::Publisher g_pub
Definition: mux.cpp:56
#define ROS_DEBUG(...)


topic_tools
Author(s): Morgan Quigley, Brian Gerkey, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:34:17