xmlrpc_manager.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2009, Willow Garage, Inc.
00003  *
00004  * Redistribution and use in source and binary forms, with or without
00005  * modification, are permitted provided that the following conditions are met:
00006  *   * Redistributions of source code must retain the above copyright notice,
00007  *     this list of conditions and the following disclaimer.
00008  *   * Redistributions in binary form must reproduce the above copyright
00009  *     notice, this list of conditions and the following disclaimer in the
00010  *     documentation and/or other materials provided with the distribution.
00011  *   * Neither the names of Willow Garage, Inc. nor the names of its
00012  *     contributors may be used to endorse or promote products derived from
00013  *     this software without specific prior written permission.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00016  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00017  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00019  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00020  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00021  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00023  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00024  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00025  * POSSIBILITY OF SUCH DAMAGE.
00026  */
00027 
00028 #include "ros/xmlrpc_manager.h"
00029 #include "ros/network.h"
00030 #include "ros/param.h"
00031 #include "ros/assert.h"
00032 #include "ros/common.h"
00033 #include "ros/file_log.h"
00034 #include "ros/io.h"
00035 
00036 using namespace XmlRpc;
00037 
00038 namespace ros
00039 {
00040 
00041 namespace xmlrpc
00042 {
00043 XmlRpc::XmlRpcValue responseStr(int code, const std::string& msg, const std::string& response)
00044 {
00045   XmlRpc::XmlRpcValue v;
00046   v[0] = code;
00047   v[1] = msg;
00048   v[2] = response;
00049   return v;
00050 }
00051 
00052 XmlRpc::XmlRpcValue responseInt(int code, const std::string& msg, int response)
00053 {
00054   XmlRpc::XmlRpcValue v;
00055   v[0] = int(code);
00056   v[1] = msg;
00057   v[2] = response;
00058   return v;
00059 }
00060 
00061 XmlRpc::XmlRpcValue responseBool(int code, const std::string& msg, bool response)
00062 {
00063   XmlRpc::XmlRpcValue v;
00064   v[0] = int(code);
00065   v[1] = msg;
00066   v[2] = XmlRpc::XmlRpcValue(response);
00067   return v;
00068 }
00069 }
00070 
00071 class XMLRPCCallWrapper : public XmlRpcServerMethod
00072 {
00073 public:
00074   XMLRPCCallWrapper(const std::string& function_name, const XMLRPCFunc& cb, XmlRpcServer *s)
00075   : XmlRpcServerMethod(function_name, s)
00076   , name_(function_name)
00077   , func_(cb)
00078   { }
00079 
00080   void execute(XmlRpcValue &params, XmlRpcValue &result)
00081   {
00082     func_(params, result);
00083   }
00084 
00085 private:
00086   std::string name_;
00087   XMLRPCFunc func_;
00088 };
00089 
00090 void getPid(const XmlRpcValue& params, XmlRpcValue& result)
00091 {
00092   result = xmlrpc::responseInt(1, "", (int)getpid());
00093 }
00094 
00095 const ros::WallDuration CachedXmlRpcClient::s_zombie_time_(30.0); // reap after 30 seconds
00096 
00097 XMLRPCManagerPtr g_xmlrpc_manager;
00098 boost::mutex g_xmlrpc_manager_mutex;
00099 const XMLRPCManagerPtr& XMLRPCManager::instance()
00100 {
00101   if (!g_xmlrpc_manager)
00102   {
00103     boost::mutex::scoped_lock lock(g_xmlrpc_manager_mutex);
00104     if (!g_xmlrpc_manager)
00105     {
00106       g_xmlrpc_manager.reset(new XMLRPCManager);
00107     }
00108   }
00109 
00110   return g_xmlrpc_manager;
00111 }
00112 
00113 XMLRPCManager::XMLRPCManager()
00114 : port_(0)
00115 , shutting_down_(false)
00116 , unbind_requested_(false)
00117 {
00118 }
00119 
00120 XMLRPCManager::~XMLRPCManager()
00121 {
00122   shutdown();
00123 }
00124 
00125 void XMLRPCManager::start()
00126 {
00127   shutting_down_ = false;
00128   port_ = 0;
00129   bind("getPid", getPid);
00130 
00131   bool bound = server_.bindAndListen(0);
00132   (void) bound;
00133   ROS_ASSERT(bound);
00134   port_ = server_.get_port();
00135   ROS_ASSERT(port_ != 0);
00136 
00137   std::stringstream ss;
00138   ss << "http://" << network::getHost() << ":" << port_ << "/";
00139   uri_ = ss.str();
00140 
00141   server_thread_ = boost::thread(boost::bind(&XMLRPCManager::serverThreadFunc, this));
00142 }
00143 
00144 void XMLRPCManager::shutdown()
00145 {
00146   if (shutting_down_)
00147   {
00148     return;
00149   }
00150 
00151   shutting_down_ = true;
00152   server_thread_.join();
00153 
00154   server_.close();
00155 
00156   // kill the last few clients that were started in the shutdown process
00157   for (V_CachedXmlRpcClient::iterator i = clients_.begin();
00158        i != clients_.end(); ++i)
00159   {
00160     for (int wait_count = 0; i->in_use_ && wait_count < 10; wait_count++)
00161     {
00162       ROSCPP_LOG_DEBUG("waiting for xmlrpc connection to finish...");
00163       ros::WallDuration(0.01).sleep();
00164     }
00165 
00166     i->client_->close();
00167     delete i->client_;
00168   }
00169 
00170   clients_.clear();
00171 
00172   boost::mutex::scoped_lock lock(functions_mutex_);
00173   functions_.clear();
00174 
00175   {
00176     S_ASyncXMLRPCConnection::iterator it = connections_.begin();
00177     S_ASyncXMLRPCConnection::iterator end = connections_.end();
00178     for (; it != end; ++it)
00179     {
00180       (*it)->removeFromDispatch(server_.get_dispatch());
00181     }
00182   }
00183 
00184   connections_.clear();
00185 
00186   {
00187     boost::mutex::scoped_lock lock(added_connections_mutex_);
00188     added_connections_.clear();
00189   }
00190 
00191   {
00192     boost::mutex::scoped_lock lock(removed_connections_mutex_);
00193     removed_connections_.clear();
00194   }
00195 }
00196 
00197 bool XMLRPCManager::validateXmlrpcResponse(const std::string& method, XmlRpcValue &response,
00198                                     XmlRpcValue &payload)
00199 {
00200   if (response.getType() != XmlRpcValue::TypeArray)
00201   {
00202     ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return an array",
00203         method.c_str());
00204     return false;
00205   }
00206   if (response.size() != 3)
00207   {
00208     ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a 3-element array",
00209         method.c_str());
00210     return false;
00211   }
00212   if (response[0].getType() != XmlRpcValue::TypeInt)
00213   {
00214     ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a int as the 1st element",
00215         method.c_str());
00216     return false;
00217   }
00218   int status_code = response[0];
00219   if (response[1].getType() != XmlRpcValue::TypeString)
00220   {
00221     ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a string as the 2nd element",
00222         method.c_str());
00223     return false;
00224   }
00225   std::string status_string = response[1];
00226   if (status_code != 1)
00227   {
00228     ROSCPP_LOG_DEBUG("XML-RPC call [%s] returned an error (%d): [%s]",
00229         method.c_str(), status_code, status_string.c_str());
00230     return false;
00231   }
00232   payload = response[2];
00233   return true;
00234 }
00235 
00236 void XMLRPCManager::serverThreadFunc()
00237 {
00238   disableAllSignalsInThisThread();
00239 
00240   while(!shutting_down_)
00241   {
00242     {
00243       boost::mutex::scoped_lock lock(added_connections_mutex_);
00244       S_ASyncXMLRPCConnection::iterator it = added_connections_.begin();
00245       S_ASyncXMLRPCConnection::iterator end = added_connections_.end();
00246       for (; it != end; ++it)
00247       {
00248         (*it)->addToDispatch(server_.get_dispatch());
00249         connections_.insert(*it);
00250       }
00251 
00252       added_connections_.clear();
00253     }
00254 
00255     // Update the XMLRPC server, blocking for at most 100ms in select()
00256     {
00257       boost::mutex::scoped_lock lock(functions_mutex_);
00258       server_.work(0.1);
00259     }
00260 
00261     while (unbind_requested_)
00262     {
00263       WallDuration(0.01).sleep();
00264     }
00265 
00266     if (shutting_down_)
00267     {
00268       return;
00269     }
00270 
00271     {
00272       S_ASyncXMLRPCConnection::iterator it = connections_.begin();
00273       S_ASyncXMLRPCConnection::iterator end = connections_.end();
00274       for (; it != end; ++it)
00275       {
00276         if ((*it)->check())
00277         {
00278           removeASyncConnection(*it);
00279         }
00280       }
00281     }
00282 
00283     {
00284       boost::mutex::scoped_lock lock(removed_connections_mutex_);
00285       S_ASyncXMLRPCConnection::iterator it = removed_connections_.begin();
00286       S_ASyncXMLRPCConnection::iterator end = removed_connections_.end();
00287       for (; it != end; ++it)
00288       {
00289         (*it)->removeFromDispatch(server_.get_dispatch());
00290         connections_.erase(*it);
00291       }
00292 
00293       removed_connections_.clear();
00294     }
00295   }
00296 }
00297 
00298 XmlRpcClient* XMLRPCManager::getXMLRPCClient(const std::string &host, const int port, const std::string &uri)
00299 {
00300   // go through our vector of clients and grab the first available one
00301   XmlRpcClient *c = NULL;
00302 
00303   boost::mutex::scoped_lock lock(clients_mutex_);
00304 
00305   for (V_CachedXmlRpcClient::iterator i = clients_.begin();
00306        !c && i != clients_.end(); )
00307   {
00308     if (!i->in_use_)
00309     {
00310       // see where it's pointing
00311       if (i->client_->getHost() == host &&
00312           i->client_->getPort() == port &&
00313           i->client_->getUri()  == uri)
00314       {
00315         // hooray, it's pointing at our destination. re-use it.
00316         c = i->client_;
00317         i->in_use_ = true;
00318         i->last_use_time_ = WallTime::now();
00319         break;
00320       }
00321       else if (i->last_use_time_ + CachedXmlRpcClient::s_zombie_time_ < WallTime::now())
00322       {
00323         // toast this guy. he's dead and nobody is reusing him.
00324         delete i->client_;
00325         i = clients_.erase(i);
00326       }
00327       else
00328       {
00329         ++i; // move along. this guy isn't dead yet.
00330       }
00331     }
00332     else
00333     {
00334       ++i;
00335     }
00336   }
00337 
00338   if (!c)
00339   {
00340     // allocate a new one
00341     c = new XmlRpcClient(host.c_str(), port, uri.c_str());
00342     CachedXmlRpcClient mc(c);
00343     mc.in_use_ = true;
00344     mc.last_use_time_ = WallTime::now();
00345     clients_.push_back(mc);
00346     //ROS_INFO("%d xmlrpc clients allocated\n", xmlrpc_clients.size());
00347   }
00348   // ONUS IS ON THE RECEIVER TO UNSET THE IN_USE FLAG
00349   // by calling releaseXMLRPCClient
00350   return c;
00351 }
00352 
00353 void XMLRPCManager::releaseXMLRPCClient(XmlRpcClient *c)
00354 {
00355   boost::mutex::scoped_lock lock(clients_mutex_);
00356 
00357   for (V_CachedXmlRpcClient::iterator i = clients_.begin();
00358        i != clients_.end(); ++i)
00359   {
00360     if (c == i->client_)
00361     {
00362       i->in_use_ = false;
00363       break;
00364     }
00365   }
00366 }
00367 
00368 void XMLRPCManager::addASyncConnection(const ASyncXMLRPCConnectionPtr& conn)
00369 {
00370   boost::mutex::scoped_lock lock(added_connections_mutex_);
00371   added_connections_.insert(conn);
00372 }
00373 
00374 void XMLRPCManager::removeASyncConnection(const ASyncXMLRPCConnectionPtr& conn)
00375 {
00376   boost::mutex::scoped_lock lock(removed_connections_mutex_);
00377   removed_connections_.insert(conn);
00378 }
00379 
00380 bool XMLRPCManager::bind(const std::string& function_name, const XMLRPCFunc& cb)
00381 {
00382   boost::mutex::scoped_lock lock(functions_mutex_);
00383   if (functions_.find(function_name) != functions_.end())
00384   {
00385     return false;
00386   }
00387 
00388   FunctionInfo info;
00389   info.name = function_name;
00390   info.function = cb;
00391   info.wrapper.reset(new XMLRPCCallWrapper(function_name, cb, &server_));
00392   functions_[function_name] = info;
00393 
00394   return true;
00395 }
00396 
00397 void XMLRPCManager::unbind(const std::string& function_name)
00398 {
00399   unbind_requested_ = true;
00400   boost::mutex::scoped_lock lock(functions_mutex_);
00401   functions_.erase(function_name);
00402   unbind_requested_ = false;
00403 }
00404 
00405 } // namespace ros


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Mon Oct 6 2014 11:46:44