00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00030
00031
00032 #include <cstdio>
00033 #include <vector>
00034 #include <list>
00035 #include "ros/console.h"
00036 #include "std_msgs/String.h"
00037 #include "topic_tools/MuxSelect.h"
00038 #include "topic_tools/MuxAdd.h"
00039 #include "topic_tools/MuxList.h"
00040 #include "topic_tools/MuxDelete.h"
00041 #include "topic_tools/shape_shifter.h"
00042 #include "topic_tools/parse.h"
00043
00044 using std::string;
00045 using std::vector;
00046 using std::list;
00047 using namespace topic_tools;
00048
00049 const static string g_none_topic = "__none";
00050
00051 static ros::NodeHandle *g_node = NULL;
00052 static bool g_lazy = false;
00053 static bool g_advertised = false;
00054 static string g_output_topic;
00055 static ros::Publisher g_pub;
00056 static ros::Publisher g_pub_selected;
00057
00058 struct sub_info_t
00059 {
00060 std::string topic_name;
00061 ros::Subscriber *sub;
00062 ShapeShifter* msg;
00063 };
00064
00065 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg, ShapeShifter* s);
00066
00067 static list<struct sub_info_t> g_subs;
00068 static list<struct sub_info_t>::iterator g_selected = g_subs.end();
00069
00070 void conn_cb(const ros::SingleSubscriberPublisher&)
00071 {
00072
00073
00074 if(g_lazy && g_selected != g_subs.end() && !g_selected->sub)
00075 {
00076 ROS_DEBUG("lazy mode; resubscribing to %s", g_selected->topic_name.c_str());
00077 g_selected->sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_selected->topic_name, 10, boost::bind(in_cb, _1, g_selected->msg)));
00078 }
00079 }
00080
00081 bool sel_srv_cb( topic_tools::MuxSelect::Request &req,
00082 topic_tools::MuxSelect::Response &res )
00083 {
00084 bool ret = false;
00085 if (g_selected != g_subs.end()) {
00086 res.prev_topic = g_selected->topic_name;
00087
00088
00089 if (g_lazy) {
00090 ROS_DEBUG("Unsubscribing to %s, lazy", res.prev_topic.c_str());
00091 if (g_selected->sub)
00092 g_selected->sub->shutdown();
00093 delete g_selected->sub;
00094 g_selected->sub = NULL;
00095 }
00096 }
00097 else
00098 res.prev_topic = string("");
00099
00100
00101 if (req.topic == g_none_topic)
00102 {
00103 ROS_INFO("mux selected to no input.");
00104
00105 g_selected = g_subs.end();
00106 ret = true;
00107 }
00108 else
00109 {
00110 ROS_INFO("trying to switch mux to %s", req.topic.c_str());
00111
00112 for (list<struct sub_info_t>::iterator it = g_subs.begin();
00113 it != g_subs.end();
00114 ++it)
00115 {
00116 if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
00117 {
00118 g_selected = it;
00119 ROS_INFO("mux selected input: [%s]", it->topic_name.c_str());
00120 ret = true;
00121
00122 if (!g_selected->sub && (!g_advertised || (g_advertised && g_pub.getNumSubscribers()))) {
00123 g_selected->sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_selected->topic_name, 10, boost::bind(in_cb, _1, g_selected->msg)));
00124 }
00125 }
00126 }
00127 }
00128
00129 if(ret)
00130 {
00131 std_msgs::String t;
00132 t.data = req.topic;
00133 g_pub_selected.publish(t);
00134 }
00135
00136 return ret;
00137 }
00138
00139 bool sel_srv_cb_dep( topic_tools::MuxSelect::Request &req,
00140 topic_tools::MuxSelect::Response &res )
00141 {
00142 ROS_WARN("the <topic>_select service is deprecated; use mux/select instead");
00143 return sel_srv_cb(req,res);
00144 }
00145
00146
00147 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg,
00148 ShapeShifter* s)
00149 {
00150 if (!g_advertised)
00151 {
00152 ROS_INFO("advertising");
00153 g_pub = msg->advertise(*g_node, g_output_topic, 10, false, conn_cb);
00154 g_advertised = true;
00155
00156
00157 if (g_lazy) {
00158 for (static list<struct sub_info_t>::iterator it = g_subs.begin(); it != g_subs.end(); ++it) {
00159 if (it != g_selected) {
00160 ROS_INFO("Unregistering from %s", it->topic_name.c_str());
00161 if (it->sub)
00162 it->sub->shutdown();
00163 delete it->sub;
00164 it->sub = NULL;
00165 }
00166 }
00167 }
00168 }
00169
00170 if (s != g_selected->msg)
00171 return;
00172
00173
00174 if (g_lazy && !g_pub.getNumSubscribers() && g_selected != g_subs.end()) {
00175 ROS_INFO("lazy mode; unsubscribing");
00176 g_selected->sub->shutdown();
00177 delete g_selected->sub;
00178 g_selected->sub = NULL;
00179 }
00180 else
00181 g_pub.publish(msg);
00182 }
00183
00184 bool list_topic_cb(topic_tools::MuxList::Request& req,
00185 topic_tools::MuxList::Response& res)
00186 {
00187 for (list<struct sub_info_t>::iterator it = g_subs.begin();
00188 it != g_subs.end();
00189 ++it)
00190 {
00191 res.topics.push_back(it->topic_name);
00192 }
00193
00194 return true;
00195 }
00196
00197 bool add_topic_cb(topic_tools::MuxAdd::Request& req,
00198 topic_tools::MuxAdd::Response& res)
00199 {
00200
00201 ROS_INFO("trying to add %s to mux", req.topic.c_str());
00202
00203
00204 if(req.topic == g_none_topic)
00205 {
00206 ROS_WARN("failed to add topic %s to mux, because it's reserved for special use",
00207 req.topic.c_str());
00208 return false;
00209 }
00210
00211
00212 for (list<struct sub_info_t>::iterator it = g_subs.begin();
00213 it != g_subs.end();
00214 ++it)
00215 {
00216 if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
00217 {
00218 ROS_WARN("tried to add a topic that mux was already listening to: [%s]",
00219 it->topic_name.c_str());
00220 return false;
00221 }
00222 }
00223
00224 struct sub_info_t sub_info;
00225 sub_info.msg = new ShapeShifter;
00226 sub_info.topic_name = ros::names::resolve(req.topic);
00227 try
00228 {
00229 if (g_lazy)
00230 sub_info.sub = NULL;
00231 else
00232 sub_info.sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(sub_info.topic_name, 10, boost::bind(in_cb, _1, sub_info.msg)));
00233 }
00234 catch(ros::InvalidNameException& e)
00235 {
00236 ROS_WARN("failed to add topic %s to mux, because it's an invalid name: %s",
00237 req.topic.c_str(), e.what());
00238 delete sub_info.msg;
00239 return false;
00240 }
00241 g_subs.push_back(sub_info);
00242
00243 ROS_INFO("added %s to mux", req.topic.c_str());
00244
00245 return true;
00246 }
00247
00248 bool del_topic_cb(topic_tools::MuxDelete::Request& req,
00249 topic_tools::MuxDelete::Response& res)
00250 {
00251
00252 ROS_INFO("trying to delete %s from mux", req.topic.c_str());
00253
00254 for (list<struct sub_info_t>::iterator it = g_subs.begin();
00255 it != g_subs.end();
00256 ++it)
00257 {
00258 if (ros::names::resolve(it->topic_name) == ros::names::resolve(req.topic))
00259 {
00260
00261 if(it == g_selected)
00262 {
00263 ROS_WARN("tried to delete currently selected topic %s from mux", req.topic.c_str());
00264 return false;
00265 }
00266 if (it->sub)
00267 it->sub->shutdown();
00268 delete it->sub;
00269 delete it->msg;
00270 g_subs.erase(it);
00271 ROS_INFO("deleted topic %s from mux", req.topic.c_str());
00272 return true;
00273 }
00274 }
00275
00276 ROS_WARN("tried to delete non-subscribed topic %s from mux", req.topic.c_str());
00277 return false;
00278 }
00279
00280 int main(int argc, char **argv)
00281 {
00282 vector<string> args;
00283 ros::removeROSArgs(argc, (const char**)argv, args);
00284
00285 if (args.size() < 3)
00286 {
00287 printf("\nusage: mux OUT_TOPIC IN_TOPIC1 [IN_TOPIC2 [...]]\n\n");
00288 return 1;
00289 }
00290 std::string topic_name;
00291 if(!getBaseName(args[1], topic_name))
00292 return 1;
00293 ros::init(argc, argv, topic_name + string("_mux"),
00294 ros::init_options::AnonymousName);
00295 vector<string> topics;
00296 for (unsigned int i = 2; i < args.size(); i++)
00297 topics.push_back(args[i]);
00298 ros::NodeHandle n;
00299 g_node = &n;
00300 g_output_topic = args[1];
00301
00302 ros::NodeHandle mux_nh("mux"), pnh("~");
00303 pnh.getParam("lazy", g_lazy);
00304
00305
00306 g_pub_selected = mux_nh.advertise<std_msgs::String>(string("selected"), 1, true);
00307
00308 for (size_t i = 0; i < topics.size(); i++)
00309 {
00310 struct sub_info_t sub_info;
00311 sub_info.msg = new ShapeShifter;
00312 sub_info.topic_name = ros::names::resolve(topics[i]);
00313 sub_info.sub = new ros::Subscriber(n.subscribe<ShapeShifter>(sub_info.topic_name, 10, boost::bind(in_cb, _1, sub_info.msg)));
00314
00315 g_subs.push_back(sub_info);
00316 }
00317 g_selected = g_subs.begin();
00318 std_msgs::String t;
00319 t.data = g_selected->topic_name;
00320 g_pub_selected.publish(t);
00321
00322
00323 ros::ServiceServer ss = n.advertiseService(g_output_topic + string("_select"), sel_srv_cb_dep);
00324
00325 ros::ServiceServer ss_select = mux_nh.advertiseService(string("select"), sel_srv_cb);
00326 ros::ServiceServer ss_add = mux_nh.advertiseService(string("add"), add_topic_cb);
00327 ros::ServiceServer ss_list = mux_nh.advertiseService(string("list"), list_topic_cb);
00328 ros::ServiceServer ss_del = mux_nh.advertiseService(string("delete"), del_topic_cb);
00329 ros::spin();
00330 for (list<struct sub_info_t>::iterator it = g_subs.begin();
00331 it != g_subs.end();
00332 ++it)
00333 {
00334 if (it->sub)
00335 it->sub->shutdown();
00336 delete it->sub;
00337 delete it->msg;
00338 }
00339
00340 g_subs.clear();
00341 return 0;
00342 }
00343