$search
00001 00002 // demux is a generic ROS topic demultiplexer: one input topic is fanned out 00003 // to 1 of N output topics. A service is provided to select between the outputs 00004 // 00005 // Copyright (C) 2009, Morgan Quigley 00006 // 00007 // Redistribution and use in source and binary forms, with or without 00008 // modification, are permitted provided that the following conditions are met: 00009 // * Redistributions of source code must retain the above copyright notice, 00010 // this list of conditions and the following disclaimer. 00011 // * Redistributions in binary form must reproduce the above copyright 00012 // notice, this list of conditions and the following disclaimer in the 00013 // documentation and/or other materials provided with the distribution. 00014 // * Neither the name of Stanford University nor the names of its 00015 // contributors may be used to endorse or promote products derived from 00016 // this software without specific prior written permission. 00017 // 00018 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00019 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00020 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00021 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00022 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00023 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00024 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00025 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00026 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00027 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00028 // POSSIBILITY OF SUCH DAMAGE. 00030 00031 00032 #include <cstdio> 00033 #include <vector> 00034 #include <list> 00035 #include "ros/console.h" 00036 #include "std_msgs/String.h" 00037 #include "topic_tools/MuxSelect.h" 00038 #include "topic_tools/MuxAdd.h" 00039 #include "topic_tools/MuxList.h" 00040 #include "topic_tools/MuxDelete.h" 00041 #include "topic_tools/shape_shifter.h" 00042 #include "topic_tools/parse.h" 00043 00044 using std::string; 00045 using std::vector; 00046 using std::list; 00047 using namespace topic_tools; 00048 00049 const static string g_none_topic = "__none"; 00050 00051 static ros::NodeHandle *g_node = NULL; 00052 static bool g_lazy = false; 00053 static bool g_advertised = false; 00054 static string g_output_topic; 00055 static ros::Publisher g_pub; 00056 static ros::Publisher g_pub_selected; 00057 00058 struct sub_info_t 00059 { 00060 std::string topic_name; 00061 ros::Subscriber *sub; 00062 ShapeShifter* msg; 00063 }; 00064 00065 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg, ShapeShifter* s); 00066 00067 static list<struct sub_info_t> g_subs; 00068 static list<struct sub_info_t>::iterator g_selected = g_subs.end(); 00069 00070 void conn_cb(const ros::SingleSubscriberPublisher&) 00071 { 00072 // If we're in lazy subscribe mode, and the first subscriber just 00073 // connected, then subscribe 00074 if(g_lazy && g_selected != g_subs.end() && !g_selected->sub) 00075 { 00076 ROS_DEBUG("lazy mode; resubscribing to %s", g_selected->topic_name.c_str()); 00077 g_selected->sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_selected->topic_name, 10, boost::bind(in_cb, _1, g_selected->msg))); 00078 } 00079 } 00080 00081 bool sel_srv_cb( topic_tools::MuxSelect::Request &req, 00082 topic_tools::MuxSelect::Response &res ) 00083 { 00084 bool ret = false; 00085 if (g_selected != g_subs.end()) { 00086 res.prev_topic = g_selected->topic_name; 00087 00088 // Unsubscribe to old topic if lazy 00089 if (g_lazy) { 00090 ROS_DEBUG("Unsubscribing to %s, lazy", res.prev_topic.c_str()); 00091 if (g_selected->sub) 00092 g_selected->sub->shutdown(); 00093 delete g_selected->sub; 00094 g_selected->sub = NULL; 00095 } 00096 } 00097 else 00098 res.prev_topic = string(""); 00099 00100 // see if it's the magical '__none' topic, in which case we open the circuit 00101 if (req.topic == g_none_topic) 00102 { 00103 ROS_INFO("mux selected to no input."); 00104 00105 g_selected = g_subs.end(); 00106 ret = true; 00107 } 00108 else 00109 { 00110 ROS_INFO("trying to switch mux to %s", req.topic.c_str()); 00111 // spin through our vector of inputs and find this guy 00112 for (list<struct sub_info_t>::iterator it = g_subs.begin(); 00113 it != g_subs.end(); 00114 ++it) 00115 { 00116 if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic)) 00117 { 00118 g_selected = it; 00119 ROS_INFO("mux selected input: [%s]", it->topic_name.c_str()); 00120 ret = true; 00121 00122 if (!g_selected->sub && (!g_advertised || (g_advertised && g_pub.getNumSubscribers()))) { 00123 g_selected->sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_selected->topic_name, 10, boost::bind(in_cb, _1, g_selected->msg))); 00124 } 00125 } 00126 } 00127 } 00128 00129 if(ret) 00130 { 00131 std_msgs::String t; 00132 t.data = req.topic; 00133 g_pub_selected.publish(t); 00134 } 00135 00136 return ret; 00137 } 00138 00139 bool sel_srv_cb_dep( topic_tools::MuxSelect::Request &req, 00140 topic_tools::MuxSelect::Response &res ) 00141 { 00142 ROS_WARN("the <topic>_select service is deprecated; use mux/select instead"); 00143 return sel_srv_cb(req,res); 00144 } 00145 00146 00147 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg, 00148 ShapeShifter* s) 00149 { 00150 if (!g_advertised) 00151 { 00152 ROS_INFO("advertising"); 00153 g_pub = msg->advertise(*g_node, g_output_topic, 10, false, conn_cb); 00154 g_advertised = true; 00155 00156 // If lazy, unregister from all but the selected topic 00157 if (g_lazy) { 00158 for (static list<struct sub_info_t>::iterator it = g_subs.begin(); it != g_subs.end(); ++it) { 00159 if (it != g_selected) { 00160 ROS_INFO("Unregistering from %s", it->topic_name.c_str()); 00161 if (it->sub) 00162 it->sub->shutdown(); 00163 delete it->sub; 00164 it->sub = NULL; 00165 } 00166 } 00167 } 00168 } 00169 00170 if (s != g_selected->msg) 00171 return; 00172 00173 // If we're in lazy subscribe mode, and nobody's listening, then unsubscribe 00174 if (g_lazy && !g_pub.getNumSubscribers() && g_selected != g_subs.end()) { 00175 ROS_INFO("lazy mode; unsubscribing"); 00176 g_selected->sub->shutdown(); 00177 delete g_selected->sub; 00178 g_selected->sub = NULL; 00179 } 00180 else 00181 g_pub.publish(msg); 00182 } 00183 00184 bool list_topic_cb(topic_tools::MuxList::Request& req, 00185 topic_tools::MuxList::Response& res) 00186 { 00187 for (list<struct sub_info_t>::iterator it = g_subs.begin(); 00188 it != g_subs.end(); 00189 ++it) 00190 { 00191 res.topics.push_back(it->topic_name); 00192 } 00193 00194 return true; 00195 } 00196 00197 bool add_topic_cb(topic_tools::MuxAdd::Request& req, 00198 topic_tools::MuxAdd::Response& res) 00199 { 00200 // Check that it's not already in our list 00201 ROS_INFO("trying to add %s to mux", req.topic.c_str()); 00202 00203 // Can't add the __none topic 00204 if(req.topic == g_none_topic) 00205 { 00206 ROS_WARN("failed to add topic %s to mux, because it's reserved for special use", 00207 req.topic.c_str()); 00208 return false; 00209 } 00210 00211 // spin through our vector of inputs and find this guy 00212 for (list<struct sub_info_t>::iterator it = g_subs.begin(); 00213 it != g_subs.end(); 00214 ++it) 00215 { 00216 if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic)) 00217 { 00218 ROS_WARN("tried to add a topic that mux was already listening to: [%s]", 00219 it->topic_name.c_str()); 00220 return false; 00221 } 00222 } 00223 00224 struct sub_info_t sub_info; 00225 sub_info.msg = new ShapeShifter; 00226 sub_info.topic_name = ros::names::resolve(req.topic); 00227 try 00228 { 00229 if (g_lazy) 00230 sub_info.sub = NULL; 00231 else 00232 sub_info.sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(sub_info.topic_name, 10, boost::bind(in_cb, _1, sub_info.msg))); 00233 } 00234 catch(ros::InvalidNameException& e) 00235 { 00236 ROS_WARN("failed to add topic %s to mux, because it's an invalid name: %s", 00237 req.topic.c_str(), e.what()); 00238 delete sub_info.msg; 00239 return false; 00240 } 00241 g_subs.push_back(sub_info); 00242 00243 ROS_INFO("added %s to mux", req.topic.c_str()); 00244 00245 return true; 00246 } 00247 00248 bool del_topic_cb(topic_tools::MuxDelete::Request& req, 00249 topic_tools::MuxDelete::Response& res) 00250 { 00251 // Check that it's in our list 00252 ROS_INFO("trying to delete %s from mux", req.topic.c_str()); 00253 // spin through our vector of inputs and find this guy 00254 for (list<struct sub_info_t>::iterator it = g_subs.begin(); 00255 it != g_subs.end(); 00256 ++it) 00257 { 00258 if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic)) 00259 { 00260 // Can't delete the currently selected input, #2863 00261 if(it == g_selected) 00262 { 00263 ROS_WARN("tried to delete currently selected topic %s from mux", req.topic.c_str()); 00264 return false; 00265 } 00266 if (it->sub) 00267 it->sub->shutdown(); 00268 delete it->sub; 00269 delete it->msg; 00270 g_subs.erase(it); 00271 ROS_INFO("deleted topic %s from mux", req.topic.c_str()); 00272 return true; 00273 } 00274 } 00275 00276 ROS_WARN("tried to delete non-subscribed topic %s from mux", req.topic.c_str()); 00277 return false; 00278 } 00279 00280 int main(int argc, char **argv) 00281 { 00282 vector<string> args; 00283 ros::removeROSArgs(argc, (const char**)argv, args); 00284 00285 if (args.size() < 3) 00286 { 00287 printf("\nusage: mux OUT_TOPIC IN_TOPIC1 [IN_TOPIC2 [...]]\n\n"); 00288 return 1; 00289 } 00290 std::string topic_name; 00291 if(!getBaseName(args[1], topic_name)) 00292 return 1; 00293 ros::init(argc, argv, topic_name + string("_mux"), 00294 ros::init_options::AnonymousName); 00295 vector<string> topics; 00296 for (unsigned int i = 2; i < args.size(); i++) 00297 topics.push_back(args[i]); 00298 ros::NodeHandle n; 00299 g_node = &n; 00300 g_output_topic = args[1]; 00301 // Put our API into the "mux" namespace, which the user should usually remap 00302 ros::NodeHandle mux_nh("mux"), pnh("~"); 00303 pnh.getParam("lazy", g_lazy); 00304 00305 // Latched publisher for selected input topic name 00306 g_pub_selected = mux_nh.advertise<std_msgs::String>(string("selected"), 1, true); 00307 00308 for (size_t i = 0; i < topics.size(); i++) 00309 { 00310 struct sub_info_t sub_info; 00311 sub_info.msg = new ShapeShifter; 00312 sub_info.topic_name = ros::names::resolve(topics[i]); 00313 sub_info.sub = new ros::Subscriber(n.subscribe<ShapeShifter>(sub_info.topic_name, 10, boost::bind(in_cb, _1, sub_info.msg))); 00314 00315 g_subs.push_back(sub_info); 00316 } 00317 g_selected = g_subs.begin(); // select first topic to start 00318 std_msgs::String t; 00319 t.data = g_selected->topic_name; 00320 g_pub_selected.publish(t); 00321 00322 // Backward compatibility 00323 ros::ServiceServer ss = n.advertiseService(g_output_topic + string("_select"), sel_srv_cb_dep); 00324 // New service 00325 ros::ServiceServer ss_select = mux_nh.advertiseService(string("select"), sel_srv_cb); 00326 ros::ServiceServer ss_add = mux_nh.advertiseService(string("add"), add_topic_cb); 00327 ros::ServiceServer ss_list = mux_nh.advertiseService(string("list"), list_topic_cb); 00328 ros::ServiceServer ss_del = mux_nh.advertiseService(string("delete"), del_topic_cb); 00329 ros::spin(); 00330 for (list<struct sub_info_t>::iterator it = g_subs.begin(); 00331 it != g_subs.end(); 00332 ++it) 00333 { 00334 if (it->sub) 00335 it->sub->shutdown(); 00336 delete it->sub; 00337 delete it->msg; 00338 } 00339 00340 g_subs.clear(); 00341 return 0; 00342 } 00343