00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00030
00031
00032 #include <cstdio>
00033 #include <vector>
00034 #include <list>
00035 #include "ros/console.h"
00036 #include "std_msgs/String.h"
00037 #include "topic_tools/MuxSelect.h"
00038 #include "topic_tools/MuxAdd.h"
00039 #include "topic_tools/MuxList.h"
00040 #include "topic_tools/MuxDelete.h"
00041 #include "topic_tools/shape_shifter.h"
00042 #include "topic_tools/parse.h"
00043
00044 using std::string;
00045 using std::vector;
00046 using std::list;
00047 using namespace topic_tools;
00048
00049 const static string g_none_topic = "__none";
00050
00051 static ros::NodeHandle *g_node = NULL;
00052 static bool g_lazy = false;
00053 static bool g_advertised = false;
00054 static string g_output_topic;
00055 static ros::Publisher g_pub;
00056 static ros::Publisher g_pub_selected;
00057
00058 struct sub_info_t
00059 {
00060 std::string topic_name;
00061 ros::Subscriber *sub;
00062 ShapeShifter* msg;
00063 };
00064
00065 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg, ShapeShifter* s);
00066
00067 static list<struct sub_info_t> g_subs;
00068 static list<struct sub_info_t>::iterator g_selected = g_subs.end();
00069
00070 void conn_cb(const ros::SingleSubscriberPublisher&)
00071 {
00072
00073
00074 if(g_lazy && g_selected != g_subs.end() && !g_selected->sub)
00075 {
00076 ROS_DEBUG("lazy mode; resubscribing to %s", g_selected->topic_name.c_str());
00077 g_selected->sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_selected->topic_name, 10, boost::bind(in_cb, _1, g_selected->msg)));
00078 }
00079 }
00080
00081 bool sel_srv_cb( topic_tools::MuxSelect::Request &req,
00082 topic_tools::MuxSelect::Response &res )
00083 {
00084 bool ret = false;
00085 if (g_selected != g_subs.end()) {
00086 res.prev_topic = g_selected->topic_name;
00087
00088
00089 if (g_lazy) {
00090 ROS_DEBUG("Unsubscribing to %s, lazy", res.prev_topic.c_str());
00091 if (g_selected->sub)
00092 g_selected->sub->shutdown();
00093 delete g_selected->sub;
00094 g_selected->sub = NULL;
00095 }
00096 }
00097 else
00098 res.prev_topic = string("");
00099
00100
00101 if (req.topic == g_none_topic)
00102 {
00103 ROS_INFO("mux selected to no input.");
00104
00105 g_selected = g_subs.end();
00106 ret = true;
00107 }
00108 else
00109 {
00110 ROS_INFO("trying to switch mux to %s", req.topic.c_str());
00111
00112 for (list<struct sub_info_t>::iterator it = g_subs.begin();
00113 it != g_subs.end();
00114 ++it)
00115 {
00116 if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
00117 {
00118 g_selected = it;
00119 ROS_INFO("mux selected input: [%s]", it->topic_name.c_str());
00120 ret = true;
00121
00122 if (!g_selected->sub && (!g_advertised || (g_advertised && g_pub.getNumSubscribers()))) {
00123 g_selected->sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_selected->topic_name, 10, boost::bind(in_cb, _1, g_selected->msg)));
00124 }
00125 }
00126 }
00127 }
00128
00129 if(ret)
00130 {
00131 std_msgs::String t;
00132 t.data = req.topic;
00133 g_pub_selected.publish(t);
00134 }
00135
00136 return ret;
00137 }
00138
00139 bool sel_srv_cb_dep( topic_tools::MuxSelect::Request &req,
00140 topic_tools::MuxSelect::Response &res )
00141 {
00142 ROS_WARN("the <topic>_select service is deprecated; use mux/select instead");
00143 return sel_srv_cb(req,res);
00144 }
00145
00146
00147 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg,
00148 ShapeShifter* s)
00149 {
00150 if (!g_advertised)
00151 {
00152 ROS_INFO("advertising");
00153 g_pub = msg->advertise(*g_node, g_output_topic, 10, false, conn_cb);
00154 g_advertised = true;
00155
00156
00157 if (g_lazy) {
00158 for (static list<struct sub_info_t>::iterator it = g_subs.begin(); it != g_subs.end(); ++it) {
00159 if (it != g_selected) {
00160 ROS_INFO("Unregistering from %s", it->topic_name.c_str());
00161 if (it->sub)
00162 it->sub->shutdown();
00163 delete it->sub;
00164 it->sub = NULL;
00165 }
00166 }
00167 }
00168 }
00169
00170 if (s != g_selected->msg)
00171 return;
00172
00173
00174 if (g_lazy && !g_pub.getNumSubscribers() && g_selected != g_subs.end()) {
00175 ROS_INFO("lazy mode; unsubscribing");
00176 g_selected->sub->shutdown();
00177 delete g_selected->sub;
00178 g_selected->sub = NULL;
00179 }
00180 else
00181 g_pub.publish(msg);
00182 }
00183
00184 bool list_topic_cb(topic_tools::MuxList::Request& req,
00185 topic_tools::MuxList::Response& res)
00186 {
00187 (void)req;
00188 for (list<struct sub_info_t>::iterator it = g_subs.begin();
00189 it != g_subs.end();
00190 ++it)
00191 {
00192 res.topics.push_back(it->topic_name);
00193 }
00194
00195 return true;
00196 }
00197
00198 bool add_topic_cb(topic_tools::MuxAdd::Request& req,
00199 topic_tools::MuxAdd::Response& res)
00200 {
00201 (void)res;
00202
00203 ROS_INFO("trying to add %s to mux", req.topic.c_str());
00204
00205
00206 if(req.topic == g_none_topic)
00207 {
00208 ROS_WARN("failed to add topic %s to mux, because it's reserved for special use",
00209 req.topic.c_str());
00210 return false;
00211 }
00212
00213
00214 for (list<struct sub_info_t>::iterator it = g_subs.begin();
00215 it != g_subs.end();
00216 ++it)
00217 {
00218 if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
00219 {
00220 ROS_WARN("tried to add a topic that mux was already listening to: [%s]",
00221 it->topic_name.c_str());
00222 return false;
00223 }
00224 }
00225
00226 struct sub_info_t sub_info;
00227 sub_info.msg = new ShapeShifter;
00228 sub_info.topic_name = ros::names::resolve(req.topic);
00229 try
00230 {
00231 if (g_lazy)
00232 sub_info.sub = NULL;
00233 else
00234 sub_info.sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(sub_info.topic_name, 10, boost::bind(in_cb, _1, sub_info.msg)));
00235 }
00236 catch(ros::InvalidNameException& e)
00237 {
00238 ROS_WARN("failed to add topic %s to mux, because it's an invalid name: %s",
00239 req.topic.c_str(), e.what());
00240 delete sub_info.msg;
00241 return false;
00242 }
00243 g_subs.push_back(sub_info);
00244
00245 ROS_INFO("added %s to mux", req.topic.c_str());
00246
00247 return true;
00248 }
00249
00250 bool del_topic_cb(topic_tools::MuxDelete::Request& req,
00251 topic_tools::MuxDelete::Response& res)
00252 {
00253 (void)res;
00254
00255 ROS_INFO("trying to delete %s from mux", req.topic.c_str());
00256
00257 for (list<struct sub_info_t>::iterator it = g_subs.begin();
00258 it != g_subs.end();
00259 ++it)
00260 {
00261 if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
00262 {
00263
00264 if(it == g_selected)
00265 {
00266 ROS_WARN("tried to delete currently selected topic %s from mux", req.topic.c_str());
00267 return false;
00268 }
00269 if (it->sub)
00270 it->sub->shutdown();
00271 delete it->sub;
00272 delete it->msg;
00273 g_subs.erase(it);
00274 ROS_INFO("deleted topic %s from mux", req.topic.c_str());
00275 return true;
00276 }
00277 }
00278
00279 ROS_WARN("tried to delete non-subscribed topic %s from mux", req.topic.c_str());
00280 return false;
00281 }
00282
00283 int main(int argc, char **argv)
00284 {
00285 vector<string> args;
00286 ros::removeROSArgs(argc, (const char**)argv, args);
00287
00288 if (args.size() < 3)
00289 {
00290 printf("\nusage: mux OUT_TOPIC IN_TOPIC1 [IN_TOPIC2 [...]]\n\n");
00291 return 1;
00292 }
00293 std::string topic_name;
00294 if(!getBaseName(args[1], topic_name))
00295 return 1;
00296 ros::init(argc, argv, topic_name + string("_mux"),
00297 ros::init_options::AnonymousName);
00298 vector<string> topics;
00299 for (unsigned int i = 2; i < args.size(); i++)
00300 topics.push_back(args[i]);
00301 ros::NodeHandle n;
00302 g_node = &n;
00303 g_output_topic = args[1];
00304
00305 ros::NodeHandle mux_nh("mux"), pnh("~");
00306 pnh.getParam("lazy", g_lazy);
00307
00308
00309 g_pub_selected = mux_nh.advertise<std_msgs::String>(string("selected"), 1, true);
00310
00311 for (size_t i = 0; i < topics.size(); i++)
00312 {
00313 struct sub_info_t sub_info;
00314 sub_info.msg = new ShapeShifter;
00315 sub_info.topic_name = ros::names::resolve(topics[i]);
00316 sub_info.sub = new ros::Subscriber(n.subscribe<ShapeShifter>(sub_info.topic_name, 10, boost::bind(in_cb, _1, sub_info.msg)));
00317
00318 g_subs.push_back(sub_info);
00319 }
00320 g_selected = g_subs.begin();
00321 std_msgs::String t;
00322 t.data = g_selected->topic_name;
00323 g_pub_selected.publish(t);
00324
00325
00326 ros::ServiceServer ss = n.advertiseService(g_output_topic + string("_select"), sel_srv_cb_dep);
00327
00328 ros::ServiceServer ss_select = mux_nh.advertiseService(string("select"), sel_srv_cb);
00329 ros::ServiceServer ss_add = mux_nh.advertiseService(string("add"), add_topic_cb);
00330 ros::ServiceServer ss_list = mux_nh.advertiseService(string("list"), list_topic_cb);
00331 ros::ServiceServer ss_del = mux_nh.advertiseService(string("delete"), del_topic_cb);
00332 ros::spin();
00333 for (list<struct sub_info_t>::iterator it = g_subs.begin();
00334 it != g_subs.end();
00335 ++it)
00336 {
00337 if (it->sub)
00338 it->sub->shutdown();
00339 delete it->sub;
00340 delete it->msg;
00341 }
00342
00343 g_subs.clear();
00344 return 0;
00345 }
00346