xmlrpc_manager.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2009, Willow Garage, Inc.
00003  *
00004  * Redistribution and use in source and binary forms, with or without
00005  * modification, are permitted provided that the following conditions are met:
00006  *   * Redistributions of source code must retain the above copyright notice,
00007  *     this list of conditions and the following disclaimer.
00008  *   * Redistributions in binary form must reproduce the above copyright
00009  *     notice, this list of conditions and the following disclaimer in the
00010  *     documentation and/or other materials provided with the distribution.
00011  *   * Neither the names of Willow Garage, Inc. nor the names of its
00012  *     contributors may be used to endorse or promote products derived from
00013  *     this software without specific prior written permission.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00016  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00017  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00019  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00020  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00021  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00023  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00024  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00025  * POSSIBILITY OF SUCH DAMAGE.
00026  */
00027 
00028 #include "ros/xmlrpc_manager.h"
00029 #include "ros/network.h"
00030 #include "ros/param.h"
00031 #include "ros/assert.h"
00032 #include "ros/common.h"
00033 #include "ros/file_log.h"
00034 #include "ros/io.h"
00035 
00036 using namespace XmlRpc;
00037 
00038 namespace ros
00039 {
00040 
00041 namespace xmlrpc
00042 {
00043 XmlRpc::XmlRpcValue responseStr(int code, const std::string& msg, const std::string& response)
00044 {
00045   XmlRpc::XmlRpcValue v;
00046   v[0] = code;
00047   v[1] = msg;
00048   v[2] = response;
00049   return v;
00050 }
00051 
00052 XmlRpc::XmlRpcValue responseInt(int code, const std::string& msg, int response)
00053 {
00054   XmlRpc::XmlRpcValue v;
00055   v[0] = int(code);
00056   v[1] = msg;
00057   v[2] = response;
00058   return v;
00059 }
00060 
00061 XmlRpc::XmlRpcValue responseBool(int code, const std::string& msg, bool response)
00062 {
00063   XmlRpc::XmlRpcValue v;
00064   v[0] = int(code);
00065   v[1] = msg;
00066   v[2] = XmlRpc::XmlRpcValue(response);
00067   return v;
00068 }
00069 }
00070 
00071 class XMLRPCCallWrapper : public XmlRpcServerMethod
00072 {
00073 public:
00074   XMLRPCCallWrapper(const std::string& function_name, const XMLRPCFunc& cb, XmlRpcServer *s)
00075   : XmlRpcServerMethod(function_name, s)
00076   , name_(function_name)
00077   , func_(cb)
00078   { }
00079 
00080   void execute(XmlRpcValue &params, XmlRpcValue &result)
00081   {
00082     func_(params, result);
00083   }
00084 
00085 private:
00086   std::string name_;
00087   XMLRPCFunc func_;
00088 };
00089 
00090 void getPid(const XmlRpcValue& params, XmlRpcValue& result)
00091 {
00092   result = xmlrpc::responseInt(1, "", (int)getpid());
00093 }
00094 
00095 const ros::WallDuration CachedXmlRpcClient::s_zombie_time_(30.0); // reap after 30 seconds
00096 
00097 XMLRPCManagerPtr g_xmlrpc_manager;
00098 boost::mutex g_xmlrpc_manager_mutex;
00099 const XMLRPCManagerPtr& XMLRPCManager::instance()
00100 {
00101   if (!g_xmlrpc_manager)
00102   {
00103     boost::mutex::scoped_lock lock(g_xmlrpc_manager_mutex);
00104     if (!g_xmlrpc_manager)
00105     {
00106       g_xmlrpc_manager.reset(new XMLRPCManager);
00107     }
00108   }
00109 
00110   return g_xmlrpc_manager;
00111 }
00112 
00113 XMLRPCManager::XMLRPCManager()
00114 : port_(0)
00115 , shutting_down_(false)
00116 , unbind_requested_(false)
00117 {
00118 }
00119 
00120 XMLRPCManager::~XMLRPCManager()
00121 {
00122   shutdown();
00123 }
00124 
00125 void XMLRPCManager::start()
00126 {
00127   shutting_down_ = false;
00128   port_ = 0;
00129   bind("getPid", getPid);
00130 
00131   bool bound = server_.bindAndListen(0);
00132   (void) bound;
00133   ROS_ASSERT(bound);
00134   port_ = server_.get_port();
00135   ROS_ASSERT(port_ != 0);
00136 
00137   std::stringstream ss;
00138   ss << "http://" << network::getHost() << ":" << port_ << "/";
00139   uri_ = ss.str();
00140 
00141   server_thread_ = boost::thread(boost::bind(&XMLRPCManager::serverThreadFunc, this));
00142 }
00143 
00144 void XMLRPCManager::shutdown()
00145 {
00146   if (shutting_down_)
00147   {
00148     return;
00149   }
00150 
00151   shutting_down_ = true;
00152   server_thread_.join();
00153 
00154   server_.close();
00155 
00156   // kill the last few clients that were started in the shutdown process
00157   for (V_CachedXmlRpcClient::iterator i = clients_.begin();
00158        i != clients_.end(); ++i)
00159   {
00160     for (int wait_count = 0; i->in_use_ && wait_count < 10; wait_count++)
00161     {
00162       ROSCPP_LOG_DEBUG("waiting for xmlrpc connection to finish...");
00163       ros::WallDuration(0.01).sleep();
00164     }
00165 
00166     i->client_->close();
00167     delete i->client_;
00168   }
00169 
00170   clients_.clear();
00171 
00172   boost::mutex::scoped_lock lock(functions_mutex_);
00173   functions_.clear();
00174 
00175   {
00176     S_ASyncXMLRPCConnection::iterator it = connections_.begin();
00177     S_ASyncXMLRPCConnection::iterator end = connections_.end();
00178     for (; it != end; ++it)
00179     {
00180       (*it)->removeFromDispatch(server_.get_dispatch());
00181     }
00182   }
00183 
00184   connections_.clear();
00185 
00186   {
00187     boost::mutex::scoped_lock lock(added_connections_mutex_);
00188     added_connections_.clear();
00189   }
00190 
00191   {
00192     boost::mutex::scoped_lock lock(removed_connections_mutex_);
00193     removed_connections_.clear();
00194   }
00195 }
00196 
00197 bool XMLRPCManager::validateXmlrpcResponse(const std::string& method, XmlRpcValue &response,
00198                                     XmlRpcValue &payload)
00199 {
00200   if (response.getType() != XmlRpcValue::TypeArray)
00201   {
00202     ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return an array",
00203         method.c_str());
00204     return false;
00205   }
00206   if (response.size() != 2 && response.size() != 3)
00207   {
00208     ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a 2 or 3-element array",
00209         method.c_str());
00210     return false;
00211   }
00212   if (response[0].getType() != XmlRpcValue::TypeInt)
00213   {
00214     ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a int as the 1st element",
00215         method.c_str());
00216     return false;
00217   }
00218   int status_code = response[0];
00219   if (response[1].getType() != XmlRpcValue::TypeString)
00220   {
00221     ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a string as the 2nd element",
00222         method.c_str());
00223     return false;
00224   }
00225   std::string status_string = response[1];
00226   if (status_code != 1)
00227   {
00228     ROSCPP_LOG_DEBUG("XML-RPC call [%s] returned an error (%d): [%s]",
00229         method.c_str(), status_code, status_string.c_str());
00230     return false;
00231   }
00232   if (response.size() > 2)
00233   {
00234     payload = response[2];
00235   }
00236   else
00237   {
00238     std::string empty_array = "<value><array><data></data></array></value>";
00239     int offset = 0;
00240     payload = XmlRpcValue(empty_array, &offset);
00241   }
00242   return true;
00243 }
00244 
00245 void XMLRPCManager::serverThreadFunc()
00246 {
00247   disableAllSignalsInThisThread();
00248 
00249   while(!shutting_down_)
00250   {
00251     {
00252       boost::mutex::scoped_lock lock(added_connections_mutex_);
00253       S_ASyncXMLRPCConnection::iterator it = added_connections_.begin();
00254       S_ASyncXMLRPCConnection::iterator end = added_connections_.end();
00255       for (; it != end; ++it)
00256       {
00257         (*it)->addToDispatch(server_.get_dispatch());
00258         connections_.insert(*it);
00259       }
00260 
00261       added_connections_.clear();
00262     }
00263 
00264     // Update the XMLRPC server, blocking for at most 100ms in select()
00265     {
00266       boost::mutex::scoped_lock lock(functions_mutex_);
00267       server_.work(0.1);
00268     }
00269 
00270     while (unbind_requested_)
00271     {
00272       WallDuration(0.01).sleep();
00273     }
00274 
00275     if (shutting_down_)
00276     {
00277       return;
00278     }
00279 
00280     {
00281       S_ASyncXMLRPCConnection::iterator it = connections_.begin();
00282       S_ASyncXMLRPCConnection::iterator end = connections_.end();
00283       for (; it != end; ++it)
00284       {
00285         if ((*it)->check())
00286         {
00287           removeASyncConnection(*it);
00288         }
00289       }
00290     }
00291 
00292     {
00293       boost::mutex::scoped_lock lock(removed_connections_mutex_);
00294       S_ASyncXMLRPCConnection::iterator it = removed_connections_.begin();
00295       S_ASyncXMLRPCConnection::iterator end = removed_connections_.end();
00296       for (; it != end; ++it)
00297       {
00298         (*it)->removeFromDispatch(server_.get_dispatch());
00299         connections_.erase(*it);
00300       }
00301 
00302       removed_connections_.clear();
00303     }
00304   }
00305 }
00306 
00307 XmlRpcClient* XMLRPCManager::getXMLRPCClient(const std::string &host, const int port, const std::string &uri)
00308 {
00309   // go through our vector of clients and grab the first available one
00310   XmlRpcClient *c = NULL;
00311 
00312   boost::mutex::scoped_lock lock(clients_mutex_);
00313 
00314   for (V_CachedXmlRpcClient::iterator i = clients_.begin();
00315        !c && i != clients_.end(); )
00316   {
00317     if (!i->in_use_)
00318     {
00319       // see where it's pointing
00320       if (i->client_->getHost() == host &&
00321           i->client_->getPort() == port &&
00322           i->client_->getUri()  == uri)
00323       {
00324         // hooray, it's pointing at our destination. re-use it.
00325         c = i->client_;
00326         i->in_use_ = true;
00327         i->last_use_time_ = WallTime::now();
00328         break;
00329       }
00330       else if (i->last_use_time_ + CachedXmlRpcClient::s_zombie_time_ < WallTime::now())
00331       {
00332         // toast this guy. he's dead and nobody is reusing him.
00333         delete i->client_;
00334         i = clients_.erase(i);
00335       }
00336       else
00337       {
00338         ++i; // move along. this guy isn't dead yet.
00339       }
00340     }
00341     else
00342     {
00343       ++i;
00344     }
00345   }
00346 
00347   if (!c)
00348   {
00349     // allocate a new one
00350     c = new XmlRpcClient(host.c_str(), port, uri.c_str());
00351     CachedXmlRpcClient mc(c);
00352     mc.in_use_ = true;
00353     mc.last_use_time_ = WallTime::now();
00354     clients_.push_back(mc);
00355     //ROS_INFO("%d xmlrpc clients allocated\n", xmlrpc_clients.size());
00356   }
00357   // ONUS IS ON THE RECEIVER TO UNSET THE IN_USE FLAG
00358   // by calling releaseXMLRPCClient
00359   return c;
00360 }
00361 
00362 void XMLRPCManager::releaseXMLRPCClient(XmlRpcClient *c)
00363 {
00364   boost::mutex::scoped_lock lock(clients_mutex_);
00365 
00366   for (V_CachedXmlRpcClient::iterator i = clients_.begin();
00367        i != clients_.end(); ++i)
00368   {
00369     if (c == i->client_)
00370     {
00371       i->in_use_ = false;
00372       break;
00373     }
00374   }
00375 }
00376 
00377 void XMLRPCManager::addASyncConnection(const ASyncXMLRPCConnectionPtr& conn)
00378 {
00379   boost::mutex::scoped_lock lock(added_connections_mutex_);
00380   added_connections_.insert(conn);
00381 }
00382 
00383 void XMLRPCManager::removeASyncConnection(const ASyncXMLRPCConnectionPtr& conn)
00384 {
00385   boost::mutex::scoped_lock lock(removed_connections_mutex_);
00386   removed_connections_.insert(conn);
00387 }
00388 
00389 bool XMLRPCManager::bind(const std::string& function_name, const XMLRPCFunc& cb)
00390 {
00391   boost::mutex::scoped_lock lock(functions_mutex_);
00392   if (functions_.find(function_name) != functions_.end())
00393   {
00394     return false;
00395   }
00396 
00397   FunctionInfo info;
00398   info.name = function_name;
00399   info.function = cb;
00400   info.wrapper.reset(new XMLRPCCallWrapper(function_name, cb, &server_));
00401   functions_[function_name] = info;
00402 
00403   return true;
00404 }
00405 
00406 void XMLRPCManager::unbind(const std::string& function_name)
00407 {
00408   unbind_requested_ = true;
00409   boost::mutex::scoped_lock lock(functions_mutex_);
00410   functions_.erase(function_name);
00411   unbind_requested_ = false;
00412 }
00413 
00414 } // namespace ros


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Fri Aug 28 2015 12:33:11