Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include <rve_rpc/client.h>
00031
00032 #include <ros/ros.h>
00033 #include <ros/callback_queue.h>
00034
00035 #include <rve_rpc/Request.h>
00036 #include <rve_rpc/Response.h>
00037 #include <rve_rpc/ProtocolMethodsAdded.h>
00038 #include <rve_rpc/ProtocolClientConnect.h>
00039
00040 #include <boost/unordered_set.hpp>
00041
00042 #define REQUEST_DEBUG 1
00043
00044 namespace rve_rpc
00045 {
00046
00047 struct Client::Impl
00048 {
00049 Impl(const std::string& name, const ros::NodeHandle& nh);
00050 ~Impl();
00051
00052 void connect();
00053 void connectAsync();
00054 bool isConnected() { return connected_; }
00055 void disconnect();
00056 void call(const RequestWrapperPtr& req, const boost::function<void(const ResponseWrapperConstPtr&)>& response_cb);
00057 void pump();
00058 void flush();
00059 void cb(const ros::MessageEvent<ResponseWrapper const>& res);
00060 void addMethod(const MethodInfo& method);
00061 void handleMethodsAdded(const ros::MessageEvent<ResponseWrapper const>& evt);
00062
00063 void subAndPub();
00064 void waitForConnection();
00065 void sendConnected();
00066
00067 ros::CallbackQueue cbqueue_;
00068 std::vector<ros::Subscriber> subs_;
00069 ros::Publisher pub_;
00070 ros::NodeHandle nh_;
00071
00072 struct RequestInfo
00073 {
00074 RequestInfo()
00075 {}
00076
00077 rve_common::UUID id;
00078 boost::function<void(const ResponseWrapperConstPtr&)> response_cb;
00079
00080 #if REQUEST_DEBUG
00081 std::string method;
00082 #endif
00083 };
00084
00085 typedef std::map<rve_common::UUID, RequestInfo> M_RequestInfo;
00086 M_RequestInfo requests_;
00087 boost::mutex requests_mutex_;
00088
00089 typedef std::map<std::string, MethodInfo> M_MethodInfo;
00090 M_MethodInfo methods_;
00091
00092 typedef std::set<std::string> S_string;
00093 S_string broken_methods_;
00094
00095 bool connected_;
00096
00097 void onQueueTimer(const ros::WallTimerEvent&);
00098 ros::WallTimer queue_timer_;
00099 };
00100
00101 Client::Impl::Impl(const std::string& name, const ros::NodeHandle& nh)
00102 : nh_(nh, name)
00103 , connected_(false)
00104 {
00105 }
00106
00107 Client::Impl::~Impl()
00108 {
00109
00110 pub_.shutdown();
00111 subs_.clear();
00112 queue_timer_.stop();
00113 }
00114
00115 void Client::Impl::subAndPub()
00116 {
00117 ROS_ASSERT(!pub_);
00118 ROS_ASSERT(subs_.empty());
00119 ROS_ASSERT(!queue_timer_);
00120
00121 pub_ = nh_.advertise<RequestWrapper>("request", 0);
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131 ros::SubscribeOptions sub_ops;
00132 sub_ops.initByFullCallbackType<const ros::MessageEvent<ResponseWrapper>&>("response", 0, boost::bind(&Impl::cb, this, _1));
00133 sub_ops.callback_queue = &cbqueue_;
00134 subs_.push_back(nh_.subscribe(sub_ops));
00135 subs_.push_back(nh_.subscribe("response", 0, &Impl::cb, this));
00136
00137 queue_timer_ = nh_.createWallTimer(ros::WallDuration(0.05), &Impl::onQueueTimer, this);
00138 }
00139
00140 void Client::Impl::connect()
00141 {
00142 subAndPub();
00143 waitForConnection();
00144 }
00145
00146 void Client::Impl::disconnect()
00147 {
00148 connected_ = false;
00149 subs_.clear();
00150 pub_.shutdown();
00151 queue_timer_ = ros::WallTimer();
00152 }
00153
00154 void Client::Impl::waitForConnection()
00155 {
00156
00157
00158 while (!pub_.getNumSubscribers() || !subs_[0].getNumPublishers())
00159 {
00160 cbqueue_.callAvailable(ros::WallDuration(0.1));
00161 }
00162
00163 sendConnected();
00164
00165 while (nh_.ok() && !connected_)
00166 {
00167 cbqueue_.callAvailable(ros::WallDuration(0.1));
00168 }
00169 }
00170
00171 void Client::Impl::onQueueTimer(const ros::WallTimerEvent&)
00172 {
00173 if (!connected_)
00174 {
00175 if (pub_.getNumSubscribers() && subs_[0].getNumPublishers())
00176 {
00177 sendConnected();
00178 }
00179 }
00180
00181 cbqueue_.callAvailable();
00182 }
00183
00184 void Client::Impl::connectAsync()
00185 {
00186 subAndPub();
00187 }
00188
00189 void Client::Impl::sendConnected()
00190 {
00191 std::pair<RequestWrapperPtr, boost::shared_ptr<ProtocolClientConnect> > pair = makeRequest<ProtocolClientConnect>();
00192 pair.first->request_id = rve_common::UUID::generate();
00193 pair.first->protocol = Request::PROTOCOL_CLIENT_CONNECT;
00194 pub_.publish(pair.first);
00195 }
00196
00197 void Client::Impl::handleMethodsAdded(const ros::MessageEvent<ResponseWrapper const>& evt)
00198 {
00199 ResponseWrapperConstPtr res = evt.getMessage();
00200
00201 try
00202 {
00203 ProtocolMethodsAddedConstPtr header = res->instantiate<ProtocolMethodsAdded>();
00204 for (size_t i = 0; i < header->methods.size(); ++i)
00205 {
00206 const MethodSpec& spec = header->methods[i];
00207
00208
00209 if (broken_methods_.find(spec.name) != broken_methods_.end())
00210 {
00211 continue;
00212 }
00213
00214 M_MethodInfo::iterator it = methods_.find(spec.name);
00215 if (it != methods_.end())
00216 {
00217 MethodInfo& info = it->second;
00218 if (info.request_md5sum != spec.request_md5sum)
00219 {
00220 ROS_ERROR("RPC Server wants method [%s:%s] to have request of type [%s/%s] but I have [%s/%s]", nh_.getNamespace().c_str(), spec.name.c_str(),
00221 spec.request_datatype.c_str(), spec.request_md5sum.c_str(), info.request_datatype.c_str(), info.request_md5sum.c_str());
00222
00223 broken_methods_.insert(info.name);
00224 }
00225 else if (info.response_md5sum != spec.response_md5sum)
00226 {
00227 ROS_ERROR("RPC Server wants method [%s:%s] to have response of type [%s/%s] but I have [%s/%s]", nh_.getNamespace().c_str(), spec.name.c_str(),
00228 spec.response_datatype.c_str(), spec.response_md5sum.c_str(), info.response_datatype.c_str(), info.response_md5sum.c_str());
00229
00230 broken_methods_.insert(info.name);
00231 }
00232 }
00233 else
00234 {
00235 ROS_DEBUG("RPC Server for [%s] at [%s] has extra method [%s]", nh_.getNamespace().c_str(), evt.getPublisherName().c_str(), spec.name.c_str());
00236 }
00237 }
00238 }
00239 catch (std::exception& e)
00240 {
00241 ROS_ERROR("Exception thrown while processing header from rpc server for [%s] at [%s]", nh_.getNamespace().c_str(), evt.getPublisherName().c_str());
00242 }
00243
00244 connected_ = true;
00245 }
00246
00247 void Client::Impl::cb(const ros::MessageEvent<ResponseWrapper const>& evt)
00248 {
00249 ResponseWrapperConstPtr res = evt.getMessage();
00250 if (res->protocol != 0)
00251 {
00252 switch (res->protocol)
00253 {
00254 case Response::PROTOCOL_METHODS_ADDED:
00255 handleMethodsAdded(evt);
00256 break;
00257 default:
00258 ROS_ERROR("Unknown protocol message type [%d]", res->protocol);
00259 }
00260 }
00261 else
00262 {
00263 rve_common::UUID id = res->request_id;
00264
00265 {
00266 boost::mutex::scoped_lock lock(requests_mutex_);
00267 M_RequestInfo::iterator it = requests_.find(id);
00268 if (it != requests_.end())
00269 {
00270 RequestInfo& ri = it->second;
00271 ri.response_cb(res);
00272 requests_.erase(it);
00273 }
00274 }
00275 }
00276 }
00277
00278 void Client::Impl::addMethod(const MethodInfo& method)
00279 {
00280 ROS_ASSERT(methods_.count(method.name) == 0);
00281
00282 methods_[method.name] = method;
00283 }
00284
00285 void Client::Impl::pump()
00286 {
00287 cbqueue_.callOne(ros::WallDuration(0.1));
00288 }
00289
00290 void Client::Impl::call(const RequestWrapperPtr& req, const boost::function<void(const ResponseWrapperConstPtr&)>& response_cb)
00291 {
00292 ROS_ASSERT(req);
00293 ROS_ASSERT(response_cb);
00294
00295 if (!connected_)
00296 {
00297 throw CallException("Client to [" + nh_.getNamespace() + "] is not connected");
00298 }
00299
00300 if (broken_methods_.find(req->method) != broken_methods_.end())
00301 {
00302 throw CallException("Unconnected method [" + req->method + "], see previously generated errors for more details");
00303 }
00304
00305 req->request_id = rve_common::UUID::generate();
00306
00307 RequestInfo info;
00308 info.id = req->request_id;
00309 info.response_cb = response_cb;
00310 #if REQUEST_DEBUG
00311 info.method = req->method;
00312 #endif
00313
00314 {
00315 boost::mutex::scoped_lock lock(requests_mutex_);
00316 requests_[info.id] = info;
00317 }
00318
00319 pub_.publish(req);
00320 }
00321
00322 void Client::Impl::flush()
00323 {
00324
00325 std::vector<rve_common::UUID> uuids;
00326 {
00327 boost::mutex::scoped_lock lock(requests_mutex_);
00328 M_RequestInfo::const_iterator it = requests_.begin();
00329 M_RequestInfo::const_iterator end = requests_.end();
00330 for (; it != end; ++it)
00331 {
00332 uuids.push_back(it->first);
00333 }
00334 }
00335
00336 while (true)
00337 {
00338 if (uuids.empty())
00339 {
00340 return;
00341 }
00342
00343 ros::WallDuration(0.001).sleep();
00344
00345 pump();
00346
00347 bool erased = true;
00348 while (erased && !uuids.empty())
00349 {
00350 erased = false;
00351
00352 boost::mutex::scoped_lock lock(requests_mutex_);
00353 rve_common::UUID& id = uuids.back();
00354 M_RequestInfo::iterator it = requests_.find(id);
00355 if (it == requests_.end())
00356 {
00357 uuids.pop_back();
00358 erased = true;
00359 }
00360 else
00361 {
00362 #if REQUEST_DEBUG
00363 ROS_INFO("Waiting for request on method '%s'", it->second.method.c_str());
00364 #endif
00365 }
00366 }
00367 }
00368 }
00369
00372
00373 Client::Client(const std::string& name, const ros::NodeHandle& nh)
00374 : impl_(new Impl(name, nh))
00375 {
00376
00377 }
00378
00379 void Client::connect()
00380 {
00381 impl_->connect();
00382 }
00383
00384 void Client::connectAsync()
00385 {
00386 impl_->connectAsync();
00387 }
00388
00389 bool Client::isConnected()
00390 {
00391 return impl_->isConnected();
00392 }
00393
00394 void Client::waitForConnection()
00395 {
00396 impl_->waitForConnection();
00397 }
00398
00399 void Client::call(const RequestWrapperPtr& req, const boost::function<void(const ResponseWrapperConstPtr&)>& response_cb)
00400 {
00401 impl_->call(req, response_cb);
00402 }
00403
00404 void Client::addMethod(const MethodInfo& method)
00405 {
00406 impl_->addMethod(method);
00407 }
00408
00409 void Client::pump()
00410 {
00411 impl_->pump();
00412 }
00413
00414 void Client::disconnect()
00415 {
00416 impl_->disconnect();
00417 }
00418
00419 void Client::flush()
00420 {
00421 impl_->flush();
00422 }
00423
00424 }