mux.cpp
Go to the documentation of this file.
00001 
00002 // demux is a generic ROS topic demultiplexer: one input topic is fanned out
00003 // to 1 of N output topics. A service is provided to select between the outputs
00004 //
00005 // Copyright (C) 2009, Morgan Quigley
00006 //
00007 // Redistribution and use in source and binary forms, with or without
00008 // modification, are permitted provided that the following conditions are met:
00009 //   * Redistributions of source code must retain the above copyright notice,
00010 //     this list of conditions and the following disclaimer.
00011 //   * Redistributions in binary form must reproduce the above copyright
00012 //     notice, this list of conditions and the following disclaimer in the
00013 //     documentation and/or other materials provided with the distribution.
00014 //   * Neither the name of Stanford University nor the names of its
00015 //     contributors may be used to endorse or promote products derived from
00016 //     this software without specific prior written permission.
00017 //
00018 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00019 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00020 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00021 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00022 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00023 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00024 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00025 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00026 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00027 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00028 // POSSIBILITY OF SUCH DAMAGE.
00030 
00031 
00032 #include <cstdio>
00033 #include <vector>
00034 #include <list>
00035 #include "ros/console.h"
00036 #include "std_msgs/String.h"
00037 #include "topic_tools/MuxSelect.h"
00038 #include "topic_tools/MuxAdd.h"
00039 #include "topic_tools/MuxList.h"
00040 #include "topic_tools/MuxDelete.h"
00041 #include "topic_tools/shape_shifter.h"
00042 #include "topic_tools/parse.h"
00043 
00044 using std::string;
00045 using std::vector;
00046 using std::list;
00047 using namespace topic_tools;
00048 
00049 const static string g_none_topic = "__none";
00050 
00051 static ros::NodeHandle *g_node = NULL;
00052 static bool g_lazy = false;
00053 static bool g_advertised = false;
00054 static string g_output_topic;
00055 static ros::Publisher g_pub;
00056 static ros::Publisher g_pub_selected;
00057 
00058 struct sub_info_t
00059 {
00060   std::string topic_name;
00061   ros::Subscriber *sub;
00062   ShapeShifter* msg;
00063 };
00064 
00065 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg, ShapeShifter* s);
00066 
00067 static list<struct sub_info_t> g_subs;
00068 static list<struct sub_info_t>::iterator g_selected = g_subs.end();
00069 
00070 void conn_cb(const ros::SingleSubscriberPublisher&)
00071 {
00072   // If we're in lazy subscribe mode, and the first subscriber just
00073   // connected, then subscribe
00074   if(g_lazy && g_selected != g_subs.end() && !g_selected->sub)
00075   {
00076     ROS_DEBUG("lazy mode; resubscribing to %s", g_selected->topic_name.c_str());
00077     g_selected->sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_selected->topic_name, 10, boost::bind(in_cb, _1, g_selected->msg)));
00078   }
00079 }
00080 
00081 bool sel_srv_cb( topic_tools::MuxSelect::Request  &req,
00082                  topic_tools::MuxSelect::Response &res )
00083 {
00084   bool ret = false;
00085   if (g_selected != g_subs.end()) {
00086     res.prev_topic = g_selected->topic_name;
00087 
00088     // Unsubscribe to old topic if lazy
00089     if (g_lazy) {
00090       ROS_DEBUG("Unsubscribing to %s, lazy", res.prev_topic.c_str());
00091       if (g_selected->sub)
00092         g_selected->sub->shutdown();
00093       delete g_selected->sub;
00094       g_selected->sub = NULL;
00095     }
00096   }
00097   else
00098     res.prev_topic = string("");
00099 
00100   // see if it's the magical '__none' topic, in which case we open the circuit
00101   if (req.topic == g_none_topic)
00102   {
00103     ROS_INFO("mux selected to no input.");
00104 
00105     g_selected = g_subs.end();
00106     ret = true;
00107   }
00108   else
00109   {
00110     ROS_INFO("trying to switch mux to %s", req.topic.c_str());
00111     // spin through our vector of inputs and find this guy
00112     for (list<struct sub_info_t>::iterator it = g_subs.begin();
00113          it != g_subs.end();
00114          ++it)
00115     {
00116       if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
00117       {
00118         g_selected = it;
00119         ROS_INFO("mux selected input: [%s]", it->topic_name.c_str());
00120         ret = true;
00121         
00122         if (!g_selected->sub && (!g_advertised || (g_advertised && g_pub.getNumSubscribers()))) {
00123           g_selected->sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_selected->topic_name, 10, boost::bind(in_cb, _1, g_selected->msg)));
00124         }
00125       }
00126     }
00127   }
00128   
00129   if(ret)
00130   {
00131     std_msgs::String t;
00132     t.data = req.topic;
00133     g_pub_selected.publish(t);
00134   }
00135 
00136   return ret;
00137 }
00138 
00139 bool sel_srv_cb_dep( topic_tools::MuxSelect::Request  &req,
00140                      topic_tools::MuxSelect::Response &res )
00141 {
00142   ROS_WARN("the <topic>_select service is deprecated; use mux/select instead");
00143   return sel_srv_cb(req,res);
00144 }
00145 
00146 
00147 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg,
00148            ShapeShifter* s)
00149 {
00150   if (!g_advertised)
00151   {
00152     ROS_INFO("advertising");
00153     g_pub = msg->advertise(*g_node, g_output_topic, 10, false, conn_cb);
00154     g_advertised = true;
00155     
00156     // If lazy, unregister from all but the selected topic
00157     if (g_lazy) {
00158       for (static list<struct sub_info_t>::iterator it = g_subs.begin(); it != g_subs.end(); ++it) {
00159         if (it != g_selected) {
00160           ROS_INFO("Unregistering from %s", it->topic_name.c_str());
00161           if (it->sub)
00162             it->sub->shutdown();
00163           delete it->sub;
00164           it->sub = NULL;
00165         }
00166                 }
00167     }
00168   }
00169   
00170   if (s != g_selected->msg)
00171     return;
00172   
00173   // If we're in lazy subscribe mode, and nobody's listening, then unsubscribe
00174   if (g_lazy && !g_pub.getNumSubscribers() && g_selected != g_subs.end()) {
00175     ROS_INFO("lazy mode; unsubscribing");
00176     g_selected->sub->shutdown();
00177     delete g_selected->sub;
00178     g_selected->sub = NULL;
00179   }
00180   else
00181     g_pub.publish(msg);
00182 }
00183 
00184 bool list_topic_cb(topic_tools::MuxList::Request& req,
00185                    topic_tools::MuxList::Response& res)
00186 {
00187   (void)req;
00188   for (list<struct sub_info_t>::iterator it = g_subs.begin();
00189        it != g_subs.end();
00190        ++it)
00191   {
00192     res.topics.push_back(it->topic_name);
00193   }
00194 
00195   return true;
00196 }
00197 
00198 bool add_topic_cb(topic_tools::MuxAdd::Request& req,
00199                   topic_tools::MuxAdd::Response& res)
00200 {
00201   (void)res;
00202   // Check that it's not already in our list
00203   ROS_INFO("trying to add %s to mux", req.topic.c_str());
00204   
00205   // Can't add the __none topic
00206   if(req.topic == g_none_topic)
00207   {
00208     ROS_WARN("failed to add topic %s to mux, because it's reserved for special use",
00209              req.topic.c_str());
00210     return false;
00211   }
00212 
00213   // spin through our vector of inputs and find this guy
00214   for (list<struct sub_info_t>::iterator it = g_subs.begin();
00215        it != g_subs.end();
00216        ++it)
00217   {
00218     if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
00219     {
00220       ROS_WARN("tried to add a topic that mux was already listening to: [%s]", 
00221                it->topic_name.c_str());
00222       return false;
00223     }
00224   }
00225 
00226   struct sub_info_t sub_info;
00227   sub_info.msg = new ShapeShifter;
00228   sub_info.topic_name = ros::names::resolve(req.topic);
00229   try
00230   {
00231     if (g_lazy)
00232       sub_info.sub = NULL;
00233     else
00234       sub_info.sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(sub_info.topic_name, 10, boost::bind(in_cb, _1, sub_info.msg)));
00235   }
00236   catch(ros::InvalidNameException& e)
00237   {
00238     ROS_WARN("failed to add topic %s to mux, because it's an invalid name: %s",
00239              req.topic.c_str(), e.what());
00240     delete sub_info.msg;
00241     return false;
00242   }
00243   g_subs.push_back(sub_info);
00244 
00245   ROS_INFO("added %s to mux", req.topic.c_str());
00246 
00247   return true;
00248 }
00249 
00250 bool del_topic_cb(topic_tools::MuxDelete::Request& req,
00251                   topic_tools::MuxDelete::Response& res)
00252 {
00253   (void)res;
00254   // Check that it's in our list
00255   ROS_INFO("trying to delete %s from mux", req.topic.c_str());
00256   // spin through our vector of inputs and find this guy
00257   for (list<struct sub_info_t>::iterator it = g_subs.begin();
00258        it != g_subs.end();
00259        ++it)
00260   {
00261     if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
00262     {
00263       // Can't delete the currently selected input, #2863
00264       if(it == g_selected)
00265       {
00266         ROS_WARN("tried to delete currently selected topic %s from mux", req.topic.c_str());
00267         return false;
00268       }
00269       if (it->sub)
00270         it->sub->shutdown();
00271       delete it->sub;
00272       delete it->msg;
00273       g_subs.erase(it);
00274       ROS_INFO("deleted topic %s from mux", req.topic.c_str());
00275       return true;
00276     }
00277   }
00278 
00279   ROS_WARN("tried to delete non-subscribed topic %s from mux", req.topic.c_str());
00280   return false;
00281 }
00282 
00283 int main(int argc, char **argv)
00284 {
00285   vector<string> args;
00286   ros::removeROSArgs(argc, (const char**)argv, args);
00287 
00288   if (args.size() < 3)
00289   {
00290     printf("\nusage: mux OUT_TOPIC IN_TOPIC1 [IN_TOPIC2 [...]]\n\n");
00291     return 1;
00292   }
00293   std::string topic_name;
00294   if(!getBaseName(args[1], topic_name))
00295     return 1;
00296   ros::init(argc, argv, topic_name + string("_mux"),
00297             ros::init_options::AnonymousName);
00298   vector<string> topics;
00299   for (unsigned int i = 2; i < args.size(); i++)
00300     topics.push_back(args[i]);
00301   ros::NodeHandle n;
00302   g_node = &n;
00303   g_output_topic = args[1];
00304   // Put our API into the "mux" namespace, which the user should usually remap
00305   ros::NodeHandle mux_nh("mux"), pnh("~");
00306   pnh.getParam("lazy", g_lazy);
00307 
00308   // Latched publisher for selected input topic name
00309   g_pub_selected = mux_nh.advertise<std_msgs::String>(string("selected"), 1, true);
00310 
00311   for (size_t i = 0; i < topics.size(); i++)
00312   {
00313     struct sub_info_t sub_info;
00314     sub_info.msg = new ShapeShifter;
00315     sub_info.topic_name = ros::names::resolve(topics[i]);
00316     sub_info.sub = new ros::Subscriber(n.subscribe<ShapeShifter>(sub_info.topic_name, 10, boost::bind(in_cb, _1, sub_info.msg)));
00317 
00318     g_subs.push_back(sub_info);
00319   }
00320   g_selected = g_subs.begin(); // select first topic to start
00321   std_msgs::String t;
00322   t.data = g_selected->topic_name;
00323   g_pub_selected.publish(t);
00324 
00325   // Backward compatibility
00326   ros::ServiceServer ss = n.advertiseService(g_output_topic + string("_select"), sel_srv_cb_dep);
00327   // New service
00328   ros::ServiceServer ss_select = mux_nh.advertiseService(string("select"), sel_srv_cb);
00329   ros::ServiceServer ss_add = mux_nh.advertiseService(string("add"), add_topic_cb);
00330   ros::ServiceServer ss_list = mux_nh.advertiseService(string("list"), list_topic_cb);
00331   ros::ServiceServer ss_del = mux_nh.advertiseService(string("delete"), del_topic_cb);
00332   ros::spin();
00333   for (list<struct sub_info_t>::iterator it = g_subs.begin();
00334        it != g_subs.end();
00335        ++it)
00336   {
00337     if (it->sub)
00338       it->sub->shutdown();
00339     delete it->sub;
00340     delete it->msg;
00341   }
00342 
00343   g_subs.clear();
00344   return 0;
00345 }
00346 


topic_tools
Author(s): Morgan Quigley, Brian Gerkey
autogenerated on Tue Mar 7 2017 03:45:45