mux.cpp
Go to the documentation of this file.
1 // demux is a generic ROS topic demultiplexer: one input topic is fanned out
3 // to 1 of N output topics. A service is provided to select between the outputs
4 //
5 // Copyright (C) 2009, Morgan Quigley
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions are met:
9 // * Redistributions of source code must retain the above copyright notice,
10 // this list of conditions and the following disclaimer.
11 // * Redistributions in binary form must reproduce the above copyright
12 // notice, this list of conditions and the following disclaimer in the
13 // documentation and/or other materials provided with the distribution.
14 // * Neither the name of Stanford University nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 // POSSIBILITY OF SUCH DAMAGE.
30 
31 
32 #include <cstdio>
33 #include <vector>
34 #include <list>
35 #include "ros/console.h"
36 #include "std_msgs/String.h"
37 #include "topic_tools/MuxSelect.h"
38 #include "topic_tools/MuxAdd.h"
39 #include "topic_tools/MuxList.h"
40 #include "topic_tools/MuxDelete.h"
42 #include "topic_tools/parse.h"
43 
44 using std::string;
45 using std::vector;
46 using std::list;
47 using namespace topic_tools;
48 
49 const static string g_none_topic = "__none";
50 
51 static ros::NodeHandle *g_node = NULL;
52 static bool g_lazy = false;
53 static bool g_latch = false;
54 static bool g_advertised = false;
55 static bool g_wait_pub_init = false;
56 static double g_wait_second = 0.1;
57 static string g_output_topic;
60 
61 struct sub_info_t
62 {
63  std::string topic_name;
66 };
67 
69 
70 static list<struct sub_info_t> g_subs;
71 static list<struct sub_info_t>::iterator g_selected = g_subs.end();
72 
74 {
75  // If we're in lazy subscribe mode, and the first subscriber just
76  // connected, then subscribe
77  if(g_lazy && g_selected != g_subs.end() && !g_selected->sub)
78  {
79  ROS_DEBUG("lazy mode; resubscribing to %s", g_selected->topic_name.c_str());
80  g_selected->sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_selected->topic_name, 10, boost::bind(in_cb, boost::placeholders::_1, g_selected->msg)));
81  }
82 }
83 
84 bool sel_srv_cb( topic_tools::MuxSelect::Request &req,
85  topic_tools::MuxSelect::Response &res )
86 {
87  bool ret = false;
88  if (g_selected != g_subs.end()) {
89  res.prev_topic = g_selected->topic_name;
90 
91  // Unsubscribe to old topic if lazy
92  if (g_lazy) {
93  ROS_DEBUG("Unsubscribing to %s, lazy", res.prev_topic.c_str());
94  if (g_selected->sub)
95  g_selected->sub->shutdown();
96  delete g_selected->sub;
97  g_selected->sub = NULL;
98  }
99  }
100  else
101  res.prev_topic = string("");
102 
103  // see if it's the magical '__none' topic, in which case we open the circuit
104  if (req.topic == g_none_topic)
105  {
106  ROS_INFO("mux selected to no input.");
107 
108  g_selected = g_subs.end();
109  ret = true;
110  }
111  else
112  {
113  ROS_INFO("trying to switch mux to %s", req.topic.c_str());
114  // spin through our vector of inputs and find this guy
115  for (list<struct sub_info_t>::iterator it = g_subs.begin();
116  it != g_subs.end();
117  ++it)
118  {
119  if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
120  {
121  g_selected = it;
122  ROS_INFO("mux selected input: [%s]", it->topic_name.c_str());
123  ret = true;
124 
125  if (!g_selected->sub && (!g_advertised || (g_advertised && g_pub.getNumSubscribers()))) {
126  g_selected->sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_selected->topic_name, 10, boost::bind(in_cb, boost::placeholders::_1, g_selected->msg)));
127  }
128  }
129  }
130  }
131 
132  if(ret)
133  {
134  std_msgs::String t;
135  t.data = req.topic;
137  }
138 
139  return ret;
140 }
141 
142 bool sel_srv_cb_dep( topic_tools::MuxSelect::Request &req,
143  topic_tools::MuxSelect::Response &res )
144 {
145  ROS_WARN("the <topic>_select service is deprecated; use mux/select instead");
146  return sel_srv_cb(req,res);
147 }
148 
149 
151  ShapeShifter* s)
152 {
153  if (!g_advertised)
154  {
155  ROS_INFO("advertising");
156  g_pub = msg->advertise(*g_node, g_output_topic, 10, g_latch, conn_cb);
157  // we need sleep for publisher initialization
158  // otherwise the first topic will drop.
159  if (g_wait_pub_init) {
160  usleep(g_wait_second * 1000000);
161  }
162  g_advertised = true;
163 
164  // If lazy, unregister from all but the selected topic
165  if (g_lazy) {
166  for (static list<struct sub_info_t>::iterator it = g_subs.begin(); it != g_subs.end(); ++it) {
167  if (it != g_selected) {
168  ROS_INFO("Unregistering from %s", it->topic_name.c_str());
169  if (it->sub)
170  it->sub->shutdown();
171  delete it->sub;
172  it->sub = NULL;
173  }
174  }
175  }
176  }
177 
178  if (g_selected == g_subs.end() || s != g_selected->msg)
179  return;
180 
181  // If we're in lazy subscribe mode, and nobody's listening, then unsubscribe
182  if (g_lazy && !g_pub.getNumSubscribers() && g_selected != g_subs.end()) {
183  ROS_INFO("lazy mode; unsubscribing");
184  g_selected->sub->shutdown();
185  delete g_selected->sub;
186  g_selected->sub = NULL;
187  }
188  else
189  g_pub.publish(msg);
190 }
191 
192 bool list_topic_cb(topic_tools::MuxList::Request& req,
193  topic_tools::MuxList::Response& res)
194 {
195  (void)req;
196  for (list<struct sub_info_t>::iterator it = g_subs.begin();
197  it != g_subs.end();
198  ++it)
199  {
200  res.topics.push_back(it->topic_name);
201  }
202 
203  return true;
204 }
205 
206 bool add_topic_cb(topic_tools::MuxAdd::Request& req,
207  topic_tools::MuxAdd::Response& res)
208 {
209  (void)res;
210  // Check that it's not already in our list
211  ROS_INFO("trying to add %s to mux", req.topic.c_str());
212 
213  // Can't add the __none topic
214  if(req.topic == g_none_topic)
215  {
216  ROS_WARN("failed to add topic %s to mux, because it's reserved for special use",
217  req.topic.c_str());
218  return false;
219  }
220 
221  // spin through our vector of inputs and find this guy
222  for (list<struct sub_info_t>::iterator it = g_subs.begin();
223  it != g_subs.end();
224  ++it)
225  {
226  if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
227  {
228  ROS_WARN("tried to add a topic that mux was already listening to: [%s]",
229  it->topic_name.c_str());
230  return false;
231  }
232  }
233 
234  struct sub_info_t sub_info;
235  sub_info.msg = new ShapeShifter;
236  sub_info.topic_name = ros::names::resolve(req.topic);
237  try
238  {
239  if (g_lazy)
240  sub_info.sub = NULL;
241  else
242  sub_info.sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(sub_info.topic_name, 10, boost::bind(in_cb, boost::placeholders::_1, sub_info.msg)));
243  }
244  catch(ros::InvalidNameException& e)
245  {
246  ROS_WARN("failed to add topic %s to mux, because it's an invalid name: %s",
247  req.topic.c_str(), e.what());
248  delete sub_info.msg;
249  return false;
250  }
251  g_subs.push_back(sub_info);
252 
253  ROS_INFO("added %s to mux", req.topic.c_str());
254 
255  return true;
256 }
257 
258 bool del_topic_cb(topic_tools::MuxDelete::Request& req,
259  topic_tools::MuxDelete::Response& res)
260 {
261  (void)res;
262  // Check that it's in our list
263  ROS_INFO("trying to delete %s from mux", req.topic.c_str());
264  // spin through our vector of inputs and find this guy
265  for (list<struct sub_info_t>::iterator it = g_subs.begin();
266  it != g_subs.end();
267  ++it)
268  {
269  if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
270  {
271  // Can't delete the currently selected input, #2863
272  if(it == g_selected)
273  {
274  ROS_WARN("tried to delete currently selected topic %s from mux", req.topic.c_str());
275  return false;
276  }
277  if (it->sub)
278  it->sub->shutdown();
279  delete it->sub;
280  delete it->msg;
281  g_subs.erase(it);
282  ROS_INFO("deleted topic %s from mux", req.topic.c_str());
283  return true;
284  }
285  }
286 
287  ROS_WARN("tried to delete non-subscribed topic %s from mux", req.topic.c_str());
288  return false;
289 }
290 
291 int main(int argc, char **argv)
292 {
293  vector<string> args;
294  ros::removeROSArgs(argc, (const char**)argv, args);
295 
296  if (args.size() < 3)
297  {
298  printf("\nusage: mux OUT_TOPIC IN_TOPIC1 [IN_TOPIC2 [...]]\n\n");
299  return 1;
300  }
301  std::string topic_name;
302  if(!getBaseName(args[1], topic_name))
303  return 1;
304  ros::init(argc, argv, topic_name + string("_mux"),
306  vector<string> topics;
307  for (unsigned int i = 2; i < args.size(); i++)
308  topics.push_back(args[i]);
309  ros::NodeHandle n;
310  g_node = &n;
311  g_output_topic = args[1];
312  // Put our API into the "mux" namespace, which the user should usually remap
313  ros::NodeHandle mux_nh("mux"), pnh("~");
314  pnh.getParam("lazy", g_lazy);
315  pnh.getParam("latch", g_latch);
316 
317  // Latched publisher for selected input topic name
318  g_pub_selected = mux_nh.advertise<std_msgs::String>(string("selected"), 1, true);
319 
320  for (size_t i = 0; i < topics.size(); i++)
321  {
322  struct sub_info_t sub_info;
323  sub_info.msg = new ShapeShifter;
324  sub_info.topic_name = ros::names::resolve(topics[i]);
325  sub_info.sub = new ros::Subscriber(n.subscribe<ShapeShifter>(sub_info.topic_name, 10, boost::bind(in_cb, boost::placeholders::_1, sub_info.msg)));
326 
327  g_subs.push_back(sub_info);
328  }
329 
330  // Set initial input topic from optional param, defaults to second argument
331  std::string initial_topic;
332  pnh.getParam("initial_topic", initial_topic);
333  std_msgs::String t;
334  if (initial_topic.empty()) // If param is not set, default to first in list
335  {
336  g_selected = g_subs.begin();
337  t.data = g_selected->topic_name;
338  }
339  else if (initial_topic == g_none_topic) // Set no initial input if param was __none
340  {
341  ROS_INFO("mux selected to no input.");
342  g_selected = g_subs.end();
343  t.data = g_none_topic;
344  }
345  else // Attempt to set initial topic if it is in the list
346  {
347  ROS_INFO("trying to switch mux to %s", initial_topic.c_str());
348  // spin through our vector of inputs and find this guy
349  for (list<struct sub_info_t>::iterator it = g_subs.begin();
350  it != g_subs.end();
351  ++it)
352  {
353  if (ros::names::resolve(it->topic_name) == ros::names::resolve(initial_topic))
354  {
355  g_selected = it;
356  t.data = initial_topic;
357  ROS_INFO("mux selected input: [%s]", it->topic_name.c_str());
358  break;
359  }
360  }
361  if (t.data.empty()) // If it wasn't in the list, default to no input. Or should we crash here?
362  {
363  g_selected = g_subs.end();
364  t.data = g_none_topic;
365  }
366  }
367 
368  // Wait publisher initialization for few second (default: 0.1s)
369  // This option is to avoid dropping the first topic publishing.
370  pnh.getParam("wait_publisher_initialization", g_wait_pub_init);
371  pnh.getParam("wait_publisher_second", g_wait_second);
373 
374  // Backward compatibility
376  // New service
377  ros::ServiceServer ss_select = mux_nh.advertiseService(string("select"), sel_srv_cb);
378  ros::ServiceServer ss_add = mux_nh.advertiseService(string("add"), add_topic_cb);
379  ros::ServiceServer ss_list = mux_nh.advertiseService(string("list"), list_topic_cb);
380  ros::ServiceServer ss_del = mux_nh.advertiseService(string("delete"), del_topic_cb);
381  ros::spin();
382  for (list<struct sub_info_t>::iterator it = g_subs.begin();
383  it != g_subs.end();
384  ++it)
385  {
386  if (it->sub)
387  it->sub->shutdown();
388  delete it->sub;
389  delete it->msg;
390  }
391 
392  g_subs.clear();
393  return 0;
394 }
395 
ros::init_options::AnonymousName
AnonymousName
g_wait_pub_init
static bool g_wait_pub_init
Definition: mux.cpp:55
sub_info_t::topic_name
std::string topic_name
Definition: mux.cpp:63
ros::Publisher
sel_srv_cb_dep
bool sel_srv_cb_dep(topic_tools::MuxSelect::Request &req, topic_tools::MuxSelect::Response &res)
Definition: mux.cpp:142
boost::shared_ptr
ros::init
ROSCPP_DECL void init(const M_string &remappings, const std::string &name, uint32_t options=0)
shape_shifter.h
s
XmlRpcServer s
ros::NodeHandle::getParam
bool getParam(const std::string &key, bool &b) const
add_topic_cb
bool add_topic_cb(topic_tools::MuxAdd::Request &req, topic_tools::MuxAdd::Response &res)
Definition: mux.cpp:206
g_wait_second
static double g_wait_second
Definition: mux.cpp:56
sel_srv_cb
bool sel_srv_cb(topic_tools::MuxSelect::Request &req, topic_tools::MuxSelect::Response &res)
Definition: mux.cpp:84
ros::NodeHandle::advertiseService
ServiceServer advertiseService(AdvertiseServiceOptions &ops)
main
int main(int argc, char **argv)
Definition: mux.cpp:291
ros::Publisher::publish
void publish(const boost::shared_ptr< M > &message) const
ros::names::resolve
ROSCPP_DECL std::string resolve(const std::string &name, bool remap=true)
parse.h
ros::ServiceServer
console.h
ros::SingleSubscriberPublisher
g_advertised
static bool g_advertised
Definition: mux.cpp:54
del_topic_cb
bool del_topic_cb(topic_tools::MuxDelete::Request &req, topic_tools::MuxDelete::Response &res)
Definition: mux.cpp:258
ROS_DEBUG
#define ROS_DEBUG(...)
g_subs
static list< struct sub_info_t > g_subs
Definition: mux.cpp:70
sub_info_t
Definition: mux.cpp:61
ROS_WARN
#define ROS_WARN(...)
g_node
static ros::NodeHandle * g_node
Definition: mux.cpp:51
ros::NodeHandle::subscribe
Subscriber subscribe(const std::string &topic, uint32_t queue_size, const boost::function< void(C)> &callback, const VoidConstPtr &tracked_object=VoidConstPtr(), const TransportHints &transport_hints=TransportHints())
g_pub_selected
static ros::Publisher g_pub_selected
Definition: mux.cpp:59
topic_tools::getBaseName
TOPIC_TOOLS_DECL bool getBaseName(const std::string &full_name, std::string &base_name)
Definition: parse.cpp:40
g_none_topic
const static string g_none_topic
Definition: mux.cpp:49
list_topic_cb
bool list_topic_cb(topic_tools::MuxList::Request &req, topic_tools::MuxList::Response &res)
Definition: mux.cpp:192
g_pub
static ros::Publisher g_pub
Definition: mux.cpp:58
g_lazy
static bool g_lazy
Definition: mux.cpp:52
g_output_topic
static string g_output_topic
Definition: mux.cpp:57
conn_cb
void conn_cb(const ros::SingleSubscriberPublisher &)
Definition: mux.cpp:73
ros::InvalidNameException
g_selected
static list< struct sub_info_t >::iterator g_selected
Definition: mux.cpp:71
sub_info_t::msg
ShapeShifter * msg
Definition: mux.cpp:65
g_latch
static bool g_latch
Definition: mux.cpp:53
ros::spin
ROSCPP_DECL void spin()
ros::Publisher::getNumSubscribers
uint32_t getNumSubscribers() const
args
Definition: args.py:1
ros::removeROSArgs
ROSCPP_DECL void removeROSArgs(int argc, const char *const *argv, V_string &args_out)
topic_tools::ShapeShifter
Definition: shape_shifter.h:91
ROS_INFO
#define ROS_INFO(...)
topic_tools
Definition: parse.h:33
in_cb
void in_cb(const boost::shared_ptr< ShapeShifter const > &msg, ShapeShifter *s)
Definition: mux.cpp:150
ros::NodeHandle
sub_info_t::sub
ros::Subscriber * sub
Definition: mux.cpp:64
ros::Subscriber


topic_tools
Author(s): Morgan Quigley, Brian Gerkey, Dirk Thomas , Jacob Perron
autogenerated on Sat Sep 14 2024 03:00:05