Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 #include <rve_rpc/client.h>
00031 
00032 #include <ros/ros.h>
00033 #include <ros/callback_queue.h>
00034 
00035 #include <rve_rpc/Request.h>
00036 #include <rve_rpc/Response.h>
00037 #include <rve_rpc/ProtocolMethodsAdded.h>
00038 #include <rve_rpc/ProtocolClientConnect.h>
00039 
00040 #include <boost/unordered_set.hpp>
00041 
00042 #define REQUEST_DEBUG 1
00043 
00044 namespace rve_rpc
00045 {
00046 
00047 struct Client::Impl
00048 {
00049   Impl(const std::string& name, const ros::NodeHandle& nh);
00050   ~Impl();
00051 
00052   void connect();
00053   void connectAsync();
00054   bool isConnected() { return connected_; }
00055   void disconnect();
00056   void call(const RequestWrapperPtr& req, const boost::function<void(const ResponseWrapperConstPtr&)>& response_cb);
00057   void pump();
00058   void flush();
00059   void cb(const ros::MessageEvent<ResponseWrapper const>& res);
00060   void addMethod(const MethodInfo& method);
00061   void handleMethodsAdded(const ros::MessageEvent<ResponseWrapper const>& evt);
00062 
00063   void subAndPub();
00064   void waitForConnection();
00065   void sendConnected();
00066 
00067   ros::CallbackQueue cbqueue_;
00068   std::vector<ros::Subscriber> subs_;
00069   ros::Publisher pub_;
00070   ros::NodeHandle nh_;
00071 
00072   struct RequestInfo
00073   {
00074     RequestInfo()
00075     {}
00076 
00077     rve_common::UUID id;
00078     boost::function<void(const ResponseWrapperConstPtr&)> response_cb;
00079 
00080 #if REQUEST_DEBUG
00081     std::string method;
00082 #endif
00083   };
00084 
00085   typedef std::map<rve_common::UUID, RequestInfo> M_RequestInfo;
00086   M_RequestInfo requests_;
00087   boost::mutex requests_mutex_;
00088 
00089   typedef std::map<std::string, MethodInfo> M_MethodInfo;
00090   M_MethodInfo methods_;
00091 
00092   typedef std::set<std::string> S_string;
00093   S_string broken_methods_;
00094 
00095   bool connected_;
00096 
00097   void onQueueTimer(const ros::WallTimerEvent&);
00098   ros::WallTimer queue_timer_;
00099 };
00100 
00101 Client::Impl::Impl(const std::string& name, const ros::NodeHandle& nh)
00102 : nh_(nh, name)
00103 , connected_(false)
00104 {
00105 }
00106 
00107 Client::Impl::~Impl()
00108 {
00109   
00110   pub_.shutdown();
00111   subs_.clear();
00112   queue_timer_.stop();
00113 }
00114 
00115 void Client::Impl::subAndPub()
00116 {
00117   ROS_ASSERT(!pub_);
00118   ROS_ASSERT(subs_.empty());
00119   ROS_ASSERT(!queue_timer_);
00120 
00121   pub_ = nh_.advertise<RequestWrapper>("request", 0);
00122 
00123   
00124   
00125   
00126   
00127   
00128   
00129   
00130   
00131   ros::SubscribeOptions sub_ops;
00132   sub_ops.initByFullCallbackType<const ros::MessageEvent<ResponseWrapper>&>("response", 0, boost::bind(&Impl::cb, this, _1));
00133   sub_ops.callback_queue = &cbqueue_;
00134   subs_.push_back(nh_.subscribe(sub_ops));
00135   subs_.push_back(nh_.subscribe("response", 0, &Impl::cb, this));
00136 
00137   queue_timer_ = nh_.createWallTimer(ros::WallDuration(0.05), &Impl::onQueueTimer, this);
00138 }
00139 
00140 void Client::Impl::connect()
00141 {
00142   subAndPub();
00143   waitForConnection();
00144 }
00145 
00146 void Client::Impl::disconnect()
00147 {
00148   connected_ = false;
00149   subs_.clear();
00150   pub_.shutdown();
00151   queue_timer_ = ros::WallTimer();
00152 }
00153 
00154 void Client::Impl::waitForConnection()
00155 {
00156   
00157   
00158   while (!pub_.getNumSubscribers() || !subs_[0].getNumPublishers())
00159   {
00160     cbqueue_.callAvailable(ros::WallDuration(0.1));
00161   }
00162 
00163   sendConnected();
00164 
00165   while (nh_.ok() && !connected_)
00166   {
00167     cbqueue_.callAvailable(ros::WallDuration(0.1));
00168   }
00169 }
00170 
00171 void Client::Impl::onQueueTimer(const ros::WallTimerEvent&)
00172 {
00173   if (!connected_)
00174   {
00175     if (pub_.getNumSubscribers() && subs_[0].getNumPublishers())
00176     {
00177       sendConnected();
00178     }
00179   }
00180 
00181   cbqueue_.callAvailable();
00182 }
00183 
00184 void Client::Impl::connectAsync()
00185 {
00186   subAndPub();
00187 }
00188 
00189 void Client::Impl::sendConnected()
00190 {
00191   std::pair<RequestWrapperPtr, boost::shared_ptr<ProtocolClientConnect> > pair = makeRequest<ProtocolClientConnect>();
00192   pair.first->request_id = rve_common::UUID::generate();
00193   pair.first->protocol = Request::PROTOCOL_CLIENT_CONNECT;
00194   pub_.publish(pair.first);
00195 }
00196 
00197 void Client::Impl::handleMethodsAdded(const ros::MessageEvent<ResponseWrapper const>& evt)
00198 {
00199   ResponseWrapperConstPtr res = evt.getMessage();
00200 
00201   try
00202   {
00203     ProtocolMethodsAddedConstPtr header = res->instantiate<ProtocolMethodsAdded>();
00204     for (size_t i = 0; i < header->methods.size(); ++i)
00205     {
00206       const MethodSpec& spec = header->methods[i];
00207 
00208       
00209       if (broken_methods_.find(spec.name) != broken_methods_.end())
00210       {
00211         continue;
00212       }
00213 
00214       M_MethodInfo::iterator it = methods_.find(spec.name);
00215       if (it != methods_.end())
00216       {
00217         MethodInfo& info = it->second;
00218         if (info.request_md5sum != spec.request_md5sum)
00219         {
00220           ROS_ERROR("RPC Server wants method [%s:%s] to have request of type [%s/%s] but I have [%s/%s]", nh_.getNamespace().c_str(), spec.name.c_str(),
00221               spec.request_datatype.c_str(), spec.request_md5sum.c_str(), info.request_datatype.c_str(), info.request_md5sum.c_str());
00222 
00223           broken_methods_.insert(info.name);
00224         }
00225         else if (info.response_md5sum != spec.response_md5sum)
00226         {
00227           ROS_ERROR("RPC Server wants method [%s:%s] to have response of type [%s/%s] but I have [%s/%s]", nh_.getNamespace().c_str(), spec.name.c_str(),
00228               spec.response_datatype.c_str(), spec.response_md5sum.c_str(), info.response_datatype.c_str(), info.response_md5sum.c_str());
00229 
00230           broken_methods_.insert(info.name);
00231         }
00232       }
00233       else
00234       {
00235         ROS_DEBUG("RPC Server for [%s] at [%s] has extra method [%s]", nh_.getNamespace().c_str(), evt.getPublisherName().c_str(), spec.name.c_str());
00236       }
00237     }
00238   }
00239   catch (std::exception& e)
00240   {
00241     ROS_ERROR("Exception thrown while processing header from rpc server for [%s] at [%s]", nh_.getNamespace().c_str(), evt.getPublisherName().c_str());
00242   }
00243 
00244   connected_ = true;
00245 }
00246 
00247 void Client::Impl::cb(const ros::MessageEvent<ResponseWrapper const>& evt)
00248 {
00249   ResponseWrapperConstPtr res = evt.getMessage();
00250   if (res->protocol != 0)
00251   {
00252     switch (res->protocol)
00253     {
00254     case Response::PROTOCOL_METHODS_ADDED:
00255       handleMethodsAdded(evt);
00256       break;
00257     default:
00258       ROS_ERROR("Unknown protocol message type [%d]", res->protocol);
00259     }
00260   }
00261   else
00262   {
00263     rve_common::UUID id = res->request_id;
00264 
00265     {
00266       boost::mutex::scoped_lock lock(requests_mutex_);
00267       M_RequestInfo::iterator it = requests_.find(id);
00268       if (it != requests_.end())
00269       {
00270         RequestInfo& ri = it->second;
00271         ri.response_cb(res);
00272         requests_.erase(it);
00273       }
00274     }
00275   }
00276 }
00277 
00278 void Client::Impl::addMethod(const MethodInfo& method)
00279 {
00280   ROS_ASSERT(methods_.count(method.name) == 0);
00281 
00282   methods_[method.name] = method;
00283 }
00284 
00285 void Client::Impl::pump()
00286 {
00287   cbqueue_.callOne(ros::WallDuration(0.1));
00288 }
00289 
00290 void Client::Impl::call(const RequestWrapperPtr& req, const boost::function<void(const ResponseWrapperConstPtr&)>& response_cb)
00291 {
00292   ROS_ASSERT(req);
00293   ROS_ASSERT(response_cb);
00294 
00295   if (!connected_)
00296   {
00297     throw CallException("Client to [" + nh_.getNamespace() + "] is not connected");
00298   }
00299 
00300   if (broken_methods_.find(req->method) != broken_methods_.end())
00301   {
00302     throw CallException("Unconnected method [" + req->method + "], see previously generated errors for more details");
00303   }
00304 
00305   req->request_id = rve_common::UUID::generate();
00306 
00307   RequestInfo info;
00308   info.id = req->request_id;
00309   info.response_cb = response_cb;
00310 #if REQUEST_DEBUG
00311   info.method = req->method;
00312 #endif
00313 
00314   {
00315     boost::mutex::scoped_lock lock(requests_mutex_);
00316     requests_[info.id] = info;
00317   }
00318 
00319   pub_.publish(req);
00320 }
00321 
00322 void Client::Impl::flush()
00323 {
00324   
00325   std::vector<rve_common::UUID> uuids;
00326   {
00327     boost::mutex::scoped_lock lock(requests_mutex_);
00328     M_RequestInfo::const_iterator it = requests_.begin();
00329     M_RequestInfo::const_iterator end = requests_.end();
00330     for (; it != end; ++it)
00331     {
00332       uuids.push_back(it->first);
00333     }
00334   }
00335 
00336   while (true)
00337   {
00338     if (uuids.empty())
00339     {
00340       return;
00341     }
00342 
00343     ros::WallDuration(0.001).sleep();
00344 
00345     pump();
00346 
00347     bool erased = true;
00348     while (erased && !uuids.empty())
00349     {
00350       erased = false;
00351 
00352       boost::mutex::scoped_lock lock(requests_mutex_);
00353       rve_common::UUID& id = uuids.back();
00354       M_RequestInfo::iterator it = requests_.find(id);
00355       if (it == requests_.end())
00356       {
00357         uuids.pop_back();
00358         erased = true;
00359       }
00360       else
00361       {
00362 #if REQUEST_DEBUG
00363         ROS_INFO("Waiting for request on method '%s'", it->second.method.c_str());
00364 #endif
00365       }
00366     }
00367   }
00368 }
00369 
00372 
00373 Client::Client(const std::string& name, const ros::NodeHandle& nh)
00374 : impl_(new Impl(name, nh))
00375 {
00376 
00377 }
00378 
00379 void Client::connect()
00380 {
00381   impl_->connect();
00382 }
00383 
00384 void Client::connectAsync()
00385 {
00386   impl_->connectAsync();
00387 }
00388 
00389 bool Client::isConnected()
00390 {
00391   return impl_->isConnected();
00392 }
00393 
00394 void Client::waitForConnection()
00395 {
00396   impl_->waitForConnection();
00397 }
00398 
00399 void Client::call(const RequestWrapperPtr& req, const boost::function<void(const ResponseWrapperConstPtr&)>& response_cb)
00400 {
00401   impl_->call(req, response_cb);
00402 }
00403 
00404 void Client::addMethod(const MethodInfo& method)
00405 {
00406   impl_->addMethod(method);
00407 }
00408 
00409 void Client::pump()
00410 {
00411   impl_->pump();
00412 }
00413 
00414 void Client::disconnect()
00415 {
00416   impl_->disconnect();
00417 }
00418 
00419 void Client::flush()
00420 {
00421   impl_->flush();
00422 }
00423 
00424 }