client.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2010, Willow Garage, Inc.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *
00008  *     * Redistributions of source code must retain the above copyright
00009  *       notice, this list of conditions and the following disclaimer.
00010  *     * Redistributions in binary form must reproduce the above copyright
00011  *       notice, this list of conditions and the following disclaimer in the
00012  *       documentation and/or other materials provided with the distribution.
00013  *     * Neither the name of the Willow Garage, Inc. nor the names of its
00014  *       contributors may be used to endorse or promote products derived from
00015  *       this software without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027  * POSSIBILITY OF SUCH DAMAGE.
00028  */
00029 
00030 #include <rve_rpc/client.h>
00031 
00032 #include <ros/ros.h>
00033 #include <ros/callback_queue.h>
00034 
00035 #include <rve_rpc/Request.h>
00036 #include <rve_rpc/Response.h>
00037 #include <rve_rpc/ProtocolMethodsAdded.h>
00038 #include <rve_rpc/ProtocolClientConnect.h>
00039 
00040 #include <boost/unordered_set.hpp>
00041 
00042 #define REQUEST_DEBUG 1
00043 
00044 namespace rve_rpc
00045 {
00046 
00047 struct Client::Impl
00048 {
00049   Impl(const std::string& name, const ros::NodeHandle& nh);
00050   ~Impl();
00051 
00052   void connect();
00053   void connectAsync();
00054   bool isConnected() { return connected_; }
00055   void disconnect();
00056   void call(const RequestWrapperPtr& req, const boost::function<void(const ResponseWrapperConstPtr&)>& response_cb);
00057   void pump();
00058   void flush();
00059   void cb(const ros::MessageEvent<ResponseWrapper const>& res);
00060   void addMethod(const MethodInfo& method);
00061   void handleMethodsAdded(const ros::MessageEvent<ResponseWrapper const>& evt);
00062 
00063   void subAndPub();
00064   void waitForConnection();
00065   void sendConnected();
00066 
00067   ros::CallbackQueue cbqueue_;
00068   std::vector<ros::Subscriber> subs_;
00069   ros::Publisher pub_;
00070   ros::NodeHandle nh_;
00071 
00072   struct RequestInfo
00073   {
00074     RequestInfo()
00075     {}
00076 
00077     rve_common::UUID id;
00078     boost::function<void(const ResponseWrapperConstPtr&)> response_cb;
00079 
00080 #if REQUEST_DEBUG
00081     std::string method;
00082 #endif
00083   };
00084 
00085   typedef std::map<rve_common::UUID, RequestInfo> M_RequestInfo;
00086   M_RequestInfo requests_;
00087   boost::mutex requests_mutex_;
00088 
00089   typedef std::map<std::string, MethodInfo> M_MethodInfo;
00090   M_MethodInfo methods_;
00091 
00092   typedef std::set<std::string> S_string;
00093   S_string broken_methods_;
00094 
00095   bool connected_;
00096 
00097   void onQueueTimer(const ros::WallTimerEvent&);
00098   ros::WallTimer queue_timer_;
00099 };
00100 
00101 Client::Impl::Impl(const std::string& name, const ros::NodeHandle& nh)
00102 : nh_(nh, name)
00103 , connected_(false)
00104 {
00105 }
00106 
00107 Client::Impl::~Impl()
00108 {
00109   // Ensure that by the time we hit the automatic destructor we're not getting any callbacks
00110   pub_.shutdown();
00111   subs_.clear();
00112   queue_timer_.stop();
00113 }
00114 
00115 void Client::Impl::subAndPub()
00116 {
00117   ROS_ASSERT(!pub_);
00118   ROS_ASSERT(subs_.empty());
00119   ROS_ASSERT(!queue_timer_);
00120 
00121   pub_ = nh_.advertise<RequestWrapper>("request", 0);
00122 
00123   // We subscribe to "response" twice, once with our internal callback queue, and once with the queue used by
00124   // the NodeHandle we were initialized with.  This is so that we're not limited by the 50ms update rate of the
00125   // internal queue for receiving responses -- however fast the external queue is being serviced, we'll be
00126   // serviced that quickly as well
00127   //
00128   // This creates a little bit of extra work when we receive a response, since technically we'll receive response twice,
00129   // but it's a relatively small amount of work.  If it turns out to be a problem, another solution is to spin a separate
00130   // thread that's servicing the queue
00131   ros::SubscribeOptions sub_ops;
00132   sub_ops.initByFullCallbackType<const ros::MessageEvent<ResponseWrapper>&>("response", 0, boost::bind(&Impl::cb, this, _1));
00133   sub_ops.callback_queue = &cbqueue_;
00134   subs_.push_back(nh_.subscribe(sub_ops));
00135   subs_.push_back(nh_.subscribe("response", 0, &Impl::cb, this));
00136 
00137   queue_timer_ = nh_.createWallTimer(ros::WallDuration(0.05), &Impl::onQueueTimer, this);
00138 }
00139 
00140 void Client::Impl::connect()
00141 {
00142   subAndPub();
00143   waitForConnection();
00144 }
00145 
00146 void Client::Impl::disconnect()
00147 {
00148   connected_ = false;
00149   subs_.clear();
00150   pub_.shutdown();
00151   queue_timer_ = ros::WallTimer();
00152 }
00153 
00154 void Client::Impl::waitForConnection()
00155 {
00156   // TODO this may break if there are multiple servers (which is a broken situation anyway).
00157   // we assume only one server
00158   while (!pub_.getNumSubscribers() || !subs_[0].getNumPublishers())
00159   {
00160     cbqueue_.callAvailable(ros::WallDuration(0.1));
00161   }
00162 
00163   sendConnected();
00164 
00165   while (nh_.ok() && !connected_)
00166   {
00167     cbqueue_.callAvailable(ros::WallDuration(0.1));
00168   }
00169 }
00170 
00171 void Client::Impl::onQueueTimer(const ros::WallTimerEvent&)
00172 {
00173   if (!connected_)
00174   {
00175     if (pub_.getNumSubscribers() && subs_[0].getNumPublishers())
00176     {
00177       sendConnected();
00178     }
00179   }
00180 
00181   cbqueue_.callAvailable();
00182 }
00183 
00184 void Client::Impl::connectAsync()
00185 {
00186   subAndPub();
00187 }
00188 
00189 void Client::Impl::sendConnected()
00190 {
00191   std::pair<RequestWrapperPtr, boost::shared_ptr<ProtocolClientConnect> > pair = makeRequest<ProtocolClientConnect>();
00192   pair.first->request_id = rve_common::UUID::generate();
00193   pair.first->protocol = Request::PROTOCOL_CLIENT_CONNECT;
00194   pub_.publish(pair.first);
00195 }
00196 
00197 void Client::Impl::handleMethodsAdded(const ros::MessageEvent<ResponseWrapper const>& evt)
00198 {
00199   ResponseWrapperConstPtr res = evt.getMessage();
00200 
00201   try
00202   {
00203     ProtocolMethodsAddedConstPtr header = res->instantiate<ProtocolMethodsAdded>();
00204     for (size_t i = 0; i < header->methods.size(); ++i)
00205     {
00206       const MethodSpec& spec = header->methods[i];
00207 
00208       // Already been notified about this one, and we know it's broken, don't warn about it again
00209       if (broken_methods_.find(spec.name) != broken_methods_.end())
00210       {
00211         continue;
00212       }
00213 
00214       M_MethodInfo::iterator it = methods_.find(spec.name);
00215       if (it != methods_.end())
00216       {
00217         MethodInfo& info = it->second;
00218         if (info.request_md5sum != spec.request_md5sum)
00219         {
00220           ROS_ERROR("RPC Server wants method [%s:%s] to have request of type [%s/%s] but I have [%s/%s]", nh_.getNamespace().c_str(), spec.name.c_str(),
00221               spec.request_datatype.c_str(), spec.request_md5sum.c_str(), info.request_datatype.c_str(), info.request_md5sum.c_str());
00222 
00223           broken_methods_.insert(info.name);
00224         }
00225         else if (info.response_md5sum != spec.response_md5sum)
00226         {
00227           ROS_ERROR("RPC Server wants method [%s:%s] to have response of type [%s/%s] but I have [%s/%s]", nh_.getNamespace().c_str(), spec.name.c_str(),
00228               spec.response_datatype.c_str(), spec.response_md5sum.c_str(), info.response_datatype.c_str(), info.response_md5sum.c_str());
00229 
00230           broken_methods_.insert(info.name);
00231         }
00232       }
00233       else
00234       {
00235         ROS_DEBUG("RPC Server for [%s] at [%s] has extra method [%s]", nh_.getNamespace().c_str(), evt.getPublisherName().c_str(), spec.name.c_str());
00236       }
00237     }
00238   }
00239   catch (std::exception& e)
00240   {
00241     ROS_ERROR("Exception thrown while processing header from rpc server for [%s] at [%s]", nh_.getNamespace().c_str(), evt.getPublisherName().c_str());
00242   }
00243 
00244   connected_ = true;
00245 }
00246 
00247 void Client::Impl::cb(const ros::MessageEvent<ResponseWrapper const>& evt)
00248 {
00249   ResponseWrapperConstPtr res = evt.getMessage();
00250   if (res->protocol != 0)
00251   {
00252     switch (res->protocol)
00253     {
00254     case Response::PROTOCOL_METHODS_ADDED:
00255       handleMethodsAdded(evt);
00256       break;
00257     default:
00258       ROS_ERROR("Unknown protocol message type [%d]", res->protocol);
00259     }
00260   }
00261   else
00262   {
00263     rve_common::UUID id = res->request_id;
00264 
00265     {
00266       boost::mutex::scoped_lock lock(requests_mutex_);
00267       M_RequestInfo::iterator it = requests_.find(id);
00268       if (it != requests_.end())
00269       {
00270         RequestInfo& ri = it->second;
00271         ri.response_cb(res);
00272         requests_.erase(it);
00273       }
00274     }
00275   }
00276 }
00277 
00278 void Client::Impl::addMethod(const MethodInfo& method)
00279 {
00280   ROS_ASSERT(methods_.count(method.name) == 0);
00281 
00282   methods_[method.name] = method;
00283 }
00284 
00285 void Client::Impl::pump()
00286 {
00287   cbqueue_.callOne(ros::WallDuration(0.1));
00288 }
00289 
00290 void Client::Impl::call(const RequestWrapperPtr& req, const boost::function<void(const ResponseWrapperConstPtr&)>& response_cb)
00291 {
00292   ROS_ASSERT(req);
00293   ROS_ASSERT(response_cb);
00294 
00295   if (!connected_)
00296   {
00297     throw CallException("Client to [" + nh_.getNamespace() + "] is not connected");
00298   }
00299 
00300   if (broken_methods_.find(req->method) != broken_methods_.end())
00301   {
00302     throw CallException("Unconnected method [" + req->method + "], see previously generated errors for more details");
00303   }
00304 
00305   req->request_id = rve_common::UUID::generate();
00306 
00307   RequestInfo info;
00308   info.id = req->request_id;
00309   info.response_cb = response_cb;
00310 #if REQUEST_DEBUG
00311   info.method = req->method;
00312 #endif
00313 
00314   {
00315     boost::mutex::scoped_lock lock(requests_mutex_);
00316     requests_[info.id] = info;
00317   }
00318 
00319   pub_.publish(req);
00320 }
00321 
00322 void Client::Impl::flush()
00323 {
00324   // Copy the ids of all the requests.  Use that to determine when we're done
00325   std::vector<rve_common::UUID> uuids;
00326   {
00327     boost::mutex::scoped_lock lock(requests_mutex_);
00328     M_RequestInfo::const_iterator it = requests_.begin();
00329     M_RequestInfo::const_iterator end = requests_.end();
00330     for (; it != end; ++it)
00331     {
00332       uuids.push_back(it->first);
00333     }
00334   }
00335 
00336   while (true)
00337   {
00338     if (uuids.empty())
00339     {
00340       return;
00341     }
00342 
00343     ros::WallDuration(0.001).sleep();
00344 
00345     pump();
00346 
00347     bool erased = true;
00348     while (erased && !uuids.empty())
00349     {
00350       erased = false;
00351 
00352       boost::mutex::scoped_lock lock(requests_mutex_);
00353       rve_common::UUID& id = uuids.back();
00354       M_RequestInfo::iterator it = requests_.find(id);
00355       if (it == requests_.end())
00356       {
00357         uuids.pop_back();
00358         erased = true;
00359       }
00360       else
00361       {
00362 #if REQUEST_DEBUG
00363         ROS_INFO("Waiting for request on method '%s'", it->second.method.c_str());
00364 #endif
00365       }
00366     }
00367   }
00368 }
00369 
00372 
00373 Client::Client(const std::string& name, const ros::NodeHandle& nh)
00374 : impl_(new Impl(name, nh))
00375 {
00376 
00377 }
00378 
00379 void Client::connect()
00380 {
00381   impl_->connect();
00382 }
00383 
00384 void Client::connectAsync()
00385 {
00386   impl_->connectAsync();
00387 }
00388 
00389 bool Client::isConnected()
00390 {
00391   return impl_->isConnected();
00392 }
00393 
00394 void Client::waitForConnection()
00395 {
00396   impl_->waitForConnection();
00397 }
00398 
00399 void Client::call(const RequestWrapperPtr& req, const boost::function<void(const ResponseWrapperConstPtr&)>& response_cb)
00400 {
00401   impl_->call(req, response_cb);
00402 }
00403 
00404 void Client::addMethod(const MethodInfo& method)
00405 {
00406   impl_->addMethod(method);
00407 }
00408 
00409 void Client::pump()
00410 {
00411   impl_->pump();
00412 }
00413 
00414 void Client::disconnect()
00415 {
00416   impl_->disconnect();
00417 }
00418 
00419 void Client::flush()
00420 {
00421   impl_->flush();
00422 }
00423 
00424 } // namespace rve_rpc


rve_rpc
Author(s): Josh Faust
autogenerated on Wed Dec 11 2013 14:30:53