Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #include <jsk_topic_tools/stealth_relay.h>
00041
00042
00043 namespace jsk_topic_tools
00044 {
00045 void StealthRelay::onInit()
00046 {
00047 bool use_multithread;
00048 ros::param::param<bool>("~use_multithread_callback", use_multithread, true);
00049 if (use_multithread) {
00050 NODELET_DEBUG("use multithread callback");
00051 nh_.reset (new ros::NodeHandle (getMTNodeHandle ()));
00052 pnh_.reset (new ros::NodeHandle (getMTPrivateNodeHandle ()));
00053 } else {
00054 NODELET_DEBUG("use singlethread callback");
00055 nh_.reset (new ros::NodeHandle (getNodeHandle ()));
00056 pnh_.reset (new ros::NodeHandle (getPrivateNodeHandle ()));
00057 }
00058
00059 subscribed_ = false;
00060 advertised_ = false;
00061
00062 pnh_->param<int>("queue_size", queue_size_, 1);
00063 pnh_->param<std::string>("monitoring_topic", monitoring_topic_,
00064 pnh_->resolveName("input"));
00065
00066 double monitor_rate;
00067 pnh_->param<double>("monitor_rate", monitor_rate, 1.0);
00068 poll_timer_ = pnh_->createTimer(ros::Duration(monitor_rate),
00069 &StealthRelay::timerCallback, this);
00070
00071 NODELET_DEBUG("Started monitoring %s at %.2f Hz", monitoring_topic_.c_str(), monitor_rate);
00072 subscribe();
00073 }
00074
00075 void StealthRelay::subscribe()
00076 {
00077 NODELET_DEBUG("subscribe");
00078 sub_ = pnh_->subscribe("input", queue_size_,
00079 &StealthRelay::inputCallback, this);
00080 subscribed_ = true;
00081 }
00082
00083 void StealthRelay::unsubscribe()
00084 {
00085 NODELET_DEBUG("unsubscribe");
00086 sub_.shutdown();
00087 subscribed_ = false;
00088 }
00089
00090 bool StealthRelay::isSubscribed()
00091 {
00092 return subscribed_;
00093 }
00094
00095 void StealthRelay::inputCallback(const AnyMsgConstPtr& msg)
00096 {
00097 boost::mutex::scoped_lock lock(mutex_);
00098
00099 if (!advertised_)
00100 {
00101 pub_ = msg->advertise(*pnh_, "output", 1);
00102 advertised_ = true;
00103 unsubscribe();
00104 return;
00105 }
00106
00107 pub_.publish(msg);
00108 }
00109
00110 void StealthRelay::timerCallback(const ros::TimerEvent& event)
00111 {
00112 boost::mutex::scoped_lock lock(mutex_);
00113
00114 if (pub_.getNumSubscribers() == 0 && subscribed_)
00115 {
00116 unsubscribe();
00117 return;
00118 }
00119
00120 int subscribing_num = 0;
00121 if (!getNumOtherSubscribers(monitoring_topic_, subscribing_num))
00122 {
00123 if (subscribed_) unsubscribe();
00124 }
00125 else if (subscribed_ && subscribing_num == 0)
00126 unsubscribe();
00127 else if (!subscribed_ && subscribing_num > 0)
00128 subscribe();
00129 }
00130
00131 bool StealthRelay::getNumOtherSubscribers(const std::string& name, int& num)
00132 {
00133 XmlRpc::XmlRpcValue req(ros::this_node::getName()), res, data;
00134 bool ok = ros::master::execute("getSystemState", req, res, data, false);
00135
00136 XmlRpc::XmlRpcValue& sub_info = data[1];
00137 for (size_t i = 0; i < sub_info.size(); ++i)
00138 {
00139 std::string topic_name = sub_info[i][0];
00140 if (topic_name == name)
00141 {
00142 XmlRpc::XmlRpcValue& subscribers = sub_info[i][1];
00143 int cnt = 0;
00144 for (size_t j = 0; j < subscribers.size(); ++j)
00145 {
00146 std::string subscriber = subscribers[j];
00147 if (subscriber != ros::this_node::getName()) ++cnt;
00148 }
00149 num = cnt;
00150 return true;
00151 }
00152 }
00153 return false;
00154 }
00155 }
00156
00157 #include <pluginlib/class_list_macros.h>
00158 typedef jsk_topic_tools::StealthRelay StealthRelay;
00159 PLUGINLIB_EXPORT_CLASS(StealthRelay, nodelet::Nodelet)