00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include <actionlib/client/connection_monitor.h>
00037 #include <ros/ros.h>
00038
00039 #include <map>
00040 #include <sstream>
00041 #include <string>
00042
00043
00044
00045
00046 #define CONNECTION_DEBUG(fmt, ...) \
00047 ROS_DEBUG_NAMED("ConnectionMonitor", fmt, ## __VA_ARGS__)
00048
00049 #define CONNECTION_WARN(fmt, ...) \
00050 ROS_WARN_NAMED("ConnectionMonitor", fmt, ## __VA_ARGS__)
00051
00052
00053 actionlib::ConnectionMonitor::ConnectionMonitor(ros::Subscriber & feedback_sub,
00054 ros::Subscriber & result_sub)
00055 : feedback_sub_(feedback_sub), result_sub_(result_sub)
00056 {
00057 status_received_ = false;
00058 }
00059
00060
00061
00062 void actionlib::ConnectionMonitor::goalConnectCallback(const ros::SingleSubscriberPublisher & pub)
00063 {
00064 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00065
00066
00067 if (goalSubscribers_.find(pub.getSubscriberName()) == goalSubscribers_.end()) {
00068 CONNECTION_DEBUG("goalConnectCallback: Adding [%s] to goalSubscribers",
00069 pub.getSubscriberName().c_str());
00070 goalSubscribers_[pub.getSubscriberName()] = 1;
00071 } else {
00072 CONNECTION_WARN(
00073 "goalConnectCallback: Trying to add [%s] to goalSubscribers, but it is already in the goalSubscribers list",
00074 pub.getSubscriberName().c_str());
00075 goalSubscribers_[pub.getSubscriberName()]++;
00076 }
00077 CONNECTION_DEBUG("%s", goalSubscribersString().c_str());
00078
00079 check_connection_condition_.notify_all();
00080 }
00081
00082 void actionlib::ConnectionMonitor::goalDisconnectCallback(const ros::SingleSubscriberPublisher & pub)
00083 {
00084 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00085
00086 std::map<std::string, size_t>::iterator it;
00087 it = goalSubscribers_.find(pub.getSubscriberName());
00088
00089 if (it == goalSubscribers_.end()) {
00090 CONNECTION_WARN(
00091 "goalDisconnectCallback: Trying to remove [%s] to goalSubscribers, but it is not in the goalSubscribers list",
00092 pub.getSubscriberName().c_str());
00093 } else {
00094 CONNECTION_DEBUG("goalDisconnectCallback: Removing [%s] from goalSubscribers",
00095 pub.getSubscriberName().c_str());
00096 goalSubscribers_[pub.getSubscriberName()]--;
00097 if (goalSubscribers_[pub.getSubscriberName()] == 0) {
00098 goalSubscribers_.erase(it);
00099 }
00100 }
00101 CONNECTION_DEBUG("%s", goalSubscribersString().c_str());
00102 }
00103
00104 std::string actionlib::ConnectionMonitor::goalSubscribersString()
00105 {
00106 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00107 std::ostringstream ss;
00108 ss << "Goal Subscribers (" << goalSubscribers_.size() << " total)";
00109 for (std::map<std::string, size_t>::iterator it = goalSubscribers_.begin();
00110 it != goalSubscribers_.end(); it++)
00111 {
00112 ss << "\n - " << it->first;
00113 }
00114 return ss.str();
00115 }
00116
00117
00118
00119 void actionlib::ConnectionMonitor::cancelConnectCallback(const ros::SingleSubscriberPublisher & pub)
00120 {
00121 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00122
00123
00124 if (cancelSubscribers_.find(pub.getSubscriberName()) == cancelSubscribers_.end()) {
00125 CONNECTION_DEBUG("cancelConnectCallback: Adding [%s] to cancelSubscribers",
00126 pub.getSubscriberName().c_str());
00127 cancelSubscribers_[pub.getSubscriberName()] = 1;
00128 } else {
00129 CONNECTION_WARN(
00130 "cancelConnectCallback: Trying to add [%s] to cancelSubscribers, but it is already in the cancelSubscribers list",
00131 pub.getSubscriberName().c_str());
00132 cancelSubscribers_[pub.getSubscriberName()]++;
00133 }
00134 CONNECTION_DEBUG("%s", cancelSubscribersString().c_str());
00135
00136 check_connection_condition_.notify_all();
00137 }
00138
00139 void actionlib::ConnectionMonitor::cancelDisconnectCallback(
00140 const ros::SingleSubscriberPublisher & pub)
00141 {
00142 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00143
00144 std::map<std::string, size_t>::iterator it;
00145 it = cancelSubscribers_.find(pub.getSubscriberName());
00146
00147 if (it == cancelSubscribers_.end()) {
00148 CONNECTION_WARN(
00149 "cancelDisconnectCallback: Trying to remove [%s] to cancelSubscribers, but it is not in the cancelSubscribers list",
00150 pub.getSubscriberName().c_str());
00151 } else {
00152 CONNECTION_DEBUG("cancelDisconnectCallback: Removing [%s] from cancelSubscribers",
00153 pub.getSubscriberName().c_str());
00154 cancelSubscribers_[pub.getSubscriberName()]--;
00155 if (cancelSubscribers_[pub.getSubscriberName()] == 0) {
00156 cancelSubscribers_.erase(it);
00157 }
00158 }
00159 CONNECTION_DEBUG("%s", cancelSubscribersString().c_str());
00160 }
00161
00162 std::string actionlib::ConnectionMonitor::cancelSubscribersString()
00163 {
00164 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00165
00166 std::ostringstream ss;
00167 ss << "cancel Subscribers (" << cancelSubscribers_.size() << " total)";
00168 for (std::map<std::string, size_t>::iterator it = cancelSubscribers_.begin();
00169 it != cancelSubscribers_.end(); it++)
00170 {
00171 ss << "\n - " << it->first;
00172 }
00173 return ss.str();
00174 }
00175
00176
00177 void actionlib::ConnectionMonitor::processStatus(
00178 const actionlib_msgs::GoalStatusArrayConstPtr & status, const std::string & cur_status_caller_id)
00179 {
00180 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00181
00182 if (status_received_) {
00183 if (status_caller_id_ != cur_status_caller_id) {
00184 CONNECTION_WARN(
00185 "processStatus: Previously received status from [%s], but we now received status from [%s]. Did the ActionServer change?",
00186 status_caller_id_.c_str(), cur_status_caller_id.c_str());
00187 status_caller_id_ = cur_status_caller_id;
00188 }
00189 latest_status_time_ = status->header.stamp;
00190 } else {
00191 CONNECTION_DEBUG(
00192 "processStatus: Just got our first status message from the ActionServer at node [%s]",
00193 cur_status_caller_id.c_str());
00194 status_received_ = true;
00195 status_caller_id_ = cur_status_caller_id;
00196 latest_status_time_ = status->header.stamp;
00197 }
00198
00199 check_connection_condition_.notify_all();
00200 }
00201
00202
00203 bool actionlib::ConnectionMonitor::isServerConnected()
00204 {
00205 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00206
00207 if (!status_received_) {
00208 CONNECTION_DEBUG("isServerConnected: Didn't receive status yet, so not connected yet");
00209 return false;
00210 }
00211
00212 if (goalSubscribers_.find(status_caller_id_) == goalSubscribers_.end()) {
00213 CONNECTION_DEBUG(
00214 "isServerConnected: Server [%s] has not yet subscribed to the goal topic, so not connected yet",
00215 status_caller_id_.c_str());
00216 CONNECTION_DEBUG("%s", goalSubscribersString().c_str());
00217 return false;
00218 }
00219
00220 if (cancelSubscribers_.find(status_caller_id_) == cancelSubscribers_.end()) {
00221 CONNECTION_DEBUG(
00222 "isServerConnected: Server [%s] has not yet subscribed to the cancel topic, so not connected yet",
00223 status_caller_id_.c_str());
00224 CONNECTION_DEBUG("%s", cancelSubscribersString().c_str());
00225 return false;
00226 }
00227
00228 if (feedback_sub_.getNumPublishers() == 0) {
00229 CONNECTION_DEBUG(
00230 "isServerConnected: Client has not yet connected to feedback topic of server [%s]",
00231 status_caller_id_.c_str());
00232 return false;
00233 }
00234
00235 if (result_sub_.getNumPublishers() == 0) {
00236 CONNECTION_DEBUG(
00237 "isServerConnected: Client has not yet connected to result topic of server [%s]",
00238 status_caller_id_.c_str());
00239 return false;
00240 }
00241
00242 CONNECTION_DEBUG("isServerConnected: Server [%s] is fully connected", status_caller_id_.c_str());
00243 return true;
00244 }
00245
00246 bool actionlib::ConnectionMonitor::waitForActionServerToStart(const ros::Duration & timeout,
00247 const ros::NodeHandle & nh)
00248 {
00249 if (timeout < ros::Duration(0, 0)) {
00250 ROS_ERROR_NAMED("actionlib", "Timeouts can't be negative. Timeout is [%.2fs]", timeout.toSec());
00251 }
00252
00253 ros::Time timeout_time = ros::Time::now() + timeout;
00254
00255 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00256
00257 if (isServerConnected()) {
00258 return true;
00259 }
00260
00261
00262 ros::Duration loop_period = ros::Duration().fromSec(.5);
00263
00264 while (nh.ok() && !isServerConnected()) {
00265
00266 ros::Duration time_left = timeout_time - ros::Time::now();
00267
00268
00269 if (timeout != ros::Duration(0, 0) && time_left <= ros::Duration(0, 0) ) {
00270 break;
00271 }
00272
00273
00274 if (time_left > loop_period || timeout == ros::Duration()) {
00275 time_left = loop_period;
00276 }
00277
00278 check_connection_condition_.timed_wait(lock,
00279 boost::posix_time::milliseconds(static_cast<int64_t>(time_left.toSec() * 1000.0f)));
00280 }
00281
00282 return isServerConnected();
00283 }