mux.cpp
Go to the documentation of this file.
00001 
00002 // demux is a generic ROS topic demultiplexer: one input topic is fanned out
00003 // to 1 of N output topics. A service is provided to select between the outputs
00004 //
00005 // Copyright (C) 2009, Morgan Quigley
00006 //
00007 // Redistribution and use in source and binary forms, with or without
00008 // modification, are permitted provided that the following conditions are met:
00009 //   * Redistributions of source code must retain the above copyright notice,
00010 //     this list of conditions and the following disclaimer.
00011 //   * Redistributions in binary form must reproduce the above copyright
00012 //     notice, this list of conditions and the following disclaimer in the
00013 //     documentation and/or other materials provided with the distribution.
00014 //   * Neither the name of Stanford University nor the names of its
00015 //     contributors may be used to endorse or promote products derived from
00016 //     this software without specific prior written permission.
00017 //
00018 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00019 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00020 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00021 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00022 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00023 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00024 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00025 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00026 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00027 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00028 // POSSIBILITY OF SUCH DAMAGE.
00030 
00031 
00032 #include <cstdio>
00033 #include <vector>
00034 #include <list>
00035 #include "ros/console.h"
00036 #include "std_msgs/String.h"
00037 #include "topic_tools/MuxSelect.h"
00038 #include "topic_tools/MuxAdd.h"
00039 #include "topic_tools/MuxList.h"
00040 #include "topic_tools/MuxDelete.h"
00041 #include "topic_tools/shape_shifter.h"
00042 #include "topic_tools/parse.h"
00043 
00044 using std::string;
00045 using std::vector;
00046 using std::list;
00047 using namespace topic_tools;
00048 
00049 const static string g_none_topic = "__none";
00050 
00051 static ros::NodeHandle *g_node = NULL;
00052 static bool g_lazy = false;
00053 static bool g_advertised = false;
00054 static string g_output_topic;
00055 static ros::Publisher g_pub;
00056 static ros::Publisher g_pub_selected;
00057 
00058 struct sub_info_t
00059 {
00060   std::string topic_name;
00061   ros::Subscriber *sub;
00062   ShapeShifter* msg;
00063 };
00064 
00065 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg, ShapeShifter* s);
00066 
00067 static list<struct sub_info_t> g_subs;
00068 static list<struct sub_info_t>::iterator g_selected = g_subs.end();
00069 
00070 void conn_cb(const ros::SingleSubscriberPublisher&)
00071 {
00072   // If we're in lazy subscribe mode, and the first subscriber just
00073   // connected, then subscribe
00074   if(g_lazy && g_selected != g_subs.end() && !g_selected->sub)
00075   {
00076     ROS_DEBUG("lazy mode; resubscribing to %s", g_selected->topic_name.c_str());
00077     g_selected->sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_selected->topic_name, 10, boost::bind(in_cb, _1, g_selected->msg)));
00078   }
00079 }
00080 
00081 bool sel_srv_cb( topic_tools::MuxSelect::Request  &req,
00082                  topic_tools::MuxSelect::Response &res )
00083 {
00084   bool ret = false;
00085   if (g_selected != g_subs.end()) {
00086     res.prev_topic = g_selected->topic_name;
00087 
00088     // Unsubscribe to old topic if lazy
00089     if (g_lazy) {
00090       ROS_DEBUG("Unsubscribing to %s, lazy", res.prev_topic.c_str());
00091       if (g_selected->sub)
00092         g_selected->sub->shutdown();
00093       delete g_selected->sub;
00094       g_selected->sub = NULL;
00095     }
00096   }
00097   else
00098     res.prev_topic = string("");
00099 
00100   // see if it's the magical '__none' topic, in which case we open the circuit
00101   if (req.topic == g_none_topic)
00102   {
00103     ROS_INFO("mux selected to no input.");
00104 
00105     g_selected = g_subs.end();
00106     ret = true;
00107   }
00108   else
00109   {
00110     ROS_INFO("trying to switch mux to %s", req.topic.c_str());
00111     // spin through our vector of inputs and find this guy
00112     for (list<struct sub_info_t>::iterator it = g_subs.begin();
00113          it != g_subs.end();
00114          ++it)
00115     {
00116       if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
00117       {
00118         g_selected = it;
00119         ROS_INFO("mux selected input: [%s]", it->topic_name.c_str());
00120         ret = true;
00121         
00122         if (!g_selected->sub && (!g_advertised || (g_advertised && g_pub.getNumSubscribers()))) {
00123           g_selected->sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_selected->topic_name, 10, boost::bind(in_cb, _1, g_selected->msg)));
00124         }
00125       }
00126     }
00127   }
00128   
00129   if(ret)
00130   {
00131     std_msgs::String t;
00132     t.data = req.topic;
00133     g_pub_selected.publish(t);
00134   }
00135 
00136   return ret;
00137 }
00138 
00139 bool sel_srv_cb_dep( topic_tools::MuxSelect::Request  &req,
00140                      topic_tools::MuxSelect::Response &res )
00141 {
00142   ROS_WARN("the <topic>_select service is deprecated; use mux/select instead");
00143   return sel_srv_cb(req,res);
00144 }
00145 
00146 
00147 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg,
00148            ShapeShifter* s)
00149 {
00150   if (!g_advertised)
00151   {
00152     ROS_INFO("advertising");
00153     g_pub = msg->advertise(*g_node, g_output_topic, 10, false, conn_cb);
00154     g_advertised = true;
00155     
00156     // If lazy, unregister from all but the selected topic
00157     if (g_lazy) {
00158       for (static list<struct sub_info_t>::iterator it = g_subs.begin(); it != g_subs.end(); ++it) {
00159         if (it != g_selected) {
00160           ROS_INFO("Unregistering from %s", it->topic_name.c_str());
00161           if (it->sub)
00162             it->sub->shutdown();
00163           delete it->sub;
00164           it->sub = NULL;
00165         }
00166                 }
00167     }
00168   }
00169   
00170   if (s != g_selected->msg)
00171     return;
00172   
00173   // If we're in lazy subscribe mode, and nobody's listening, then unsubscribe
00174   if (g_lazy && !g_pub.getNumSubscribers() && g_selected != g_subs.end()) {
00175     ROS_INFO("lazy mode; unsubscribing");
00176     g_selected->sub->shutdown();
00177     delete g_selected->sub;
00178     g_selected->sub = NULL;
00179   }
00180   else
00181     g_pub.publish(msg);
00182 }
00183 
00184 bool list_topic_cb(topic_tools::MuxList::Request& req,
00185                    topic_tools::MuxList::Response& res)
00186 {
00187   for (list<struct sub_info_t>::iterator it = g_subs.begin();
00188        it != g_subs.end();
00189        ++it)
00190   {
00191     res.topics.push_back(it->topic_name);
00192   }
00193 
00194   return true;
00195 }
00196 
00197 bool add_topic_cb(topic_tools::MuxAdd::Request& req,
00198                   topic_tools::MuxAdd::Response& res)
00199 {
00200   // Check that it's not already in our list
00201   ROS_INFO("trying to add %s to mux", req.topic.c_str());
00202   
00203   // Can't add the __none topic
00204   if(req.topic == g_none_topic)
00205   {
00206     ROS_WARN("failed to add topic %s to mux, because it's reserved for special use",
00207              req.topic.c_str());
00208     return false;
00209   }
00210 
00211   // spin through our vector of inputs and find this guy
00212   for (list<struct sub_info_t>::iterator it = g_subs.begin();
00213        it != g_subs.end();
00214        ++it)
00215   {
00216     if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
00217     {
00218       ROS_WARN("tried to add a topic that mux was already listening to: [%s]", 
00219                it->topic_name.c_str());
00220       return false;
00221     }
00222   }
00223 
00224   struct sub_info_t sub_info;
00225   sub_info.msg = new ShapeShifter;
00226   sub_info.topic_name = ros::names::resolve(req.topic);
00227   try
00228   {
00229     if (g_lazy)
00230       sub_info.sub = NULL;
00231     else
00232       sub_info.sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(sub_info.topic_name, 10, boost::bind(in_cb, _1, sub_info.msg)));
00233   }
00234   catch(ros::InvalidNameException& e)
00235   {
00236     ROS_WARN("failed to add topic %s to mux, because it's an invalid name: %s",
00237              req.topic.c_str(), e.what());
00238     delete sub_info.msg;
00239     return false;
00240   }
00241   g_subs.push_back(sub_info);
00242 
00243   ROS_INFO("added %s to mux", req.topic.c_str());
00244 
00245   return true;
00246 }
00247 
00248 bool del_topic_cb(topic_tools::MuxDelete::Request& req,
00249                   topic_tools::MuxDelete::Response& res)
00250 {
00251   // Check that it's in our list
00252   ROS_INFO("trying to delete %s from mux", req.topic.c_str());
00253   // spin through our vector of inputs and find this guy
00254   for (list<struct sub_info_t>::iterator it = g_subs.begin();
00255        it != g_subs.end();
00256        ++it)
00257   {
00258     if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
00259     {
00260       // Can't delete the currently selected input, #2863
00261       if(it == g_selected)
00262       {
00263         ROS_WARN("tried to delete currently selected topic %s from mux", req.topic.c_str());
00264         return false;
00265       }
00266       if (it->sub)
00267         it->sub->shutdown();
00268       delete it->sub;
00269       delete it->msg;
00270       g_subs.erase(it);
00271       ROS_INFO("deleted topic %s from mux", req.topic.c_str());
00272       return true;
00273     }
00274   }
00275 
00276   ROS_WARN("tried to delete non-subscribed topic %s from mux", req.topic.c_str());
00277   return false;
00278 }
00279 
00280 int main(int argc, char **argv)
00281 {
00282   vector<string> args;
00283   ros::removeROSArgs(argc, (const char**)argv, args);
00284 
00285   if (args.size() < 3)
00286   {
00287     printf("\nusage: mux OUT_TOPIC IN_TOPIC1 [IN_TOPIC2 [...]]\n\n");
00288     return 1;
00289   }
00290   std::string topic_name;
00291   if(!getBaseName(args[1], topic_name))
00292     return 1;
00293   ros::init(argc, argv, topic_name + string("_mux"),
00294             ros::init_options::AnonymousName);
00295   vector<string> topics;
00296   for (unsigned int i = 2; i < args.size(); i++)
00297     topics.push_back(args[i]);
00298   ros::NodeHandle n;
00299   g_node = &n;
00300   g_output_topic = args[1];
00301   // Put our API into the "mux" namespace, which the user should usually remap
00302   ros::NodeHandle mux_nh("mux"), pnh("~");
00303   pnh.getParam("lazy", g_lazy);
00304 
00305   // Latched publisher for selected input topic name
00306   g_pub_selected = mux_nh.advertise<std_msgs::String>(string("selected"), 1, true);
00307 
00308   for (size_t i = 0; i < topics.size(); i++)
00309   {
00310     struct sub_info_t sub_info;
00311     sub_info.msg = new ShapeShifter;
00312     sub_info.topic_name = ros::names::resolve(topics[i]);
00313     sub_info.sub = new ros::Subscriber(n.subscribe<ShapeShifter>(sub_info.topic_name, 10, boost::bind(in_cb, _1, sub_info.msg)));
00314 
00315     g_subs.push_back(sub_info);
00316   }
00317   g_selected = g_subs.begin(); // select first topic to start
00318   std_msgs::String t;
00319   t.data = g_selected->topic_name;
00320   g_pub_selected.publish(t);
00321 
00322   // Backward compatibility
00323   ros::ServiceServer ss = n.advertiseService(g_output_topic + string("_select"), sel_srv_cb_dep);
00324   // New service
00325   ros::ServiceServer ss_select = mux_nh.advertiseService(string("select"), sel_srv_cb);
00326   ros::ServiceServer ss_add = mux_nh.advertiseService(string("add"), add_topic_cb);
00327   ros::ServiceServer ss_list = mux_nh.advertiseService(string("list"), list_topic_cb);
00328   ros::ServiceServer ss_del = mux_nh.advertiseService(string("delete"), del_topic_cb);
00329   ros::spin();
00330   for (list<struct sub_info_t>::iterator it = g_subs.begin();
00331        it != g_subs.end();
00332        ++it)
00333   {
00334     if (it->sub)
00335       it->sub->shutdown();
00336     delete it->sub;
00337     delete it->msg;
00338   }
00339 
00340   g_subs.clear();
00341   return 0;
00342 }
00343 


topic_tools
Author(s): Morgan Quigley, Brian Gerkey
autogenerated on Fri Aug 28 2015 12:33:48