mux.cpp
Go to the documentation of this file.
1 // demux is a generic ROS topic demultiplexer: one input topic is fanned out
3 // to 1 of N output topics. A service is provided to select between the outputs
4 //
5 // Copyright (C) 2009, Morgan Quigley
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions are met:
9 // * Redistributions of source code must retain the above copyright notice,
10 // this list of conditions and the following disclaimer.
11 // * Redistributions in binary form must reproduce the above copyright
12 // notice, this list of conditions and the following disclaimer in the
13 // documentation and/or other materials provided with the distribution.
14 // * Neither the name of Stanford University nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 // POSSIBILITY OF SUCH DAMAGE.
30 
31 
32 #include <cstdio>
33 #include <vector>
34 #include <list>
35 #include <thread>
36 #include "ros/console.h"
37 #include "std_msgs/String.h"
38 #include "topic_tools/MuxSelect.h"
39 #include "topic_tools/MuxAdd.h"
40 #include "topic_tools/MuxList.h"
41 #include "topic_tools/MuxDelete.h"
43 #include "topic_tools/parse.h"
44 
45 using std::string;
46 using std::vector;
47 using std::list;
48 using namespace topic_tools;
49 
50 const static string g_none_topic = "__none";
51 
52 static ros::NodeHandle *g_node = NULL;
53 static bool g_lazy = false;
54 static bool g_latch = false;
55 static bool g_advertised = false;
56 static bool g_wait_pub_init = false;
57 static double g_wait_second = 0.1;
58 static string g_output_topic;
61 
62 struct sub_info_t
63 {
64  std::string topic_name;
67 };
68 
70 
71 static list<struct sub_info_t> g_subs;
72 static list<struct sub_info_t>::iterator g_selected = g_subs.end();
73 
75 {
76  // If we're in lazy subscribe mode, and the first subscriber just
77  // connected, then subscribe
78  if(g_lazy && g_selected != g_subs.end() && !g_selected->sub)
79  {
80  ROS_DEBUG("lazy mode; resubscribing to %s", g_selected->topic_name.c_str());
81  g_selected->sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_selected->topic_name, 10, boost::bind(in_cb, boost::placeholders::_1, g_selected->msg)));
82  }
83 }
84 
85 bool sel_srv_cb( topic_tools::MuxSelect::Request &req,
86  topic_tools::MuxSelect::Response &res )
87 {
88  bool ret = false;
89  if (g_selected != g_subs.end()) {
90  res.prev_topic = g_selected->topic_name;
91 
92  // Unsubscribe to old topic if lazy
93  if (g_lazy) {
94  ROS_DEBUG("Unsubscribing to %s, lazy", res.prev_topic.c_str());
95  if (g_selected->sub)
96  g_selected->sub->shutdown();
97  delete g_selected->sub;
98  g_selected->sub = NULL;
99  }
100  }
101  else
102  res.prev_topic = string("");
103 
104  // see if it's the magical '__none' topic, in which case we open the circuit
105  if (req.topic == g_none_topic)
106  {
107  ROS_INFO("mux selected to no input.");
108 
109  g_selected = g_subs.end();
110  ret = true;
111  }
112  else
113  {
114  ROS_INFO("trying to switch mux to %s", req.topic.c_str());
115  // spin through our vector of inputs and find this guy
116  for (list<struct sub_info_t>::iterator it = g_subs.begin();
117  it != g_subs.end();
118  ++it)
119  {
120  if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
121  {
122  g_selected = it;
123  ROS_INFO("mux selected input: [%s]", it->topic_name.c_str());
124  ret = true;
125 
126  if (!g_selected->sub && (!g_advertised || (g_advertised && g_pub.getNumSubscribers()))) {
127  g_selected->sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_selected->topic_name, 10, boost::bind(in_cb, boost::placeholders::_1, g_selected->msg)));
128  }
129  }
130  }
131  }
132 
133  if(ret)
134  {
135  std_msgs::String t;
136  t.data = req.topic;
138  }
139 
140  return ret;
141 }
142 
143 bool sel_srv_cb_dep( topic_tools::MuxSelect::Request &req,
144  topic_tools::MuxSelect::Response &res )
145 {
146  ROS_WARN("the <topic>_select service is deprecated; use mux/select instead");
147  return sel_srv_cb(req,res);
148 }
149 
150 
152  ShapeShifter* s)
153 {
154  if (!g_advertised)
155  {
156  ROS_INFO("advertising");
157  g_pub = msg->advertise(*g_node, g_output_topic, 10, g_latch, conn_cb);
158  // we need sleep for publisher initialization
159  // otherwise the first topic will drop.
160  if (g_wait_pub_init) {
161  std::this_thread::sleep_for(std::chrono::microseconds(static_cast<int>(g_wait_second * 1000000)));
162  }
163  g_advertised = true;
164 
165  // If lazy, unregister from all but the selected topic
166  if (g_lazy) {
167  for (static list<struct sub_info_t>::iterator it = g_subs.begin(); it != g_subs.end(); ++it) {
168  if (it != g_selected) {
169  ROS_INFO("Unregistering from %s", it->topic_name.c_str());
170  if (it->sub)
171  it->sub->shutdown();
172  delete it->sub;
173  it->sub = NULL;
174  }
175  }
176  }
177  }
178 
179  if (g_selected == g_subs.end() || s != g_selected->msg)
180  return;
181 
182  // If we're in lazy subscribe mode, and nobody's listening, then unsubscribe
183  if (g_lazy && !g_pub.getNumSubscribers() && g_selected != g_subs.end()) {
184  ROS_INFO("lazy mode; unsubscribing");
185  g_selected->sub->shutdown();
186  delete g_selected->sub;
187  g_selected->sub = NULL;
188  }
189  else
190  g_pub.publish(msg);
191 }
192 
193 bool list_topic_cb(topic_tools::MuxList::Request& req,
194  topic_tools::MuxList::Response& res)
195 {
196  (void)req;
197  for (list<struct sub_info_t>::iterator it = g_subs.begin();
198  it != g_subs.end();
199  ++it)
200  {
201  res.topics.push_back(it->topic_name);
202  }
203 
204  return true;
205 }
206 
207 bool add_topic_cb(topic_tools::MuxAdd::Request& req,
208  topic_tools::MuxAdd::Response& res)
209 {
210  (void)res;
211  // Check that it's not already in our list
212  ROS_INFO("trying to add %s to mux", req.topic.c_str());
213 
214  // Can't add the __none topic
215  if(req.topic == g_none_topic)
216  {
217  ROS_WARN("failed to add topic %s to mux, because it's reserved for special use",
218  req.topic.c_str());
219  return false;
220  }
221 
222  // spin through our vector of inputs and find this guy
223  for (list<struct sub_info_t>::iterator it = g_subs.begin();
224  it != g_subs.end();
225  ++it)
226  {
227  if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
228  {
229  ROS_WARN("tried to add a topic that mux was already listening to: [%s]",
230  it->topic_name.c_str());
231  return false;
232  }
233  }
234 
235  struct sub_info_t sub_info;
236  sub_info.msg = new ShapeShifter;
237  sub_info.topic_name = ros::names::resolve(req.topic);
238  try
239  {
240  if (g_lazy)
241  sub_info.sub = NULL;
242  else
243  sub_info.sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(sub_info.topic_name, 10, boost::bind(in_cb, boost::placeholders::_1, sub_info.msg)));
244  }
245  catch(ros::InvalidNameException& e)
246  {
247  ROS_WARN("failed to add topic %s to mux, because it's an invalid name: %s",
248  req.topic.c_str(), e.what());
249  delete sub_info.msg;
250  return false;
251  }
252  g_subs.push_back(sub_info);
253 
254  ROS_INFO("added %s to mux", req.topic.c_str());
255 
256  return true;
257 }
258 
259 bool del_topic_cb(topic_tools::MuxDelete::Request& req,
260  topic_tools::MuxDelete::Response& res)
261 {
262  (void)res;
263  // Check that it's in our list
264  ROS_INFO("trying to delete %s from mux", req.topic.c_str());
265  // spin through our vector of inputs and find this guy
266  for (list<struct sub_info_t>::iterator it = g_subs.begin();
267  it != g_subs.end();
268  ++it)
269  {
270  if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
271  {
272  // Can't delete the currently selected input, #2863
273  if(it == g_selected)
274  {
275  ROS_WARN("tried to delete currently selected topic %s from mux", req.topic.c_str());
276  return false;
277  }
278  if (it->sub)
279  it->sub->shutdown();
280  delete it->sub;
281  delete it->msg;
282  g_subs.erase(it);
283  ROS_INFO("deleted topic %s from mux", req.topic.c_str());
284  return true;
285  }
286  }
287 
288  ROS_WARN("tried to delete non-subscribed topic %s from mux", req.topic.c_str());
289  return false;
290 }
291 
292 int main(int argc, char **argv)
293 {
294  vector<string> args;
295  ros::removeROSArgs(argc, (const char**)argv, args);
296 
297  if (args.size() < 3)
298  {
299  printf("\nusage: mux OUT_TOPIC IN_TOPIC1 [IN_TOPIC2 [...]]\n\n");
300  return 1;
301  }
302  std::string topic_name;
303  if(!getBaseName(args[1], topic_name))
304  return 1;
305  ros::init(argc, argv, topic_name + string("_mux"),
307  vector<string> topics;
308  for (unsigned int i = 2; i < args.size(); i++)
309  topics.push_back(args[i]);
310  ros::NodeHandle n;
311  g_node = &n;
312  g_output_topic = args[1];
313  // Put our API into the "mux" namespace, which the user should usually remap
314  ros::NodeHandle mux_nh("mux"), pnh("~");
315  pnh.getParam("lazy", g_lazy);
316  pnh.getParam("latch", g_latch);
317 
318  // Latched publisher for selected input topic name
319  g_pub_selected = mux_nh.advertise<std_msgs::String>(string("selected"), 1, true);
320 
321  for (size_t i = 0; i < topics.size(); i++)
322  {
323  struct sub_info_t sub_info;
324  sub_info.msg = new ShapeShifter;
325  sub_info.topic_name = ros::names::resolve(topics[i]);
326  sub_info.sub = new ros::Subscriber(n.subscribe<ShapeShifter>(sub_info.topic_name, 10, boost::bind(in_cb, boost::placeholders::_1, sub_info.msg)));
327 
328  g_subs.push_back(sub_info);
329  }
330 
331  // Set initial input topic from optional param, defaults to second argument
332  std::string initial_topic;
333  pnh.getParam("initial_topic", initial_topic);
334  std_msgs::String t;
335  if (initial_topic.empty()) // If param is not set, default to first in list
336  {
337  g_selected = g_subs.begin();
338  t.data = g_selected->topic_name;
339  }
340  else if (initial_topic == g_none_topic) // Set no initial input if param was __none
341  {
342  ROS_INFO("mux selected to no input.");
343  g_selected = g_subs.end();
344  t.data = g_none_topic;
345  }
346  else // Attempt to set initial topic if it is in the list
347  {
348  ROS_INFO("trying to switch mux to %s", initial_topic.c_str());
349  // spin through our vector of inputs and find this guy
350  for (list<struct sub_info_t>::iterator it = g_subs.begin();
351  it != g_subs.end();
352  ++it)
353  {
354  if (ros::names::resolve(it->topic_name) == ros::names::resolve(initial_topic))
355  {
356  g_selected = it;
357  t.data = initial_topic;
358  ROS_INFO("mux selected input: [%s]", it->topic_name.c_str());
359  break;
360  }
361  }
362  if (t.data.empty()) // If it wasn't in the list, default to no input. Or should we crash here?
363  {
364  g_selected = g_subs.end();
365  t.data = g_none_topic;
366  }
367  }
368 
369  // Wait publisher initialization for few second (default: 0.1s)
370  // This option is to avoid dropping the first topic publishing.
371  pnh.getParam("wait_publisher_initialization", g_wait_pub_init);
372  pnh.getParam("wait_publisher_second", g_wait_second);
374 
375  // Backward compatibility
377  // New service
378  ros::ServiceServer ss_select = mux_nh.advertiseService(string("select"), sel_srv_cb);
379  ros::ServiceServer ss_add = mux_nh.advertiseService(string("add"), add_topic_cb);
380  ros::ServiceServer ss_list = mux_nh.advertiseService(string("list"), list_topic_cb);
381  ros::ServiceServer ss_del = mux_nh.advertiseService(string("delete"), del_topic_cb);
382  ros::spin();
383  for (list<struct sub_info_t>::iterator it = g_subs.begin();
384  it != g_subs.end();
385  ++it)
386  {
387  if (it->sub)
388  it->sub->shutdown();
389  delete it->sub;
390  delete it->msg;
391  }
392 
393  g_subs.clear();
394  return 0;
395 }
396 
ros::init_options::AnonymousName
AnonymousName
g_wait_pub_init
static bool g_wait_pub_init
Definition: mux.cpp:56
sub_info_t::topic_name
std::string topic_name
Definition: mux.cpp:64
ros::Publisher
sel_srv_cb_dep
bool sel_srv_cb_dep(topic_tools::MuxSelect::Request &req, topic_tools::MuxSelect::Response &res)
Definition: mux.cpp:143
boost::shared_ptr
ros::init
ROSCPP_DECL void init(const M_string &remappings, const std::string &name, uint32_t options=0)
shape_shifter.h
s
XmlRpcServer s
ros::NodeHandle::getParam
bool getParam(const std::string &key, bool &b) const
add_topic_cb
bool add_topic_cb(topic_tools::MuxAdd::Request &req, topic_tools::MuxAdd::Response &res)
Definition: mux.cpp:207
g_wait_second
static double g_wait_second
Definition: mux.cpp:57
sel_srv_cb
bool sel_srv_cb(topic_tools::MuxSelect::Request &req, topic_tools::MuxSelect::Response &res)
Definition: mux.cpp:85
ros::NodeHandle::advertiseService
ServiceServer advertiseService(AdvertiseServiceOptions &ops)
main
int main(int argc, char **argv)
Definition: mux.cpp:292
ros::Publisher::publish
void publish(const boost::shared_ptr< M > &message) const
ros::names::resolve
ROSCPP_DECL std::string resolve(const std::string &name, bool remap=true)
parse.h
ros::ServiceServer
console.h
ros::SingleSubscriberPublisher
g_advertised
static bool g_advertised
Definition: mux.cpp:55
del_topic_cb
bool del_topic_cb(topic_tools::MuxDelete::Request &req, topic_tools::MuxDelete::Response &res)
Definition: mux.cpp:259
ROS_DEBUG
#define ROS_DEBUG(...)
g_subs
static list< struct sub_info_t > g_subs
Definition: mux.cpp:71
sub_info_t
Definition: mux.cpp:62
ROS_WARN
#define ROS_WARN(...)
g_node
static ros::NodeHandle * g_node
Definition: mux.cpp:52
ros::NodeHandle::subscribe
Subscriber subscribe(const std::string &topic, uint32_t queue_size, const boost::function< void(C)> &callback, const VoidConstPtr &tracked_object=VoidConstPtr(), const TransportHints &transport_hints=TransportHints())
g_pub_selected
static ros::Publisher g_pub_selected
Definition: mux.cpp:60
topic_tools::getBaseName
TOPIC_TOOLS_DECL bool getBaseName(const std::string &full_name, std::string &base_name)
Definition: parse.cpp:40
g_none_topic
const static string g_none_topic
Definition: mux.cpp:50
list_topic_cb
bool list_topic_cb(topic_tools::MuxList::Request &req, topic_tools::MuxList::Response &res)
Definition: mux.cpp:193
g_pub
static ros::Publisher g_pub
Definition: mux.cpp:59
g_lazy
static bool g_lazy
Definition: mux.cpp:53
g_output_topic
static string g_output_topic
Definition: mux.cpp:58
conn_cb
void conn_cb(const ros::SingleSubscriberPublisher &)
Definition: mux.cpp:74
ros::InvalidNameException
g_selected
static list< struct sub_info_t >::iterator g_selected
Definition: mux.cpp:72
sub_info_t::msg
ShapeShifter * msg
Definition: mux.cpp:66
g_latch
static bool g_latch
Definition: mux.cpp:54
ros::spin
ROSCPP_DECL void spin()
ros::Publisher::getNumSubscribers
uint32_t getNumSubscribers() const
args
Definition: args.py:1
ros::removeROSArgs
ROSCPP_DECL void removeROSArgs(int argc, const char *const *argv, V_string &args_out)
topic_tools::ShapeShifter
Definition: shape_shifter.h:91
ROS_INFO
#define ROS_INFO(...)
topic_tools
Definition: parse.h:33
in_cb
void in_cb(const boost::shared_ptr< ShapeShifter const > &msg, ShapeShifter *s)
Definition: mux.cpp:151
ros::NodeHandle
sub_info_t::sub
ros::Subscriber * sub
Definition: mux.cpp:65
ros::Subscriber


topic_tools
Author(s): Morgan Quigley, Brian Gerkey, Dirk Thomas , Jacob Perron
autogenerated on Tue May 20 2025 03:00:38