00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include <actionlib/client/connection_monitor.h>
00037
00038 #include <map>
00039 #include <sstream>
00040 #include <string>
00041
00042
00043
00044
00045 #define CONNECTION_DEBUG(fmt, ...) \
00046 ROS_DEBUG_NAMED("ConnectionMonitor", fmt, ## __VA_ARGS__)
00047
00048 #define CONNECTION_WARN(fmt, ...) \
00049 ROS_WARN_NAMED("ConnectionMonitor", fmt, ## __VA_ARGS__)
00050
00051
00052 actionlib::ConnectionMonitor::ConnectionMonitor(ros::Subscriber & feedback_sub,
00053 ros::Subscriber & result_sub)
00054 : feedback_sub_(feedback_sub), result_sub_(result_sub)
00055 {
00056 status_received_ = false;
00057 }
00058
00059
00060
00061 void actionlib::ConnectionMonitor::goalConnectCallback(const ros::SingleSubscriberPublisher & pub)
00062 {
00063 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00064
00065
00066 if (goalSubscribers_.find(pub.getSubscriberName()) == goalSubscribers_.end()) {
00067 CONNECTION_DEBUG("goalConnectCallback: Adding [%s] to goalSubscribers",
00068 pub.getSubscriberName().c_str());
00069 goalSubscribers_[pub.getSubscriberName()] = 1;
00070 } else {
00071 CONNECTION_WARN(
00072 "goalConnectCallback: Trying to add [%s] to goalSubscribers, but it is already in the goalSubscribers list",
00073 pub.getSubscriberName().c_str());
00074 goalSubscribers_[pub.getSubscriberName()]++;
00075 }
00076 CONNECTION_DEBUG("%s", goalSubscribersString().c_str());
00077
00078 check_connection_condition_.notify_all();
00079 }
00080
00081 void actionlib::ConnectionMonitor::goalDisconnectCallback(const ros::SingleSubscriberPublisher & pub)
00082 {
00083 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00084
00085 std::map<std::string, size_t>::iterator it;
00086 it = goalSubscribers_.find(pub.getSubscriberName());
00087
00088 if (it == goalSubscribers_.end()) {
00089 CONNECTION_WARN(
00090 "goalDisconnectCallback: Trying to remove [%s] to goalSubscribers, but it is not in the goalSubscribers list",
00091 pub.getSubscriberName().c_str());
00092 } else {
00093 CONNECTION_DEBUG("goalDisconnectCallback: Removing [%s] from goalSubscribers",
00094 pub.getSubscriberName().c_str());
00095 goalSubscribers_[pub.getSubscriberName()]--;
00096 if (goalSubscribers_[pub.getSubscriberName()] == 0) {
00097 goalSubscribers_.erase(it);
00098 }
00099 }
00100 CONNECTION_DEBUG("%s", goalSubscribersString().c_str());
00101 }
00102
00103 std::string actionlib::ConnectionMonitor::goalSubscribersString()
00104 {
00105 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00106 std::ostringstream ss;
00107 ss << "Goal Subscribers (" << goalSubscribers_.size() << " total)";
00108 for (std::map<std::string, size_t>::iterator it = goalSubscribers_.begin();
00109 it != goalSubscribers_.end(); it++)
00110 {
00111 ss << "\n - " << it->first;
00112 }
00113 return ss.str();
00114 }
00115
00116
00117
00118 void actionlib::ConnectionMonitor::cancelConnectCallback(const ros::SingleSubscriberPublisher & pub)
00119 {
00120 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00121
00122
00123 if (cancelSubscribers_.find(pub.getSubscriberName()) == cancelSubscribers_.end()) {
00124 CONNECTION_DEBUG("cancelConnectCallback: Adding [%s] to cancelSubscribers",
00125 pub.getSubscriberName().c_str());
00126 cancelSubscribers_[pub.getSubscriberName()] = 1;
00127 } else {
00128 CONNECTION_WARN(
00129 "cancelConnectCallback: Trying to add [%s] to cancelSubscribers, but it is already in the cancelSubscribers list",
00130 pub.getSubscriberName().c_str());
00131 cancelSubscribers_[pub.getSubscriberName()]++;
00132 }
00133 CONNECTION_DEBUG("%s", cancelSubscribersString().c_str());
00134
00135 check_connection_condition_.notify_all();
00136 }
00137
00138 void actionlib::ConnectionMonitor::cancelDisconnectCallback(
00139 const ros::SingleSubscriberPublisher & pub)
00140 {
00141 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00142
00143 std::map<std::string, size_t>::iterator it;
00144 it = cancelSubscribers_.find(pub.getSubscriberName());
00145
00146 if (it == cancelSubscribers_.end()) {
00147 CONNECTION_WARN(
00148 "cancelDisconnectCallback: Trying to remove [%s] to cancelSubscribers, but it is not in the cancelSubscribers list",
00149 pub.getSubscriberName().c_str());
00150 } else {
00151 CONNECTION_DEBUG("cancelDisconnectCallback: Removing [%s] from cancelSubscribers",
00152 pub.getSubscriberName().c_str());
00153 cancelSubscribers_[pub.getSubscriberName()]--;
00154 if (cancelSubscribers_[pub.getSubscriberName()] == 0) {
00155 cancelSubscribers_.erase(it);
00156 }
00157 }
00158 CONNECTION_DEBUG("%s", cancelSubscribersString().c_str());
00159 }
00160
00161 std::string actionlib::ConnectionMonitor::cancelSubscribersString()
00162 {
00163 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00164
00165 std::ostringstream ss;
00166 ss << "cancel Subscribers (" << cancelSubscribers_.size() << " total)";
00167 for (std::map<std::string, size_t>::iterator it = cancelSubscribers_.begin();
00168 it != cancelSubscribers_.end(); it++)
00169 {
00170 ss << "\n - " << it->first;
00171 }
00172 return ss.str();
00173 }
00174
00175
00176 void actionlib::ConnectionMonitor::processStatus(
00177 const actionlib_msgs::GoalStatusArrayConstPtr & status, const std::string & cur_status_caller_id)
00178 {
00179 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00180
00181 if (status_received_) {
00182 if (status_caller_id_ != cur_status_caller_id) {
00183 CONNECTION_WARN(
00184 "processStatus: Previously received status from [%s], but we now received status from [%s]. Did the ActionServer change?",
00185 status_caller_id_.c_str(), cur_status_caller_id.c_str());
00186 status_caller_id_ = cur_status_caller_id;
00187 }
00188 latest_status_time_ = status->header.stamp;
00189 } else {
00190 CONNECTION_DEBUG(
00191 "processStatus: Just got our first status message from the ActionServer at node [%s]",
00192 cur_status_caller_id.c_str());
00193 status_received_ = true;
00194 status_caller_id_ = cur_status_caller_id;
00195 latest_status_time_ = status->header.stamp;
00196 }
00197
00198 check_connection_condition_.notify_all();
00199 }
00200
00201
00202 bool actionlib::ConnectionMonitor::isServerConnected()
00203 {
00204 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00205
00206 if (!status_received_) {
00207 CONNECTION_DEBUG("isServerConnected: Didn't receive status yet, so not connected yet");
00208 return false;
00209 }
00210
00211 if (goalSubscribers_.find(status_caller_id_) == goalSubscribers_.end()) {
00212 CONNECTION_DEBUG(
00213 "isServerConnected: Server [%s] has not yet subscribed to the goal topic, so not connected yet",
00214 status_caller_id_.c_str());
00215 CONNECTION_DEBUG("%s", goalSubscribersString().c_str());
00216 return false;
00217 }
00218
00219 if (cancelSubscribers_.find(status_caller_id_) == cancelSubscribers_.end()) {
00220 CONNECTION_DEBUG(
00221 "isServerConnected: Server [%s] has not yet subscribed to the cancel topic, so not connected yet",
00222 status_caller_id_.c_str());
00223 CONNECTION_DEBUG("%s", cancelSubscribersString().c_str());
00224 return false;
00225 }
00226
00227 if (feedback_sub_.getNumPublishers() == 0) {
00228 CONNECTION_DEBUG(
00229 "isServerConnected: Client has not yet connected to feedback topic of server [%s]",
00230 status_caller_id_.c_str());
00231 return false;
00232 }
00233
00234 if (result_sub_.getNumPublishers() == 0) {
00235 CONNECTION_DEBUG(
00236 "isServerConnected: Client has not yet connected to result topic of server [%s]",
00237 status_caller_id_.c_str());
00238 return false;
00239 }
00240
00241 CONNECTION_DEBUG("isServerConnected: Server [%s] is fully connected", status_caller_id_.c_str());
00242 return true;
00243 }
00244
00245 bool actionlib::ConnectionMonitor::waitForActionServerToStart(const ros::Duration & timeout,
00246 const ros::NodeHandle & nh)
00247 {
00248 if (timeout < ros::Duration(0, 0)) {
00249 ROS_ERROR_NAMED("actionlib", "Timeouts can't be negative. Timeout is [%.2fs]", timeout.toSec());
00250 }
00251
00252 ros::Time timeout_time = ros::Time::now() + timeout;
00253
00254 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00255
00256 if (isServerConnected()) {
00257 return true;
00258 }
00259
00260
00261 ros::Duration loop_period = ros::Duration().fromSec(.5);
00262
00263 while (nh.ok() && !isServerConnected()) {
00264
00265 ros::Duration time_left = timeout_time - ros::Time::now();
00266
00267
00268 if (timeout != ros::Duration(0, 0) && time_left <= ros::Duration(0, 0) ) {
00269 break;
00270 }
00271
00272
00273 if (time_left > loop_period || timeout == ros::Duration()) {
00274 time_left = loop_period;
00275 }
00276
00277 check_connection_condition_.timed_wait(lock,
00278 boost::posix_time::milliseconds(time_left.toSec() * 1000.0f));
00279 }
00280
00281 return isServerConnected();
00282 }