Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include "jsk_topic_tools/connection_based_nodelet.h"
00037 #include "jsk_topic_tools/log_utils.h"
00038
00039 namespace jsk_topic_tools
00040 {
00041 void ConnectionBasedNodelet::onInit()
00042 {
00043 connection_status_ = NOT_SUBSCRIBED;
00044 bool use_multithread;
00045 ros::param::param<bool>("~use_multithread_callback", use_multithread, true);
00046 if (use_multithread) {
00047 NODELET_DEBUG("use multithread callback");
00048 nh_.reset (new ros::NodeHandle (getMTNodeHandle ()));
00049 pnh_.reset (new ros::NodeHandle (getMTPrivateNodeHandle ()));
00050 } else {
00051 NODELET_DEBUG("use singlethread callback");
00052 nh_.reset (new ros::NodeHandle (getNodeHandle ()));
00053 pnh_.reset (new ros::NodeHandle (getPrivateNodeHandle ()));
00054 }
00055 pnh_->param("always_subscribe", always_subscribe_, false);
00056 pnh_->param("verbose_connection", verbose_connection_, false);
00057 if (!verbose_connection_) {
00058 nh_->param("verbose_connection", verbose_connection_, false);
00059 }
00060
00061
00062 pnh_->param("no_warn_on_init_post_process", on_init_post_process_called_, false);
00063 if (!on_init_post_process_called_) {
00064 timer_warn_on_init_post_process_called_ = nh_->createWallTimer(
00065 ros::WallDuration(5),
00066 &ConnectionBasedNodelet::warnOnInitPostProcessCalledCallback,
00067 this,
00068 true);
00069 }
00070
00071
00072 ever_subscribed_ = false;
00073 timer_warn_never_subscribed_ = nh_->createWallTimer(
00074 ros::WallDuration(5),
00075 &ConnectionBasedNodelet::warnNeverSubscribedCallback,
00076 this,
00077 true);
00078 }
00079
00080 void ConnectionBasedNodelet::onInitPostProcess()
00081 {
00082 on_init_post_process_called_ = true;
00083 if (always_subscribe_) {
00084 boost::mutex::scoped_lock lock(connection_mutex_);
00085 ever_subscribed_ = true;
00086 subscribe();
00087 }
00088 }
00089
00090 inline bool ConnectionBasedNodelet::isSubscribed()
00091 {
00092 return connection_status_ == SUBSCRIBED;
00093 }
00094
00095
00096 void ConnectionBasedNodelet::warnOnInitPostProcessCalledCallback(const ros::WallTimerEvent& event)
00097 {
00098 if (!on_init_post_process_called_) {
00099 NODELET_WARN("[%s] onInitPostProcess is not yet called.", nodelet::Nodelet::getName().c_str());
00100 }
00101 }
00102
00103 void ConnectionBasedNodelet::warnNeverSubscribedCallback(const ros::WallTimerEvent& event)
00104 {
00105 if (!ever_subscribed_) {
00106 NODELET_WARN("'%s' subscribes topics only with child subscribers.", nodelet::Nodelet::getName().c_str());
00107 }
00108 }
00109
00110 void ConnectionBasedNodelet::connectionCallback(const ros::SingleSubscriberPublisher& pub)
00111 {
00112 if (verbose_connection_) {
00113 NODELET_INFO("New connection or disconnection is detected");
00114 }
00115 if (!always_subscribe_) {
00116 boost::mutex::scoped_lock lock(connection_mutex_);
00117 for (size_t i = 0; i < publishers_.size(); i++) {
00118 ros::Publisher pub = publishers_[i];
00119 if (pub.getNumSubscribers() > 0) {
00120 if (!ever_subscribed_) {
00121 ever_subscribed_ = true;
00122 }
00123 if (connection_status_ != SUBSCRIBED) {
00124 if (verbose_connection_) {
00125 NODELET_INFO("Subscribe input topics");
00126 }
00127 subscribe();
00128 connection_status_ = SUBSCRIBED;
00129 }
00130 return;
00131 }
00132 }
00133 if (connection_status_ == SUBSCRIBED) {
00134 if (verbose_connection_) {
00135 NODELET_INFO("Unsubscribe input topics");
00136 }
00137 unsubscribe();
00138 connection_status_ = NOT_SUBSCRIBED;
00139 }
00140 }
00141 }
00142
00143 void ConnectionBasedNodelet::imageConnectionCallback(
00144 const image_transport::SingleSubscriberPublisher& pub)
00145 {
00146 if (verbose_connection_) {
00147 NODELET_INFO("New image connection or disconnection is detected");
00148 }
00149 if (!always_subscribe_) {
00150 boost::mutex::scoped_lock lock(connection_mutex_);
00151 for (size_t i = 0; i < image_publishers_.size(); i++) {
00152 image_transport::Publisher pub = image_publishers_[i];
00153 if (pub.getNumSubscribers() > 0) {
00154 if (!ever_subscribed_) {
00155 ever_subscribed_ = true;
00156 }
00157 if (connection_status_ != SUBSCRIBED) {
00158 if (verbose_connection_) {
00159 NODELET_INFO("Subscribe input topics");
00160 }
00161 subscribe();
00162 connection_status_ = SUBSCRIBED;
00163 }
00164 return;
00165 }
00166 }
00167 if (connection_status_ == SUBSCRIBED) {
00168 if (verbose_connection_) {
00169 NODELET_INFO("Unsubscribe input topics");
00170 }
00171 unsubscribe();
00172 connection_status_ = NOT_SUBSCRIBED;
00173 }
00174 }
00175 }
00176
00177 void ConnectionBasedNodelet::cameraConnectionCallback(
00178 const image_transport::SingleSubscriberPublisher& pub)
00179 {
00180 cameraConnectionBaseCallback();
00181 }
00182
00183 void ConnectionBasedNodelet::cameraInfoConnectionCallback(
00184 const ros::SingleSubscriberPublisher& pub)
00185 {
00186 cameraConnectionBaseCallback();
00187 }
00188
00189 void ConnectionBasedNodelet::cameraConnectionBaseCallback()
00190 {
00191 if (verbose_connection_) {
00192 NODELET_INFO("New image connection or disconnection is detected");
00193 }
00194 if (!always_subscribe_) {
00195 boost::mutex::scoped_lock lock(connection_mutex_);
00196 for (size_t i = 0; i < camera_publishers_.size(); i++) {
00197 image_transport::CameraPublisher pub = camera_publishers_[i];
00198 if (pub.getNumSubscribers() > 0) {
00199 if (!ever_subscribed_) {
00200 ever_subscribed_ = true;
00201 }
00202 if (connection_status_ != SUBSCRIBED) {
00203 if (verbose_connection_) {
00204 NODELET_INFO("Subscribe input topics");
00205 }
00206 subscribe();
00207 connection_status_ = SUBSCRIBED;
00208 }
00209 return;
00210 }
00211 }
00212 if (connection_status_ == SUBSCRIBED) {
00213 if (verbose_connection_) {
00214 NODELET_INFO("Unsubscribe input topics");
00215 }
00216 unsubscribe();
00217 connection_status_ = NOT_SUBSCRIBED;
00218 }
00219 }
00220 }
00221 }