demux.cpp
Go to the documentation of this file.
1 // demux is a generic ROS topic demultiplexer: one input topic is fanned out
3 // to 1 of N output topics. A service is provided to select between the outputs
4 //
5 // Copyright (C) 2009, Morgan Quigley
6 // Copyright (C) 2014, Andreas Hermann
7 //
8 // Redistribution and use in source and binary forms, with or without
9 // modification, are permitted provided that the following conditions are met:
10 // * Redistributions of source code must retain the above copyright notice,
11 // this list of conditions and the following disclaimer.
12 // * Redistributions in binary form must reproduce the above copyright
13 // notice, this list of conditions and the following disclaimer in the
14 // documentation and/or other materials provided with the distribution.
15 // * Neither the name of Stanford University nor the names of its
16 // contributors may be used to endorse or promote products derived from
17 // this software without specific prior written permission.
18 //
19 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 // POSSIBILITY OF SUCH DAMAGE.
31 
32 
33 #include <cstdio>
34 #include <vector>
35 #include <list>
36 #include "ros/console.h"
37 #include "std_msgs/String.h"
38 #include "topic_tools/DemuxSelect.h"
39 #include "topic_tools/DemuxAdd.h"
40 #include "topic_tools/DemuxList.h"
41 #include "topic_tools/DemuxDelete.h"
43 #include "topic_tools/parse.h"
44 
45 using std::string;
46 using std::vector;
47 using std::list;
48 using namespace topic_tools;
49 
50 const static string g_none_topic = "__none";
51 
52 static ros::NodeHandle *g_node = NULL;
53 
54 static string g_input_topic;
55 static ros::Subscriber g_sub; // the input toppic
56 static ros::Publisher g_pub_selected; // publishes name of selected publisher toppic
57 
58 struct pub_info_t
59 {
60  std::string topic_name;
62 };
63 
65 
66 static list<struct pub_info_t> g_pubs; // the list of publishers
67 static list<struct pub_info_t>::iterator g_selected = g_pubs.end();
68 
69 bool sel_srv_cb( topic_tools::DemuxSelect::Request &req,
70  topic_tools::DemuxSelect::Response &res )
71 {
72  bool ret = false;
73  if (g_selected != g_pubs.end()) {
74  res.prev_topic = g_selected->topic_name;
75  }
76  else
77  res.prev_topic = string("");
78 
79  // see if it's the magical '__none' topic, in which case we open the circuit
80  if (req.topic == g_none_topic)
81  {
82  ROS_INFO("demux selected to no output.");
83 
84  g_selected = g_pubs.end();
85  ret = true;
86  }
87  else
88  {
89  ROS_INFO("trying to switch demux to %s", req.topic.c_str());
90  // spin through our vector of inputs and find this guy
91  for (list<struct pub_info_t>::iterator it = g_pubs.begin();
92  it != g_pubs.end();
93  ++it)
94  {
95  if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
96  {
97  g_selected = it;
98  ROS_INFO("demux selected output: [%s]", it->topic_name.c_str());
99  ret = true;
100  break;
101  }
102  }
103  if(!ret)
104  {
105  ROS_WARN("Failed to switch to non-existing topic %s in demux.", req.topic.c_str());
106  }
107  }
108 
109  if(ret)
110  {
111  std_msgs::String t;
112  t.data = req.topic;
113  g_pub_selected.publish(t);
114  }
115 
116  return ret;
117 }
118 
120 {
121  ROS_DEBUG("Received an incoming msg ...");
122  // when a message is incoming, check, if the requested publisher is already existing.
123  // if not, create it with the information available from the incoming message.
124  bool selected_added = false;
125  for (list<struct pub_info_t>::iterator it = g_pubs.begin(); it != g_pubs.end(); ++it) {
126  if (!it->pub)
127  {
128  if (it->topic_name == g_selected->topic_name)
129  {
130  selected_added = true;
131  }
132 
133  try
134  {
135  it->pub = new ros::Publisher(msg->advertise(*g_node, it->topic_name, 10, false));
136  }
137  catch (ros::InvalidNameException& e)
138  {
139  ROS_WARN("failed to add topic '%s' to demux, because it's an invalid name: %s",
140  it->topic_name.c_str(), e.what());
141  return;
142  }
143 
144  ROS_INFO("Added publisher %s to demux!", it->topic_name.c_str());
145  }
146  }
147 
148  if (selected_added)
149  {
150  // This is needed, because it takes some time before publisher is registered and can send out messages.
151  ROS_INFO("Sleeping 0.5 sec.");
152  ros::Duration(0.5).sleep();
153  }
154 
155  // check that we have a valid topic
156  if (!g_selected->pub) return;
157 
158  // finally: send out the message over the active publisher
159  g_selected->pub->publish(msg);
160  ROS_DEBUG("... and sent it out again!");
161 }
162 
163 bool list_topic_cb(topic_tools::DemuxList::Request& req,
164  topic_tools::DemuxList::Response& res)
165 {
166  (void)req;
167  for (list<struct pub_info_t>::iterator it = g_pubs.begin();
168  it != g_pubs.end();
169  ++it)
170  {
171  res.topics.push_back(it->topic_name);
172  }
173 
174  return true;
175 }
176 
177 bool add_topic_cb(topic_tools::DemuxAdd::Request& req,
178  topic_tools::DemuxAdd::Response& res)
179 {
180  (void)res;
181  // Check that it's not already in our list
182  ROS_INFO("trying to add %s to demux", req.topic.c_str());
183 
184  // Can't add the __none topic
185  if(req.topic == g_none_topic)
186  {
187  ROS_WARN("failed to add topic %s to demux, because it's reserved for special use",
188  req.topic.c_str());
189  return false;
190  }
191 
192  // spin through our vector of inputs and find this guy
193  for (list<struct pub_info_t>::iterator it = g_pubs.begin();
194  it != g_pubs.end();
195  ++it)
196  {
197  if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
198  {
199  ROS_WARN("tried to add a topic that demux was already publishing: [%s]",
200  it->topic_name.c_str());
201  return false;
202  }
203  }
204 
205  struct pub_info_t pub_info;
206  pub_info.topic_name = ros::names::resolve(req.topic);
207  pub_info.pub = NULL;
208  g_pubs.push_back(pub_info);
209 
210  ROS_INFO("PRE added %s to demux", req.topic.c_str());
211 
212  return true;
213 }
214 
215 bool del_topic_cb(topic_tools::DemuxDelete::Request& req,
216  topic_tools::DemuxDelete::Response& res)
217 {
218  (void)res;
219  // Check that it's in our list
220  ROS_INFO("trying to delete %s from demux", req.topic.c_str());
221  // spin through our vector of inputs and find this guy
222  for (list<struct pub_info_t>::iterator it = g_pubs.begin();
223  it != g_pubs.end();
224  ++it)
225  {
226  if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
227  {
228  // Can't delete the currently selected input, #2863
229  if(it == g_selected)
230  {
231  ROS_WARN("tried to delete currently selected topic %s from demux", req.topic.c_str());
232  return false;
233  }
234  if (it->pub)
235  it->pub->shutdown();
236  delete it->pub;
237  g_pubs.erase(it);
238  ROS_INFO("deleted topic %s from demux", req.topic.c_str());
239  return true;
240  }
241  }
242 
243  ROS_WARN("tried to delete non-published topic %s from demux", req.topic.c_str());
244  return false;
245 }
246 
247 int main(int argc, char **argv)
248 {
249  vector<string> args;
250  ros::removeROSArgs(argc, (const char**)argv, args);
251 
252  if (args.size() < 3)
253  {
254  printf("\nusage: demux IN_TOPIC OUT_TOPIC1 [OUT_TOPIC2 [...]]\n\n");
255  return 1;
256  }
257  std::string topic_name;
258  if(!getBaseName(args[1], topic_name))
259  return 1;
260  ros::init(argc, argv, topic_name + string("_demux"),
262  vector<string> topics;
263  for (unsigned int i = 2; i < args.size(); i++)
264  topics.push_back(args[i]);
265  ros::NodeHandle n;
266  g_node = &n;
267  g_input_topic = args[1];
268  // Put our API into the "demux" namespace, which the user should usually remap
269  ros::NodeHandle demux_nh("demux"), pnh("~");
270 
271  // Latched publisher for selected output topic name
272  g_pub_selected = demux_nh.advertise<std_msgs::String>(string("selected"), 1, true);
273 
274  for (size_t i = 0; i < topics.size(); i++)
275  {
276  struct pub_info_t pub_info;
277  pub_info.topic_name = ros::names::resolve(topics[i]);
278  pub_info.pub = NULL;
279  g_pubs.push_back(pub_info);
280  ROS_INFO("PRE added %s to demux", topics[i].c_str());
281  }
282  g_selected = g_pubs.begin(); // select first topic to start
283  std_msgs::String t;
284  t.data = g_selected->topic_name;
285  g_pub_selected.publish(t);
286 
287  // Create the one subscriber
288  g_sub = ros::Subscriber(n.subscribe<ShapeShifter>(g_input_topic, 10, boost::bind(in_cb, _1)));
289 
290 
291  // New service
292  ros::ServiceServer ss_select = demux_nh.advertiseService(string("select"), sel_srv_cb);
293  ros::ServiceServer ss_add = demux_nh.advertiseService(string("add"), add_topic_cb);
294  ros::ServiceServer ss_list = demux_nh.advertiseService(string("list"), list_topic_cb);
295  ros::ServiceServer ss_del = demux_nh.advertiseService(string("delete"), del_topic_cb);
296 
297  // Run
298  ros::spin();
299 
300  // Destruction
301  for (list<struct pub_info_t>::iterator it = g_pubs.begin();
302  it != g_pubs.end();
303  ++it)
304  {
305  if (it->pub)
306  it->pub->shutdown();
307  delete it->pub;
308  }
309 
310  g_pubs.clear();
311  return 0;
312 }
bool sel_srv_cb(topic_tools::DemuxSelect::Request &req, topic_tools::DemuxSelect::Response &res)
Definition: demux.cpp:69
bool list_topic_cb(topic_tools::DemuxList::Request &req, topic_tools::DemuxList::Response &res)
Definition: demux.cpp:163
bool del_topic_cb(topic_tools::DemuxDelete::Request &req, topic_tools::DemuxDelete::Response &res)
Definition: demux.cpp:215
Definition: args.py:1
void publish(const boost::shared_ptr< M > &message) const
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
bool sleep() const
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
int main(int argc, char **argv)
Definition: demux.cpp:247
void in_cb(const boost::shared_ptr< ShapeShifter const > &msg)
Definition: demux.cpp:119
ROSCPP_DECL std::string resolve(const std::string &name, bool remap=true)
#define ROS_WARN(...)
static ros::Subscriber g_sub
Definition: demux.cpp:55
ros::Publisher * pub
Definition: demux.cpp:61
std::string topic_name
Definition: demux.cpp:60
static string g_input_topic
Definition: demux.cpp:54
TOPIC_TOOLS_DECL bool getBaseName(const std::string &full_name, std::string &base_name)
Definition: parse.cpp:40
#define ROS_INFO(...)
static list< struct pub_info_t >::iterator g_selected
Definition: demux.cpp:67
static ros::Publisher g_pub_selected
Definition: demux.cpp:56
ROSCPP_DECL void spin()
static const string g_none_topic
Definition: demux.cpp:50
ROSCPP_DECL void removeROSArgs(int argc, const char *const *argv, V_string &args_out)
static ros::NodeHandle * g_node
Definition: demux.cpp:52
static list< struct pub_info_t > g_pubs
Definition: demux.cpp:66
bool add_topic_cb(topic_tools::DemuxAdd::Request &req, topic_tools::DemuxAdd::Response &res)
Definition: demux.cpp:177
#define ROS_DEBUG(...)


topic_tools
Author(s): Morgan Quigley, Brian Gerkey, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:58