00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include <actionlib/client/connection_monitor.h>
00037
00038 #include <sstream>
00039
00040 using namespace std;
00041 using namespace actionlib;
00042
00043
00044 #define CONNECTION_DEBUG(fmt, ...) \
00045 ROS_DEBUG_NAMED("ConnectionMonitor", fmt,##__VA_ARGS__)
00046
00047 #define CONNECTION_WARN(fmt, ...) \
00048 ROS_WARN_NAMED("ConnectionMonitor", fmt,##__VA_ARGS__)
00049
00050
00051 ConnectionMonitor::ConnectionMonitor(ros::Subscriber&feedback_sub, ros::Subscriber& result_sub)
00052 : feedback_sub_(feedback_sub), result_sub_(result_sub)
00053 {
00054 status_received_ = false;
00055 }
00056
00057
00058
00059 void ConnectionMonitor::goalConnectCallback(const ros::SingleSubscriberPublisher& pub)
00060 {
00061 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00062
00063
00064 if (goalSubscribers_.find(pub.getSubscriberName()) == goalSubscribers_.end())
00065 {
00066 CONNECTION_DEBUG("goalConnectCallback: Adding [%s] to goalSubscribers", pub.getSubscriberName().c_str());
00067 goalSubscribers_[pub.getSubscriberName()] = 1;
00068 }
00069 else
00070 {
00071 CONNECTION_WARN("goalConnectCallback: Trying to add [%s] to goalSubscribers, but it is already in the goalSubscribers list", pub.getSubscriberName().c_str());
00072 goalSubscribers_[pub.getSubscriberName()]++;
00073 }
00074 CONNECTION_DEBUG("%s", goalSubscribersString().c_str());
00075
00076 check_connection_condition_.notify_all();
00077 }
00078
00079 void ConnectionMonitor::goalDisconnectCallback(const ros::SingleSubscriberPublisher& pub)
00080 {
00081 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00082
00083 map<string, size_t>::iterator it;
00084 it = goalSubscribers_.find(pub.getSubscriberName());
00085
00086 if (it == goalSubscribers_.end())
00087 CONNECTION_WARN("goalDisconnectCallback: Trying to remove [%s] to goalSubscribers, but it is not in the goalSubscribers list", pub.getSubscriberName().c_str());
00088 else
00089 {
00090 CONNECTION_DEBUG("goalDisconnectCallback: Removing [%s] from goalSubscribers", pub.getSubscriberName().c_str());
00091 goalSubscribers_[pub.getSubscriberName()]--;
00092 if (goalSubscribers_[pub.getSubscriberName()] == 0)
00093 {
00094 goalSubscribers_.erase(it);
00095 }
00096 }
00097 CONNECTION_DEBUG("%s", goalSubscribersString().c_str());
00098 }
00099
00100 string ConnectionMonitor::goalSubscribersString()
00101 {
00102 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00103 ostringstream ss;
00104 ss << "Goal Subscribers (" << goalSubscribers_.size() << " total)";
00105 for (map<string, size_t>::iterator it=goalSubscribers_.begin(); it != goalSubscribers_.end(); it++)
00106 ss << "\n - " << it->first;
00107 return ss.str();
00108 }
00109
00110
00111
00112 void ConnectionMonitor::cancelConnectCallback(const ros::SingleSubscriberPublisher& pub)
00113 {
00114 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00115
00116
00117 if (cancelSubscribers_.find(pub.getSubscriberName()) == cancelSubscribers_.end())
00118 {
00119 CONNECTION_DEBUG("cancelConnectCallback: Adding [%s] to cancelSubscribers", pub.getSubscriberName().c_str());
00120 cancelSubscribers_[pub.getSubscriberName()] = 1;
00121 }
00122 else
00123 {
00124 CONNECTION_WARN("cancelConnectCallback: Trying to add [%s] to cancelSubscribers, but it is already in the cancelSubscribers list", pub.getSubscriberName().c_str());
00125 cancelSubscribers_[pub.getSubscriberName()]++;
00126 }
00127 CONNECTION_DEBUG("%s", cancelSubscribersString().c_str());
00128
00129 check_connection_condition_.notify_all();
00130 }
00131
00132 void ConnectionMonitor::cancelDisconnectCallback(const ros::SingleSubscriberPublisher& pub)
00133 {
00134 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00135
00136 map<string, size_t>::iterator it;
00137 it = cancelSubscribers_.find(pub.getSubscriberName());
00138
00139 if (it == cancelSubscribers_.end())
00140 CONNECTION_WARN("cancelDisconnectCallback: Trying to remove [%s] to cancelSubscribers, but it is not in the cancelSubscribers list", pub.getSubscriberName().c_str());
00141 else
00142 {
00143 CONNECTION_DEBUG("cancelDisconnectCallback: Removing [%s] from cancelSubscribers", pub.getSubscriberName().c_str());
00144 cancelSubscribers_[pub.getSubscriberName()]--;
00145 if (cancelSubscribers_[pub.getSubscriberName()] == 0)
00146 {
00147 cancelSubscribers_.erase(it);
00148 }
00149 }
00150 CONNECTION_DEBUG("%s", cancelSubscribersString().c_str());
00151 }
00152
00153 string ConnectionMonitor::cancelSubscribersString()
00154 {
00155 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00156
00157 ostringstream ss;
00158 ss << "cancel Subscribers (" << cancelSubscribers_.size() << " total)";
00159 for (map<string, size_t>::iterator it=cancelSubscribers_.begin(); it != cancelSubscribers_.end(); it++)
00160 ss << "\n - " << it->first;
00161 return ss.str();
00162 }
00163
00164
00165 void ConnectionMonitor::processStatus(const actionlib_msgs::GoalStatusArrayConstPtr& status)
00166 {
00167 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00168
00169 string cur_status_caller_id = (*(status->__connection_header))["callerid"];
00170
00171 if (status_received_)
00172 {
00173 if (status_caller_id_ != cur_status_caller_id)
00174 {
00175 CONNECTION_WARN("processStatus: Previously received status from [%s], but we now received status from [%s]. Did the ActionServer change?",
00176 status_caller_id_.c_str(), cur_status_caller_id.c_str());
00177 status_caller_id_ = cur_status_caller_id;
00178 }
00179 latest_status_time_ = status->header.stamp;
00180 }
00181 else
00182 {
00183 CONNECTION_DEBUG("processStatus: Just got our first status message from the ActionServer at node [%s]", cur_status_caller_id.c_str());
00184 status_received_ = true;
00185 status_caller_id_ = cur_status_caller_id;
00186 latest_status_time_ = status->header.stamp;
00187 }
00188
00189 check_connection_condition_.notify_all();
00190 }
00191
00192
00193 bool ConnectionMonitor::isServerConnected()
00194 {
00195 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00196
00197 if (!status_received_)
00198 {
00199 CONNECTION_DEBUG("isServerConnected: Didn't receive status yet, so not connected yet");
00200 return false;
00201 }
00202
00203 if(goalSubscribers_.find(status_caller_id_) == goalSubscribers_.end())
00204 {
00205 CONNECTION_DEBUG("isServerConnected: Server [%s] has not yet subscribed to the goal topic, so not connected yet", status_caller_id_.c_str());
00206 CONNECTION_DEBUG("%s", goalSubscribersString().c_str());
00207 return false;
00208 }
00209
00210 if(cancelSubscribers_.find(status_caller_id_) == cancelSubscribers_.end())
00211 {
00212 CONNECTION_DEBUG("isServerConnected: Server [%s] has not yet subscribed to the cancel topic, so not connected yet", status_caller_id_.c_str());
00213 CONNECTION_DEBUG("%s", cancelSubscribersString().c_str());
00214 return false;
00215 }
00216
00217 if(feedback_sub_.getNumPublishers() == 0)
00218 {
00219 CONNECTION_DEBUG("isServerConnected: Client has not yet connected to feedback topic of server [%s]", status_caller_id_.c_str());
00220 return false;
00221 }
00222
00223 if(result_sub_.getNumPublishers() == 0)
00224 {
00225 CONNECTION_DEBUG("isServerConnected: Client has not yet connected to result topic of server [%s]", status_caller_id_.c_str());
00226 return false;
00227 }
00228
00229 CONNECTION_DEBUG("isServerConnected: Server [%s] is fully connected", status_caller_id_.c_str());
00230 return true;
00231 }
00232
00233 bool ConnectionMonitor::waitForActionServerToStart(const ros::Duration& timeout, const ros::NodeHandle& nh)
00234 {
00235 if (timeout < ros::Duration(0,0))
00236 ROS_ERROR_NAMED("actionlib", "Timeouts can't be negative. Timeout is [%.2fs]", timeout.toSec());
00237
00238 ros::Time timeout_time = ros::Time::now() + timeout;
00239
00240 boost::recursive_mutex::scoped_lock lock(data_mutex_);
00241
00242 if (isServerConnected())
00243 return true;
00244
00245
00246 ros::Duration loop_period = ros::Duration().fromSec(.5);
00247
00248 while (nh.ok() && !isServerConnected())
00249 {
00250
00251 ros::Duration time_left = timeout_time - ros::Time::now();
00252
00253
00254 if (timeout != ros::Duration(0,0) && time_left <= ros::Duration(0,0) )
00255 break;
00256
00257
00258 if (time_left > loop_period || timeout == ros::Duration())
00259 time_left = loop_period;
00260
00261 check_connection_condition_.timed_wait(lock, boost::posix_time::milliseconds(time_left.toSec() * 1000.0f));
00262 }
00263
00264 return isServerConnected();
00265 }
00266
00267
00268
00269
00270
00271
00272
00273