00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "ros/xmlrpc_manager.h"
00029 #include "ros/network.h"
00030 #include "ros/param.h"
00031 #include "ros/assert.h"
00032 #include "ros/common.h"
00033 #include "ros/file_log.h"
00034 #include "ros/io.h"
00035
00036 using namespace XmlRpc;
00037
00038 namespace ros
00039 {
00040
00041 namespace xmlrpc
00042 {
00043 XmlRpc::XmlRpcValue responseStr(int code, const std::string& msg, const std::string& response)
00044 {
00045 XmlRpc::XmlRpcValue v;
00046 v[0] = code;
00047 v[1] = msg;
00048 v[2] = response;
00049 return v;
00050 }
00051
00052 XmlRpc::XmlRpcValue responseInt(int code, const std::string& msg, int response)
00053 {
00054 XmlRpc::XmlRpcValue v;
00055 v[0] = int(code);
00056 v[1] = msg;
00057 v[2] = response;
00058 return v;
00059 }
00060
00061 XmlRpc::XmlRpcValue responseBool(int code, const std::string& msg, bool response)
00062 {
00063 XmlRpc::XmlRpcValue v;
00064 v[0] = int(code);
00065 v[1] = msg;
00066 v[2] = XmlRpc::XmlRpcValue(response);
00067 return v;
00068 }
00069 }
00070
00071 class XMLRPCCallWrapper : public XmlRpcServerMethod
00072 {
00073 public:
00074 XMLRPCCallWrapper(const std::string& function_name, const XMLRPCFunc& cb, XmlRpcServer *s)
00075 : XmlRpcServerMethod(function_name, s)
00076 , name_(function_name)
00077 , func_(cb)
00078 { }
00079
00080 void execute(XmlRpcValue ¶ms, XmlRpcValue &result)
00081 {
00082 func_(params, result);
00083 }
00084
00085 private:
00086 std::string name_;
00087 XMLRPCFunc func_;
00088 };
00089
00090 void getPid(const XmlRpcValue& params, XmlRpcValue& result)
00091 {
00092 result = xmlrpc::responseInt(1, "", (int)getpid());
00093 }
00094
00095 const ros::WallDuration CachedXmlRpcClient::s_zombie_time_(30.0);
00096
00097 XMLRPCManagerPtr g_xmlrpc_manager;
00098 boost::mutex g_xmlrpc_manager_mutex;
00099 const XMLRPCManagerPtr& XMLRPCManager::instance()
00100 {
00101 if (!g_xmlrpc_manager)
00102 {
00103 boost::mutex::scoped_lock lock(g_xmlrpc_manager_mutex);
00104 if (!g_xmlrpc_manager)
00105 {
00106 g_xmlrpc_manager.reset(new XMLRPCManager);
00107 }
00108 }
00109
00110 return g_xmlrpc_manager;
00111 }
00112
00113 XMLRPCManager::XMLRPCManager()
00114 : port_(0)
00115 , shutting_down_(false)
00116 , unbind_requested_(false)
00117 {
00118 }
00119
00120 XMLRPCManager::~XMLRPCManager()
00121 {
00122 shutdown();
00123 }
00124
00125 void XMLRPCManager::start()
00126 {
00127 shutting_down_ = false;
00128 port_ = 0;
00129 bind("getPid", getPid);
00130
00131 bool bound = server_.bindAndListen(0);
00132 (void) bound;
00133 ROS_ASSERT(bound);
00134 port_ = server_.get_port();
00135 ROS_ASSERT(port_ != 0);
00136
00137 std::stringstream ss;
00138 ss << "http://" << network::getHost() << ":" << port_ << "/";
00139 uri_ = ss.str();
00140
00141 server_thread_ = boost::thread(boost::bind(&XMLRPCManager::serverThreadFunc, this));
00142 }
00143
00144 void XMLRPCManager::shutdown()
00145 {
00146 if (shutting_down_)
00147 {
00148 return;
00149 }
00150
00151 shutting_down_ = true;
00152 server_thread_.join();
00153
00154 server_.close();
00155
00156
00157 for (V_CachedXmlRpcClient::iterator i = clients_.begin();
00158 i != clients_.end(); ++i)
00159 {
00160 for (int wait_count = 0; i->in_use_ && wait_count < 10; wait_count++)
00161 {
00162 ROSCPP_LOG_DEBUG("waiting for xmlrpc connection to finish...");
00163 ros::WallDuration(0.01).sleep();
00164 }
00165
00166 i->client_->close();
00167 delete i->client_;
00168 }
00169
00170 clients_.clear();
00171
00172 boost::mutex::scoped_lock lock(functions_mutex_);
00173 functions_.clear();
00174
00175 {
00176 S_ASyncXMLRPCConnection::iterator it = connections_.begin();
00177 S_ASyncXMLRPCConnection::iterator end = connections_.end();
00178 for (; it != end; ++it)
00179 {
00180 (*it)->removeFromDispatch(server_.get_dispatch());
00181 }
00182 }
00183
00184 connections_.clear();
00185
00186 {
00187 boost::mutex::scoped_lock lock(added_connections_mutex_);
00188 added_connections_.clear();
00189 }
00190
00191 {
00192 boost::mutex::scoped_lock lock(removed_connections_mutex_);
00193 removed_connections_.clear();
00194 }
00195 }
00196
00197 bool XMLRPCManager::validateXmlrpcResponse(const std::string& method, XmlRpcValue &response,
00198 XmlRpcValue &payload)
00199 {
00200 if (response.getType() != XmlRpcValue::TypeArray)
00201 {
00202 ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return an array",
00203 method.c_str());
00204 return false;
00205 }
00206 if (response.size() != 3)
00207 {
00208 ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a 3-element array",
00209 method.c_str());
00210 return false;
00211 }
00212 if (response[0].getType() != XmlRpcValue::TypeInt)
00213 {
00214 ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a int as the 1st element",
00215 method.c_str());
00216 return false;
00217 }
00218 int status_code = response[0];
00219 if (response[1].getType() != XmlRpcValue::TypeString)
00220 {
00221 ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a string as the 2nd element",
00222 method.c_str());
00223 return false;
00224 }
00225 std::string status_string = response[1];
00226 if (status_code != 1)
00227 {
00228 ROSCPP_LOG_DEBUG("XML-RPC call [%s] returned an error (%d): [%s]",
00229 method.c_str(), status_code, status_string.c_str());
00230 return false;
00231 }
00232 payload = response[2];
00233 return true;
00234 }
00235
00236 void XMLRPCManager::serverThreadFunc()
00237 {
00238 disableAllSignalsInThisThread();
00239
00240 while(!shutting_down_)
00241 {
00242 {
00243 boost::mutex::scoped_lock lock(added_connections_mutex_);
00244 S_ASyncXMLRPCConnection::iterator it = added_connections_.begin();
00245 S_ASyncXMLRPCConnection::iterator end = added_connections_.end();
00246 for (; it != end; ++it)
00247 {
00248 (*it)->addToDispatch(server_.get_dispatch());
00249 connections_.insert(*it);
00250 }
00251
00252 added_connections_.clear();
00253 }
00254
00255
00256 {
00257 boost::mutex::scoped_lock lock(functions_mutex_);
00258 server_.work(0.1);
00259 }
00260
00261 while (unbind_requested_)
00262 {
00263 WallDuration(0.01).sleep();
00264 }
00265
00266 if (shutting_down_)
00267 {
00268 return;
00269 }
00270
00271 {
00272 S_ASyncXMLRPCConnection::iterator it = connections_.begin();
00273 S_ASyncXMLRPCConnection::iterator end = connections_.end();
00274 for (; it != end; ++it)
00275 {
00276 if ((*it)->check())
00277 {
00278 removeASyncConnection(*it);
00279 }
00280 }
00281 }
00282
00283 {
00284 boost::mutex::scoped_lock lock(removed_connections_mutex_);
00285 S_ASyncXMLRPCConnection::iterator it = removed_connections_.begin();
00286 S_ASyncXMLRPCConnection::iterator end = removed_connections_.end();
00287 for (; it != end; ++it)
00288 {
00289 (*it)->removeFromDispatch(server_.get_dispatch());
00290 connections_.erase(*it);
00291 }
00292
00293 removed_connections_.clear();
00294 }
00295 }
00296 }
00297
00298 XmlRpcClient* XMLRPCManager::getXMLRPCClient(const std::string &host, const int port, const std::string &uri)
00299 {
00300
00301 XmlRpcClient *c = NULL;
00302
00303 boost::mutex::scoped_lock lock(clients_mutex_);
00304
00305 for (V_CachedXmlRpcClient::iterator i = clients_.begin();
00306 !c && i != clients_.end(); )
00307 {
00308 if (!i->in_use_)
00309 {
00310
00311 if (i->client_->getHost() == host &&
00312 i->client_->getPort() == port &&
00313 i->client_->getUri() == uri)
00314 {
00315
00316 c = i->client_;
00317 i->in_use_ = true;
00318 i->last_use_time_ = WallTime::now();
00319 break;
00320 }
00321 else if (i->last_use_time_ + CachedXmlRpcClient::s_zombie_time_ < WallTime::now())
00322 {
00323
00324 delete i->client_;
00325 i = clients_.erase(i);
00326 }
00327 else
00328 {
00329 ++i;
00330 }
00331 }
00332 else
00333 {
00334 ++i;
00335 }
00336 }
00337
00338 if (!c)
00339 {
00340
00341 c = new XmlRpcClient(host.c_str(), port, uri.c_str());
00342 CachedXmlRpcClient mc(c);
00343 mc.in_use_ = true;
00344 mc.last_use_time_ = WallTime::now();
00345 clients_.push_back(mc);
00346
00347 }
00348
00349
00350 return c;
00351 }
00352
00353 void XMLRPCManager::releaseXMLRPCClient(XmlRpcClient *c)
00354 {
00355 boost::mutex::scoped_lock lock(clients_mutex_);
00356
00357 for (V_CachedXmlRpcClient::iterator i = clients_.begin();
00358 i != clients_.end(); ++i)
00359 {
00360 if (c == i->client_)
00361 {
00362 i->in_use_ = false;
00363 break;
00364 }
00365 }
00366 }
00367
00368 void XMLRPCManager::addASyncConnection(const ASyncXMLRPCConnectionPtr& conn)
00369 {
00370 boost::mutex::scoped_lock lock(added_connections_mutex_);
00371 added_connections_.insert(conn);
00372 }
00373
00374 void XMLRPCManager::removeASyncConnection(const ASyncXMLRPCConnectionPtr& conn)
00375 {
00376 boost::mutex::scoped_lock lock(removed_connections_mutex_);
00377 removed_connections_.insert(conn);
00378 }
00379
00380 bool XMLRPCManager::bind(const std::string& function_name, const XMLRPCFunc& cb)
00381 {
00382 boost::mutex::scoped_lock lock(functions_mutex_);
00383 if (functions_.find(function_name) != functions_.end())
00384 {
00385 return false;
00386 }
00387
00388 FunctionInfo info;
00389 info.name = function_name;
00390 info.function = cb;
00391 info.wrapper.reset(new XMLRPCCallWrapper(function_name, cb, &server_));
00392 functions_[function_name] = info;
00393
00394 return true;
00395 }
00396
00397 void XMLRPCManager::unbind(const std::string& function_name)
00398 {
00399 unbind_requested_ = true;
00400 boost::mutex::scoped_lock lock(functions_mutex_);
00401 functions_.erase(function_name);
00402 unbind_requested_ = false;
00403 }
00404
00405 }