00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "ros/xmlrpc_manager.h"
00029 #include "ros/network.h"
00030 #include "ros/param.h"
00031 #include "ros/assert.h"
00032 #include "ros/common.h"
00033 #include "ros/file_log.h"
00034
00035 using namespace XmlRpc;
00036
00037 namespace ros
00038 {
00039
00040 namespace xmlrpc
00041 {
00042 XmlRpc::XmlRpcValue responseStr(int code, const std::string& msg, const std::string& response)
00043 {
00044 XmlRpc::XmlRpcValue v;
00045 v[0] = code;
00046 v[1] = msg;
00047 v[2] = response;
00048 return v;
00049 }
00050
00051 XmlRpc::XmlRpcValue responseInt(int code, const std::string& msg, int response)
00052 {
00053 XmlRpc::XmlRpcValue v;
00054 v[0] = int(code);
00055 v[1] = msg;
00056 v[2] = response;
00057 return v;
00058 }
00059
00060 XmlRpc::XmlRpcValue responseBool(int code, const std::string& msg, bool response)
00061 {
00062 XmlRpc::XmlRpcValue v;
00063 v[0] = int(code);
00064 v[1] = msg;
00065 v[2] = XmlRpc::XmlRpcValue(response);
00066 return v;
00067 }
00068 }
00069
00070 class XMLRPCCallWrapper : public XmlRpcServerMethod
00071 {
00072 public:
00073 XMLRPCCallWrapper(const std::string& function_name, const XMLRPCFunc& cb, XmlRpcServer *s)
00074 : XmlRpcServerMethod(function_name, s)
00075 , name_(function_name)
00076 , func_(cb)
00077 { }
00078
00079 void execute(XmlRpcValue ¶ms, XmlRpcValue &result)
00080 {
00081 func_(params, result);
00082 }
00083
00084 private:
00085 std::string name_;
00086 XMLRPCFunc func_;
00087 };
00088
00089 void getPid(const XmlRpcValue& params, XmlRpcValue& result)
00090 {
00091 result = xmlrpc::responseInt(1, "", (int)getpid());
00092 }
00093
00094 const ros::WallDuration CachedXmlRpcClient::s_zombie_time_(30.0);
00095
00096 XMLRPCManagerPtr g_xmlrpc_manager;
00097 boost::mutex g_xmlrpc_manager_mutex;
00098 const XMLRPCManagerPtr& XMLRPCManager::instance()
00099 {
00100 if (!g_xmlrpc_manager)
00101 {
00102 boost::mutex::scoped_lock lock(g_xmlrpc_manager_mutex);
00103 if (!g_xmlrpc_manager)
00104 {
00105 g_xmlrpc_manager.reset(new XMLRPCManager);
00106 }
00107 }
00108
00109 return g_xmlrpc_manager;
00110 }
00111
00112 XMLRPCManager::XMLRPCManager()
00113 : port_(0)
00114 , shutting_down_(false)
00115 , unbind_requested_(false)
00116 {
00117 }
00118
00119 XMLRPCManager::~XMLRPCManager()
00120 {
00121 shutdown();
00122 }
00123
00124 void XMLRPCManager::start()
00125 {
00126 shutting_down_ = false;
00127 port_ = 0;
00128 bind("getPid", getPid);
00129
00130 bool bound = server_.bindAndListen(0);
00131 ROS_ASSERT(bound);
00132 port_ = server_.get_port();
00133 ROS_ASSERT(port_ != 0);
00134
00135 std::stringstream ss;
00136 ss << "http://" << network::getHost() << ":" << port_ << "/";
00137 uri_ = ss.str();
00138
00139 server_thread_ = boost::thread(boost::bind(&XMLRPCManager::serverThreadFunc, this));
00140 }
00141
00142 void XMLRPCManager::shutdown()
00143 {
00144 if (shutting_down_)
00145 {
00146 return;
00147 }
00148
00149 shutting_down_ = true;
00150 server_thread_.join();
00151
00152 server_.close();
00153
00154
00155 for (V_CachedXmlRpcClient::iterator i = clients_.begin();
00156 i != clients_.end(); ++i)
00157 {
00158 for (int wait_count = 0; i->in_use_ && wait_count < 10; wait_count++)
00159 {
00160 ROSCPP_LOG_DEBUG("waiting for xmlrpc connection to finish...");
00161 ros::WallDuration(0.01).sleep();
00162 }
00163
00164 i->client_->close();
00165 delete i->client_;
00166 }
00167
00168 clients_.clear();
00169
00170 boost::mutex::scoped_lock lock(functions_mutex_);
00171 functions_.clear();
00172
00173 {
00174 S_ASyncXMLRPCConnection::iterator it = connections_.begin();
00175 S_ASyncXMLRPCConnection::iterator end = connections_.end();
00176 for (; it != end; ++it)
00177 {
00178 (*it)->removeFromDispatch(server_.get_dispatch());
00179 }
00180 }
00181
00182 connections_.clear();
00183
00184 {
00185 boost::mutex::scoped_lock lock(added_connections_mutex_);
00186 added_connections_.clear();
00187 }
00188
00189 {
00190 boost::mutex::scoped_lock lock(removed_connections_mutex_);
00191 removed_connections_.clear();
00192 }
00193 }
00194
00195 bool XMLRPCManager::validateXmlrpcResponse(const std::string& method, XmlRpcValue &response,
00196 XmlRpcValue &payload)
00197 {
00198 if (response.getType() != XmlRpcValue::TypeArray)
00199 {
00200 ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return an array",
00201 method.c_str());
00202 return false;
00203 }
00204 if (response.size() != 3)
00205 {
00206 ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a 3-element array",
00207 method.c_str());
00208 return false;
00209 }
00210 if (response[0].getType() != XmlRpcValue::TypeInt)
00211 {
00212 ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a int as the 1st element",
00213 method.c_str());
00214 return false;
00215 }
00216 int status_code = response[0];
00217 if (response[1].getType() != XmlRpcValue::TypeString)
00218 {
00219 ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a string as the 2nd element",
00220 method.c_str());
00221 return false;
00222 }
00223 std::string status_string = response[1];
00224 if (status_code != 1)
00225 {
00226 ROSCPP_LOG_DEBUG("XML-RPC call [%s] returned an error (%d): [%s]",
00227 method.c_str(), status_code, status_string.c_str());
00228 return false;
00229 }
00230 payload = response[2];
00231 return true;
00232 }
00233
00234 void XMLRPCManager::serverThreadFunc()
00235 {
00236 disableAllSignalsInThisThread();
00237
00238 while(!shutting_down_)
00239 {
00240 {
00241 boost::mutex::scoped_lock lock(added_connections_mutex_);
00242 S_ASyncXMLRPCConnection::iterator it = added_connections_.begin();
00243 S_ASyncXMLRPCConnection::iterator end = added_connections_.end();
00244 for (; it != end; ++it)
00245 {
00246 (*it)->addToDispatch(server_.get_dispatch());
00247 connections_.insert(*it);
00248 }
00249
00250 added_connections_.clear();
00251 }
00252
00253
00254 {
00255 boost::mutex::scoped_lock lock(functions_mutex_);
00256 server_.work(0.1);
00257 }
00258
00259 while (unbind_requested_)
00260 {
00261 WallDuration(0.01).sleep();
00262 }
00263
00264 if (shutting_down_)
00265 {
00266 return;
00267 }
00268
00269 {
00270 S_ASyncXMLRPCConnection::iterator it = connections_.begin();
00271 S_ASyncXMLRPCConnection::iterator end = connections_.end();
00272 for (; it != end; ++it)
00273 {
00274 if ((*it)->check())
00275 {
00276 removeASyncConnection(*it);
00277 }
00278 }
00279 }
00280
00281 {
00282 boost::mutex::scoped_lock lock(removed_connections_mutex_);
00283 S_ASyncXMLRPCConnection::iterator it = removed_connections_.begin();
00284 S_ASyncXMLRPCConnection::iterator end = removed_connections_.end();
00285 for (; it != end; ++it)
00286 {
00287 (*it)->removeFromDispatch(server_.get_dispatch());
00288 connections_.erase(*it);
00289 }
00290
00291 removed_connections_.clear();
00292 }
00293 }
00294 }
00295
00296 XmlRpcClient* XMLRPCManager::getXMLRPCClient(const std::string &host, const int port, const std::string &uri)
00297 {
00298
00299 XmlRpcClient *c = NULL;
00300
00301 boost::mutex::scoped_lock lock(clients_mutex_);
00302
00303 for (V_CachedXmlRpcClient::iterator i = clients_.begin();
00304 !c && i != clients_.end(); )
00305 {
00306 if (!i->in_use_)
00307 {
00308
00309 if (i->client_->getHost() == host &&
00310 i->client_->getPort() == port &&
00311 i->client_->getUri() == uri)
00312 {
00313
00314 c = i->client_;
00315 i->in_use_ = true;
00316 i->last_use_time_ = WallTime::now();
00317 break;
00318 }
00319 else if (i->last_use_time_ + CachedXmlRpcClient::s_zombie_time_ < WallTime::now())
00320 {
00321
00322 delete i->client_;
00323 i = clients_.erase(i);
00324 }
00325 else
00326 {
00327 ++i;
00328 }
00329 }
00330 else
00331 {
00332 ++i;
00333 }
00334 }
00335
00336 if (!c)
00337 {
00338
00339 c = new XmlRpcClient(host.c_str(), port, uri.c_str());
00340 CachedXmlRpcClient mc(c);
00341 mc.in_use_ = true;
00342 mc.last_use_time_ = WallTime::now();
00343 clients_.push_back(mc);
00344
00345 }
00346
00347
00348 return c;
00349 }
00350
00351 void XMLRPCManager::releaseXMLRPCClient(XmlRpcClient *c)
00352 {
00353 boost::mutex::scoped_lock lock(clients_mutex_);
00354
00355 for (V_CachedXmlRpcClient::iterator i = clients_.begin();
00356 i != clients_.end(); ++i)
00357 {
00358 if (c == i->client_)
00359 {
00360 i->in_use_ = false;
00361 break;
00362 }
00363 }
00364 }
00365
00366 void XMLRPCManager::addASyncConnection(const ASyncXMLRPCConnectionPtr& conn)
00367 {
00368 boost::mutex::scoped_lock lock(added_connections_mutex_);
00369 added_connections_.insert(conn);
00370 }
00371
00372 void XMLRPCManager::removeASyncConnection(const ASyncXMLRPCConnectionPtr& conn)
00373 {
00374 boost::mutex::scoped_lock lock(removed_connections_mutex_);
00375 removed_connections_.insert(conn);
00376 }
00377
00378 bool XMLRPCManager::bind(const std::string& function_name, const XMLRPCFunc& cb)
00379 {
00380 boost::mutex::scoped_lock lock(functions_mutex_);
00381 if (functions_.find(function_name) != functions_.end())
00382 {
00383 return false;
00384 }
00385
00386 FunctionInfo info;
00387 info.name = function_name;
00388 info.function = cb;
00389 info.wrapper.reset(new XMLRPCCallWrapper(function_name, cb, &server_));
00390 functions_[function_name] = info;
00391
00392 return true;
00393 }
00394
00395 void XMLRPCManager::unbind(const std::string& function_name)
00396 {
00397 unbind_requested_ = true;
00398 boost::mutex::scoped_lock lock(functions_mutex_);
00399 functions_.erase(function_name);
00400 unbind_requested_ = false;
00401 }
00402
00403 }