$search
00001 /* 00002 * Copyright (C) 2009, Willow Garage, Inc. 00003 * 00004 * Redistribution and use in source and binary forms, with or without 00005 * modification, are permitted provided that the following conditions are met: 00006 * * Redistributions of source code must retain the above copyright notice, 00007 * this list of conditions and the following disclaimer. 00008 * * Redistributions in binary form must reproduce the above copyright 00009 * notice, this list of conditions and the following disclaimer in the 00010 * documentation and/or other materials provided with the distribution. 00011 * * Neither the names of Willow Garage, Inc. nor the names of its 00012 * contributors may be used to endorse or promote products derived from 00013 * this software without specific prior written permission. 00014 * 00015 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00016 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00017 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00018 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00019 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00020 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00021 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00022 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00023 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00024 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00025 * POSSIBILITY OF SUCH DAMAGE. 00026 */ 00027 00028 #include "ros/xmlrpc_manager.h" 00029 #include "ros/network.h" 00030 #include "ros/param.h" 00031 #include "ros/assert.h" 00032 #include "ros/common.h" 00033 #include "ros/file_log.h" 00034 #include "ros/io.h" 00035 00036 using namespace XmlRpc; 00037 00038 namespace ros 00039 { 00040 00041 namespace xmlrpc 00042 { 00043 XmlRpc::XmlRpcValue responseStr(int code, const std::string& msg, const std::string& response) 00044 { 00045 XmlRpc::XmlRpcValue v; 00046 v[0] = code; 00047 v[1] = msg; 00048 v[2] = response; 00049 return v; 00050 } 00051 00052 XmlRpc::XmlRpcValue responseInt(int code, const std::string& msg, int response) 00053 { 00054 XmlRpc::XmlRpcValue v; 00055 v[0] = int(code); 00056 v[1] = msg; 00057 v[2] = response; 00058 return v; 00059 } 00060 00061 XmlRpc::XmlRpcValue responseBool(int code, const std::string& msg, bool response) 00062 { 00063 XmlRpc::XmlRpcValue v; 00064 v[0] = int(code); 00065 v[1] = msg; 00066 v[2] = XmlRpc::XmlRpcValue(response); 00067 return v; 00068 } 00069 } 00070 00071 class XMLRPCCallWrapper : public XmlRpcServerMethod 00072 { 00073 public: 00074 XMLRPCCallWrapper(const std::string& function_name, const XMLRPCFunc& cb, XmlRpcServer *s) 00075 : XmlRpcServerMethod(function_name, s) 00076 , name_(function_name) 00077 , func_(cb) 00078 { } 00079 00080 void execute(XmlRpcValue ¶ms, XmlRpcValue &result) 00081 { 00082 func_(params, result); 00083 } 00084 00085 private: 00086 std::string name_; 00087 XMLRPCFunc func_; 00088 }; 00089 00090 void getPid(const XmlRpcValue& params, XmlRpcValue& result) 00091 { 00092 result = xmlrpc::responseInt(1, "", (int)getpid()); 00093 } 00094 00095 const ros::WallDuration CachedXmlRpcClient::s_zombie_time_(30.0); // reap after 30 seconds 00096 00097 XMLRPCManagerPtr g_xmlrpc_manager; 00098 boost::mutex g_xmlrpc_manager_mutex; 00099 const XMLRPCManagerPtr& XMLRPCManager::instance() 00100 { 00101 if (!g_xmlrpc_manager) 00102 { 00103 boost::mutex::scoped_lock lock(g_xmlrpc_manager_mutex); 00104 if (!g_xmlrpc_manager) 00105 { 00106 g_xmlrpc_manager.reset(new XMLRPCManager); 00107 } 00108 } 00109 00110 return g_xmlrpc_manager; 00111 } 00112 00113 XMLRPCManager::XMLRPCManager() 00114 : port_(0) 00115 , shutting_down_(false) 00116 , unbind_requested_(false) 00117 { 00118 } 00119 00120 XMLRPCManager::~XMLRPCManager() 00121 { 00122 shutdown(); 00123 } 00124 00125 void XMLRPCManager::start() 00126 { 00127 shutting_down_ = false; 00128 port_ = 0; 00129 bind("getPid", getPid); 00130 00131 bool bound = server_.bindAndListen(0); 00132 (void) bound; 00133 ROS_ASSERT(bound); 00134 port_ = server_.get_port(); 00135 ROS_ASSERT(port_ != 0); 00136 00137 std::stringstream ss; 00138 ss << "http://" << network::getHost() << ":" << port_ << "/"; 00139 uri_ = ss.str(); 00140 00141 server_thread_ = boost::thread(boost::bind(&XMLRPCManager::serverThreadFunc, this)); 00142 } 00143 00144 void XMLRPCManager::shutdown() 00145 { 00146 if (shutting_down_) 00147 { 00148 return; 00149 } 00150 00151 shutting_down_ = true; 00152 server_thread_.join(); 00153 00154 server_.close(); 00155 00156 // kill the last few clients that were started in the shutdown process 00157 for (V_CachedXmlRpcClient::iterator i = clients_.begin(); 00158 i != clients_.end(); ++i) 00159 { 00160 for (int wait_count = 0; i->in_use_ && wait_count < 10; wait_count++) 00161 { 00162 ROSCPP_LOG_DEBUG("waiting for xmlrpc connection to finish..."); 00163 ros::WallDuration(0.01).sleep(); 00164 } 00165 00166 i->client_->close(); 00167 delete i->client_; 00168 } 00169 00170 clients_.clear(); 00171 00172 boost::mutex::scoped_lock lock(functions_mutex_); 00173 functions_.clear(); 00174 00175 { 00176 S_ASyncXMLRPCConnection::iterator it = connections_.begin(); 00177 S_ASyncXMLRPCConnection::iterator end = connections_.end(); 00178 for (; it != end; ++it) 00179 { 00180 (*it)->removeFromDispatch(server_.get_dispatch()); 00181 } 00182 } 00183 00184 connections_.clear(); 00185 00186 { 00187 boost::mutex::scoped_lock lock(added_connections_mutex_); 00188 added_connections_.clear(); 00189 } 00190 00191 { 00192 boost::mutex::scoped_lock lock(removed_connections_mutex_); 00193 removed_connections_.clear(); 00194 } 00195 } 00196 00197 bool XMLRPCManager::validateXmlrpcResponse(const std::string& method, XmlRpcValue &response, 00198 XmlRpcValue &payload) 00199 { 00200 if (response.getType() != XmlRpcValue::TypeArray) 00201 { 00202 ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return an array", 00203 method.c_str()); 00204 return false; 00205 } 00206 if (response.size() != 3) 00207 { 00208 ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a 3-element array", 00209 method.c_str()); 00210 return false; 00211 } 00212 if (response[0].getType() != XmlRpcValue::TypeInt) 00213 { 00214 ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a int as the 1st element", 00215 method.c_str()); 00216 return false; 00217 } 00218 int status_code = response[0]; 00219 if (response[1].getType() != XmlRpcValue::TypeString) 00220 { 00221 ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a string as the 2nd element", 00222 method.c_str()); 00223 return false; 00224 } 00225 std::string status_string = response[1]; 00226 if (status_code != 1) 00227 { 00228 ROSCPP_LOG_DEBUG("XML-RPC call [%s] returned an error (%d): [%s]", 00229 method.c_str(), status_code, status_string.c_str()); 00230 return false; 00231 } 00232 payload = response[2]; 00233 return true; 00234 } 00235 00236 void XMLRPCManager::serverThreadFunc() 00237 { 00238 disableAllSignalsInThisThread(); 00239 00240 while(!shutting_down_) 00241 { 00242 { 00243 boost::mutex::scoped_lock lock(added_connections_mutex_); 00244 S_ASyncXMLRPCConnection::iterator it = added_connections_.begin(); 00245 S_ASyncXMLRPCConnection::iterator end = added_connections_.end(); 00246 for (; it != end; ++it) 00247 { 00248 (*it)->addToDispatch(server_.get_dispatch()); 00249 connections_.insert(*it); 00250 } 00251 00252 added_connections_.clear(); 00253 } 00254 00255 // Update the XMLRPC server, blocking for at most 100ms in select() 00256 { 00257 boost::mutex::scoped_lock lock(functions_mutex_); 00258 server_.work(0.1); 00259 } 00260 00261 while (unbind_requested_) 00262 { 00263 WallDuration(0.01).sleep(); 00264 } 00265 00266 if (shutting_down_) 00267 { 00268 return; 00269 } 00270 00271 { 00272 S_ASyncXMLRPCConnection::iterator it = connections_.begin(); 00273 S_ASyncXMLRPCConnection::iterator end = connections_.end(); 00274 for (; it != end; ++it) 00275 { 00276 if ((*it)->check()) 00277 { 00278 removeASyncConnection(*it); 00279 } 00280 } 00281 } 00282 00283 { 00284 boost::mutex::scoped_lock lock(removed_connections_mutex_); 00285 S_ASyncXMLRPCConnection::iterator it = removed_connections_.begin(); 00286 S_ASyncXMLRPCConnection::iterator end = removed_connections_.end(); 00287 for (; it != end; ++it) 00288 { 00289 (*it)->removeFromDispatch(server_.get_dispatch()); 00290 connections_.erase(*it); 00291 } 00292 00293 removed_connections_.clear(); 00294 } 00295 } 00296 } 00297 00298 XmlRpcClient* XMLRPCManager::getXMLRPCClient(const std::string &host, const int port, const std::string &uri) 00299 { 00300 // go through our vector of clients and grab the first available one 00301 XmlRpcClient *c = NULL; 00302 00303 boost::mutex::scoped_lock lock(clients_mutex_); 00304 00305 for (V_CachedXmlRpcClient::iterator i = clients_.begin(); 00306 !c && i != clients_.end(); ) 00307 { 00308 if (!i->in_use_) 00309 { 00310 // see where it's pointing 00311 if (i->client_->getHost() == host && 00312 i->client_->getPort() == port && 00313 i->client_->getUri() == uri) 00314 { 00315 // hooray, it's pointing at our destination. re-use it. 00316 c = i->client_; 00317 i->in_use_ = true; 00318 i->last_use_time_ = WallTime::now(); 00319 break; 00320 } 00321 else if (i->last_use_time_ + CachedXmlRpcClient::s_zombie_time_ < WallTime::now()) 00322 { 00323 // toast this guy. he's dead and nobody is reusing him. 00324 delete i->client_; 00325 i = clients_.erase(i); 00326 } 00327 else 00328 { 00329 ++i; // move along. this guy isn't dead yet. 00330 } 00331 } 00332 else 00333 { 00334 ++i; 00335 } 00336 } 00337 00338 if (!c) 00339 { 00340 // allocate a new one 00341 c = new XmlRpcClient(host.c_str(), port, uri.c_str()); 00342 CachedXmlRpcClient mc(c); 00343 mc.in_use_ = true; 00344 mc.last_use_time_ = WallTime::now(); 00345 clients_.push_back(mc); 00346 //ROS_INFO("%d xmlrpc clients allocated\n", xmlrpc_clients.size()); 00347 } 00348 // ONUS IS ON THE RECEIVER TO UNSET THE IN_USE FLAG 00349 // by calling releaseXMLRPCClient 00350 return c; 00351 } 00352 00353 void XMLRPCManager::releaseXMLRPCClient(XmlRpcClient *c) 00354 { 00355 boost::mutex::scoped_lock lock(clients_mutex_); 00356 00357 for (V_CachedXmlRpcClient::iterator i = clients_.begin(); 00358 i != clients_.end(); ++i) 00359 { 00360 if (c == i->client_) 00361 { 00362 i->in_use_ = false; 00363 break; 00364 } 00365 } 00366 } 00367 00368 void XMLRPCManager::addASyncConnection(const ASyncXMLRPCConnectionPtr& conn) 00369 { 00370 boost::mutex::scoped_lock lock(added_connections_mutex_); 00371 added_connections_.insert(conn); 00372 } 00373 00374 void XMLRPCManager::removeASyncConnection(const ASyncXMLRPCConnectionPtr& conn) 00375 { 00376 boost::mutex::scoped_lock lock(removed_connections_mutex_); 00377 removed_connections_.insert(conn); 00378 } 00379 00380 bool XMLRPCManager::bind(const std::string& function_name, const XMLRPCFunc& cb) 00381 { 00382 boost::mutex::scoped_lock lock(functions_mutex_); 00383 if (functions_.find(function_name) != functions_.end()) 00384 { 00385 return false; 00386 } 00387 00388 FunctionInfo info; 00389 info.name = function_name; 00390 info.function = cb; 00391 info.wrapper.reset(new XMLRPCCallWrapper(function_name, cb, &server_)); 00392 functions_[function_name] = info; 00393 00394 return true; 00395 } 00396 00397 void XMLRPCManager::unbind(const std::string& function_name) 00398 { 00399 unbind_requested_ = true; 00400 boost::mutex::scoped_lock lock(functions_mutex_); 00401 functions_.erase(function_name); 00402 unbind_requested_ = false; 00403 } 00404 00405 } // namespace ros