Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 #include <pluginlib/class_list_macros.h>
00035 #include "jsk_topic_tools/mux_nodelet.h"
00036 #include <std_msgs/String.h>
00037 #include "jsk_topic_tools/rosparam_utils.h"
00038
00039 namespace jsk_topic_tools
00040 {
00041
00042 const static std::string g_none_topic = "__none";
00043
00044 void MUX::onInit()
00045 {
00046 advertised_ = false;
00047 pnh_ = getPrivateNodeHandle();
00048 readVectorParameter(pnh_, "topics", topics_);
00049 if (topics_.size() < 1) {
00050 NODELET_FATAL("need to specify at least one topic in ~topics");
00051 return;
00052 }
00053 pub_selected_ = pnh_.advertise<std_msgs::String>("selected", 1, true);
00054
00055
00056
00057
00058 selected_topic_ = topics_[0];
00059 subscribeSelectedTopic();
00060
00061 ss_select_ = pnh_.advertiseService("select", &MUX::selectTopicCallback, this);
00062 ss_add_ = pnh_.advertiseService("add", &MUX::addTopicCallback, this);
00063 ss_list_ = pnh_.advertiseService("list_topics", &MUX::listTopicCallback, this);
00064 ss_del_ = pnh_.advertiseService("delete", &MUX::deleteTopicCallback, this);
00065
00066 }
00067
00068 void MUX::connectCb(const ros::SingleSubscriberPublisher& pub)
00069 {
00070
00071 if (pub_.getNumSubscribers() > 0) {
00072
00073 if (!subscribing_) {
00074
00075 sub_.reset(new ros::Subscriber(
00076 pnh_.subscribe<topic_tools::ShapeShifter>(
00077 selected_topic_, 10,
00078 &MUX::inputCallback, this, th_)));
00079 subscribing_ = true;
00080 }
00081 }
00082 else {
00083 if (subscribing_) {
00084
00085 sub_->shutdown();
00086 subscribing_ = false;
00087 }
00088 }
00089 }
00090
00091
00092
00093 bool MUX::selectTopicCallback(topic_tools::MuxSelect::Request &req,
00094 topic_tools::MuxSelect::Response &res)
00095 {
00096 res.prev_topic = selected_topic_;
00097 if (selected_topic_ != g_none_topic) {
00098
00099 sub_->shutdown();
00100 }
00101
00102 if (req.topic == g_none_topic) {
00103 selected_topic_ = g_none_topic;
00104 return true;
00105 }
00106 for (size_t i = 0; i < topics_.size(); i++) {
00107 if (pnh_.resolveName(topics_[i]) == pnh_.resolveName(req.topic)) {
00108
00109 selected_topic_ = topics_[i];
00110 subscribeSelectedTopic();
00111 return true;
00112 }
00113 }
00114
00115 NODELET_WARN("%s is not provided in topic list", req.topic.c_str());
00116 return false;
00117 }
00118
00119 bool MUX::addTopicCallback(topic_tools::MuxAdd::Request& req,
00120 topic_tools::MuxAdd::Response& res)
00121 {
00122 NODELET_INFO("trying to add %s to mux", req.topic.c_str());
00123 if (req.topic == g_none_topic) {
00124 NODELET_WARN("failed to add topic %s to mux, because it's reserved for special use",
00125 req.topic.c_str());
00126 return false;
00127 }
00128
00129 for (size_t i = 0; i < topics_.size(); i++) {
00130 if (pnh_.resolveName(topics_[i]) == pnh_.resolveName(req.topic)) {
00131 NODELET_WARN("tried to add a topic that mux was already listening to: [%s]",
00132 topics_[i].c_str());
00133 return false;
00134 }
00135 }
00136
00137
00138
00139
00140 topics_.push_back(ros::names::resolve(req.topic));
00141 return true;
00142 }
00143
00144 bool MUX::deleteTopicCallback(topic_tools::MuxDelete::Request& req,
00145 topic_tools::MuxDelete::Response& res)
00146 {
00147
00148 for (size_t i = 0; i < topics_.size(); i++) {
00149 if (pnh_.resolveName(topics_[i]) == pnh_.resolveName(req.topic)) {
00150 if (pnh_.resolveName(req.topic) == pnh_.resolveName(selected_topic_)) {
00151 NODELET_WARN("tried to delete currently selected topic %s from mux",
00152 req.topic.c_str());
00153 return false;
00154 }
00155 topics_.erase(topics_.begin() + i);
00156 return true;
00157 }
00158 }
00159 NODELET_WARN("cannot find the topics %s in the list of mux",
00160 req.topic.c_str());
00161 return false;
00162 }
00163
00164 bool MUX::listTopicCallback(topic_tools::MuxList::Request& req,
00165 topic_tools::MuxList::Response& res)
00166 {
00167 for (size_t i = 0; i < topics_.size(); i++) {
00168 res.topics.push_back(pnh_.resolveName(topics_[i]));
00169 }
00170 return true;
00171 }
00172
00173 void MUX::subscribeSelectedTopic()
00174 {
00175
00176 advertised_ = false;
00177 subscribing_ = false;
00178
00179 if (selected_topic_ == g_none_topic) {
00180 NODELET_WARN("none topic is selected");
00181 return;
00182 }
00183
00184 sub_.reset(new ros::Subscriber(
00185 pnh_.subscribe<topic_tools::ShapeShifter>(
00186 selected_topic_, 10,
00187 &MUX::inputCallback, this, th_)));
00188 std_msgs::String msg;
00189 msg.data = selected_topic_;
00190 pub_selected_.publish(msg);
00191 }
00192
00193 void MUX::inputCallback(const boost::shared_ptr<topic_tools::ShapeShifter const>& msg)
00194 {
00195 if (!advertised_) {
00196 ros::SubscriberStatusCallback connect_cb
00197 = boost::bind(&MUX::connectCb, this, _1);
00198 ros::AdvertiseOptions opts("output", 1,
00199 msg->getMD5Sum(),
00200 msg->getDataType(),
00201 msg->getMessageDefinition(),
00202 connect_cb,
00203 connect_cb);
00204 pub_ = pnh_.advertise(opts);
00205 advertised_ = true;
00206 sub_->shutdown();
00207
00208 }
00209 pub_.publish(msg);
00210 }
00211
00212 }
00213
00214 typedef jsk_topic_tools::MUX MUX;
00215 PLUGINLIB_EXPORT_CLASS(MUX, nodelet::Nodelet)
00216