xmlrpc_manager.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2009, Willow Garage, Inc.
00003  *
00004  * Redistribution and use in source and binary forms, with or without
00005  * modification, are permitted provided that the following conditions are met:
00006  *   * Redistributions of source code must retain the above copyright notice,
00007  *     this list of conditions and the following disclaimer.
00008  *   * Redistributions in binary form must reproduce the above copyright
00009  *     notice, this list of conditions and the following disclaimer in the
00010  *     documentation and/or other materials provided with the distribution.
00011  *   * Neither the names of Willow Garage, Inc. nor the names of its
00012  *     contributors may be used to endorse or promote products derived from
00013  *     this software without specific prior written permission.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00016  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00017  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00019  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00020  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00021  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00023  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00024  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00025  * POSSIBILITY OF SUCH DAMAGE.
00026  */
00027 
00028 #include "ros/xmlrpc_manager.h"
00029 #include "ros/network.h"
00030 #include "ros/param.h"
00031 #include "ros/assert.h"
00032 #include "ros/common.h"
00033 #include "ros/file_log.h"
00034 #include "ros/io.h"
00035 
00036 using namespace XmlRpc;
00037 
00038 namespace ros
00039 {
00040 
00041 namespace xmlrpc
00042 {
00043 XmlRpc::XmlRpcValue responseStr(int code, const std::string& msg, const std::string& response)
00044 {
00045   XmlRpc::XmlRpcValue v;
00046   v[0] = code;
00047   v[1] = msg;
00048   v[2] = response;
00049   return v;
00050 }
00051 
00052 XmlRpc::XmlRpcValue responseInt(int code, const std::string& msg, int response)
00053 {
00054   XmlRpc::XmlRpcValue v;
00055   v[0] = int(code);
00056   v[1] = msg;
00057   v[2] = response;
00058   return v;
00059 }
00060 
00061 XmlRpc::XmlRpcValue responseBool(int code, const std::string& msg, bool response)
00062 {
00063   XmlRpc::XmlRpcValue v;
00064   v[0] = int(code);
00065   v[1] = msg;
00066   v[2] = XmlRpc::XmlRpcValue(response);
00067   return v;
00068 }
00069 }
00070 
00071 class XMLRPCCallWrapper : public XmlRpcServerMethod
00072 {
00073 public:
00074   XMLRPCCallWrapper(const std::string& function_name, const XMLRPCFunc& cb, XmlRpcServer *s)
00075   : XmlRpcServerMethod(function_name, s)
00076   , name_(function_name)
00077   , func_(cb)
00078   { }
00079 
00080   void execute(XmlRpcValue &params, XmlRpcValue &result)
00081   {
00082     func_(params, result);
00083   }
00084 
00085 private:
00086   std::string name_;
00087   XMLRPCFunc func_;
00088 };
00089 
00090 void getPid(const XmlRpcValue& params, XmlRpcValue& result)
00091 {
00092   (void)params;
00093   result = xmlrpc::responseInt(1, "", (int)getpid());
00094 }
00095 
00096 const ros::WallDuration CachedXmlRpcClient::s_zombie_time_(30.0); // reap after 30 seconds
00097 
00098 XMLRPCManagerPtr g_xmlrpc_manager;
00099 boost::mutex g_xmlrpc_manager_mutex;
00100 const XMLRPCManagerPtr& XMLRPCManager::instance()
00101 {
00102   if (!g_xmlrpc_manager)
00103   {
00104     boost::mutex::scoped_lock lock(g_xmlrpc_manager_mutex);
00105     if (!g_xmlrpc_manager)
00106     {
00107       g_xmlrpc_manager.reset(new XMLRPCManager);
00108     }
00109   }
00110 
00111   return g_xmlrpc_manager;
00112 }
00113 
00114 XMLRPCManager::XMLRPCManager()
00115 : port_(0)
00116 , shutting_down_(false)
00117 , unbind_requested_(false)
00118 {
00119 }
00120 
00121 XMLRPCManager::~XMLRPCManager()
00122 {
00123   shutdown();
00124 }
00125 
00126 void XMLRPCManager::start()
00127 {
00128   shutting_down_ = false;
00129   port_ = 0;
00130   bind("getPid", getPid);
00131 
00132   bool bound = server_.bindAndListen(0);
00133   (void) bound;
00134   ROS_ASSERT(bound);
00135   port_ = server_.get_port();
00136   ROS_ASSERT(port_ != 0);
00137 
00138   std::stringstream ss;
00139   ss << "http://" << network::getHost() << ":" << port_ << "/";
00140   uri_ = ss.str();
00141 
00142   server_thread_ = boost::thread(boost::bind(&XMLRPCManager::serverThreadFunc, this));
00143 }
00144 
00145 void XMLRPCManager::shutdown()
00146 {
00147   if (shutting_down_)
00148   {
00149     return;
00150   }
00151 
00152   shutting_down_ = true;
00153   server_thread_.join();
00154 
00155   server_.close();
00156 
00157   // kill the last few clients that were started in the shutdown process
00158   for (V_CachedXmlRpcClient::iterator i = clients_.begin();
00159        i != clients_.end(); ++i)
00160   {
00161     for (int wait_count = 0; i->in_use_ && wait_count < 10; wait_count++)
00162     {
00163       ROSCPP_LOG_DEBUG("waiting for xmlrpc connection to finish...");
00164       ros::WallDuration(0.01).sleep();
00165     }
00166 
00167     i->client_->close();
00168     delete i->client_;
00169   }
00170 
00171   clients_.clear();
00172 
00173   boost::mutex::scoped_lock lock(functions_mutex_);
00174   functions_.clear();
00175 
00176   {
00177     S_ASyncXMLRPCConnection::iterator it = connections_.begin();
00178     S_ASyncXMLRPCConnection::iterator end = connections_.end();
00179     for (; it != end; ++it)
00180     {
00181       (*it)->removeFromDispatch(server_.get_dispatch());
00182     }
00183   }
00184 
00185   connections_.clear();
00186 
00187   {
00188     boost::mutex::scoped_lock lock(added_connections_mutex_);
00189     added_connections_.clear();
00190   }
00191 
00192   {
00193     boost::mutex::scoped_lock lock(removed_connections_mutex_);
00194     removed_connections_.clear();
00195   }
00196 }
00197 
00198 bool XMLRPCManager::validateXmlrpcResponse(const std::string& method, XmlRpcValue &response,
00199                                     XmlRpcValue &payload)
00200 {
00201   if (response.getType() != XmlRpcValue::TypeArray)
00202   {
00203     ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return an array",
00204         method.c_str());
00205     return false;
00206   }
00207   if (response.size() != 2 && response.size() != 3)
00208   {
00209     ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a 2 or 3-element array",
00210         method.c_str());
00211     return false;
00212   }
00213   if (response[0].getType() != XmlRpcValue::TypeInt)
00214   {
00215     ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a int as the 1st element",
00216         method.c_str());
00217     return false;
00218   }
00219   int status_code = response[0];
00220   if (response[1].getType() != XmlRpcValue::TypeString)
00221   {
00222     ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a string as the 2nd element",
00223         method.c_str());
00224     return false;
00225   }
00226   std::string status_string = response[1];
00227   if (status_code != 1)
00228   {
00229     ROSCPP_LOG_DEBUG("XML-RPC call [%s] returned an error (%d): [%s]",
00230         method.c_str(), status_code, status_string.c_str());
00231     return false;
00232   }
00233   if (response.size() > 2)
00234   {
00235     payload = response[2];
00236   }
00237   else
00238   {
00239     std::string empty_array = "<value><array><data></data></array></value>";
00240     int offset = 0;
00241     payload = XmlRpcValue(empty_array, &offset);
00242   }
00243   return true;
00244 }
00245 
00246 void XMLRPCManager::serverThreadFunc()
00247 {
00248   disableAllSignalsInThisThread();
00249 
00250   while(!shutting_down_)
00251   {
00252     {
00253       boost::mutex::scoped_lock lock(added_connections_mutex_);
00254       S_ASyncXMLRPCConnection::iterator it = added_connections_.begin();
00255       S_ASyncXMLRPCConnection::iterator end = added_connections_.end();
00256       for (; it != end; ++it)
00257       {
00258         (*it)->addToDispatch(server_.get_dispatch());
00259         connections_.insert(*it);
00260       }
00261 
00262       added_connections_.clear();
00263     }
00264 
00265     // Update the XMLRPC server, blocking for at most 100ms in select()
00266     {
00267       boost::mutex::scoped_lock lock(functions_mutex_);
00268       server_.work(0.1);
00269     }
00270 
00271     while (unbind_requested_)
00272     {
00273       WallDuration(0.01).sleep();
00274     }
00275 
00276     if (shutting_down_)
00277     {
00278       return;
00279     }
00280 
00281     {
00282       S_ASyncXMLRPCConnection::iterator it = connections_.begin();
00283       S_ASyncXMLRPCConnection::iterator end = connections_.end();
00284       for (; it != end; ++it)
00285       {
00286         if ((*it)->check())
00287         {
00288           removeASyncConnection(*it);
00289         }
00290       }
00291     }
00292 
00293     {
00294       boost::mutex::scoped_lock lock(removed_connections_mutex_);
00295       S_ASyncXMLRPCConnection::iterator it = removed_connections_.begin();
00296       S_ASyncXMLRPCConnection::iterator end = removed_connections_.end();
00297       for (; it != end; ++it)
00298       {
00299         (*it)->removeFromDispatch(server_.get_dispatch());
00300         connections_.erase(*it);
00301       }
00302 
00303       removed_connections_.clear();
00304     }
00305   }
00306 }
00307 
00308 XmlRpcClient* XMLRPCManager::getXMLRPCClient(const std::string &host, const int port, const std::string &uri)
00309 {
00310   // go through our vector of clients and grab the first available one
00311   XmlRpcClient *c = NULL;
00312 
00313   boost::mutex::scoped_lock lock(clients_mutex_);
00314 
00315   for (V_CachedXmlRpcClient::iterator i = clients_.begin();
00316        !c && i != clients_.end(); )
00317   {
00318     if (!i->in_use_)
00319     {
00320       // see where it's pointing
00321       if (i->client_->getHost() == host &&
00322           i->client_->getPort() == port &&
00323           i->client_->getUri()  == uri)
00324       {
00325         // hooray, it's pointing at our destination. re-use it.
00326         c = i->client_;
00327         i->in_use_ = true;
00328         i->last_use_time_ = WallTime::now();
00329         break;
00330       }
00331       else if (i->last_use_time_ + CachedXmlRpcClient::s_zombie_time_ < WallTime::now())
00332       {
00333         // toast this guy. he's dead and nobody is reusing him.
00334         delete i->client_;
00335         i = clients_.erase(i);
00336       }
00337       else
00338       {
00339         ++i; // move along. this guy isn't dead yet.
00340       }
00341     }
00342     else
00343     {
00344       ++i;
00345     }
00346   }
00347 
00348   if (!c)
00349   {
00350     // allocate a new one
00351     c = new XmlRpcClient(host.c_str(), port, uri.c_str());
00352     CachedXmlRpcClient mc(c);
00353     mc.in_use_ = true;
00354     mc.last_use_time_ = WallTime::now();
00355     clients_.push_back(mc);
00356     //ROS_INFO("%d xmlrpc clients allocated\n", xmlrpc_clients.size());
00357   }
00358   // ONUS IS ON THE RECEIVER TO UNSET THE IN_USE FLAG
00359   // by calling releaseXMLRPCClient
00360   return c;
00361 }
00362 
00363 void XMLRPCManager::releaseXMLRPCClient(XmlRpcClient *c)
00364 {
00365   boost::mutex::scoped_lock lock(clients_mutex_);
00366 
00367   for (V_CachedXmlRpcClient::iterator i = clients_.begin();
00368        i != clients_.end(); ++i)
00369   {
00370     if (c == i->client_)
00371     {
00372       i->in_use_ = false;
00373       break;
00374     }
00375   }
00376 }
00377 
00378 void XMLRPCManager::addASyncConnection(const ASyncXMLRPCConnectionPtr& conn)
00379 {
00380   boost::mutex::scoped_lock lock(added_connections_mutex_);
00381   added_connections_.insert(conn);
00382 }
00383 
00384 void XMLRPCManager::removeASyncConnection(const ASyncXMLRPCConnectionPtr& conn)
00385 {
00386   boost::mutex::scoped_lock lock(removed_connections_mutex_);
00387   removed_connections_.insert(conn);
00388 }
00389 
00390 bool XMLRPCManager::bind(const std::string& function_name, const XMLRPCFunc& cb)
00391 {
00392   boost::mutex::scoped_lock lock(functions_mutex_);
00393   if (functions_.find(function_name) != functions_.end())
00394   {
00395     return false;
00396   }
00397 
00398   FunctionInfo info;
00399   info.name = function_name;
00400   info.function = cb;
00401   info.wrapper.reset(new XMLRPCCallWrapper(function_name, cb, &server_));
00402   functions_[function_name] = info;
00403 
00404   return true;
00405 }
00406 
00407 void XMLRPCManager::unbind(const std::string& function_name)
00408 {
00409   unbind_requested_ = true;
00410   boost::mutex::scoped_lock lock(functions_mutex_);
00411   functions_.erase(function_name);
00412   unbind_requested_ = false;
00413 }
00414 
00415 } // namespace ros


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Tue Mar 7 2017 03:44:47