xmlrpc_manager.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2009, Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "ros/xmlrpc_manager.h"
29 #include "ros/network.h"
30 #include "ros/param.h"
31 #include "ros/assert.h"
32 #include "ros/common.h"
33 #include "ros/file_log.h"
34 #include "ros/io.h"
35 
36 using namespace XmlRpc;
37 
38 namespace ros
39 {
40 
41 namespace xmlrpc
42 {
43 XmlRpc::XmlRpcValue responseStr(int code, const std::string& msg, const std::string& response)
44 {
46  v[0] = code;
47  v[1] = msg;
48  v[2] = response;
49  return v;
50 }
51 
52 XmlRpc::XmlRpcValue responseInt(int code, const std::string& msg, int response)
53 {
55  v[0] = int(code);
56  v[1] = msg;
57  v[2] = response;
58  return v;
59 }
60 
61 XmlRpc::XmlRpcValue responseBool(int code, const std::string& msg, bool response)
62 {
64  v[0] = int(code);
65  v[1] = msg;
66  v[2] = XmlRpc::XmlRpcValue(response);
67  return v;
68 }
69 }
70 
72 {
73 public:
74  XMLRPCCallWrapper(const std::string& function_name, const XMLRPCFunc& cb, XmlRpcServer *s)
75  : XmlRpcServerMethod(function_name, s)
76  , name_(function_name)
77  , func_(cb)
78  { }
79 
80  void execute(XmlRpcValue &params, XmlRpcValue &result)
81  {
82  func_(params, result);
83  }
84 
85 private:
86  std::string name_;
88 };
89 
90 void getPid(const XmlRpcValue& params, XmlRpcValue& result)
91 {
92  (void)params;
93  result = xmlrpc::responseInt(1, "", (int)getpid());
94 }
95 
96 const ros::WallDuration CachedXmlRpcClient::s_zombie_time_(30.0); // reap after 30 seconds
97 
98 const XMLRPCManagerPtr& XMLRPCManager::instance()
99 {
100  static XMLRPCManagerPtr xmlrpc_manager = boost::make_shared<XMLRPCManager>();
101  return xmlrpc_manager;
102 }
103 
104 XMLRPCManager::XMLRPCManager()
105 : port_(0)
106 , shutting_down_(false)
107 , unbind_requested_(false)
108 {
109 }
110 
112 {
113  shutdown();
114 }
115 
117 {
118  shutting_down_ = false;
119  port_ = 0;
120  bind("getPid", getPid);
121 
122  bool bound = server_.bindAndListen(0);
123  (void) bound;
124  ROS_ASSERT(bound);
125  port_ = server_.get_port();
126  ROS_ASSERT(port_ != 0);
127 
128  std::stringstream ss;
129  ss << "http://" << network::getHost() << ":" << port_ << "/";
130  uri_ = ss.str();
131 
132  server_thread_ = boost::thread(boost::bind(&XMLRPCManager::serverThreadFunc, this));
133 }
134 
136 {
137  if (shutting_down_)
138  {
139  return;
140  }
141 
142  shutting_down_ = true;
143  server_thread_.join();
144 
145  server_.close();
146 
147  // kill the last few clients that were started in the shutdown process
148  {
149  boost::mutex::scoped_lock lock(clients_mutex_);
150 
151  for (V_CachedXmlRpcClient::iterator i = clients_.begin();
152  i != clients_.end();)
153  {
154  if (!i->in_use_)
155  {
156  i->client_->close();
157  delete i->client_;
158  i = clients_.erase(i);
159  }
160  else
161  {
162  ++i;
163  }
164  }
165  }
166 
167  // Wait for the clients that are in use to finish and remove themselves from clients_
168  for (int wait_count = 0; !clients_.empty() && wait_count < 10; wait_count++)
169  {
170  ROSCPP_LOG_DEBUG("waiting for xmlrpc connection to finish...");
171  ros::WallDuration(0.01).sleep();
172  }
173 
174  boost::mutex::scoped_lock lock(functions_mutex_);
175  functions_.clear();
176 
177  {
178  S_ASyncXMLRPCConnection::iterator it = connections_.begin();
179  S_ASyncXMLRPCConnection::iterator end = connections_.end();
180  for (; it != end; ++it)
181  {
182  (*it)->removeFromDispatch(server_.get_dispatch());
183  }
184  }
185 
186  connections_.clear();
187 
188  {
189  boost::mutex::scoped_lock lock(added_connections_mutex_);
190  added_connections_.clear();
191  }
192 
193  {
194  boost::mutex::scoped_lock lock(removed_connections_mutex_);
195  removed_connections_.clear();
196  }
197 }
198 
199 bool XMLRPCManager::validateXmlrpcResponse(const std::string& method, XmlRpcValue &response,
200  XmlRpcValue &payload)
201 {
202  if (response.getType() != XmlRpcValue::TypeArray)
203  {
204  ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return an array",
205  method.c_str());
206  return false;
207  }
208  if (response.size() != 2 && response.size() != 3)
209  {
210  ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a 2 or 3-element array",
211  method.c_str());
212  return false;
213  }
214  if (response[0].getType() != XmlRpcValue::TypeInt)
215  {
216  ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a int as the 1st element",
217  method.c_str());
218  return false;
219  }
220  int status_code = response[0];
221  if (response[1].getType() != XmlRpcValue::TypeString)
222  {
223  ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a string as the 2nd element",
224  method.c_str());
225  return false;
226  }
227  std::string status_string = response[1];
228  if (status_code != 1)
229  {
230  ROSCPP_LOG_DEBUG("XML-RPC call [%s] returned an error (%d): [%s]",
231  method.c_str(), status_code, status_string.c_str());
232  return false;
233  }
234  if (response.size() > 2)
235  {
236  payload = response[2];
237  }
238  else
239  {
240  std::string empty_array = "<value><array><data></data></array></value>";
241  int offset = 0;
242  payload = XmlRpcValue(empty_array, &offset);
243  }
244  return true;
245 }
246 
248 {
249  disableAllSignalsInThisThread();
250 
251  while(!shutting_down_)
252  {
253  {
254  boost::mutex::scoped_lock lock(added_connections_mutex_);
255  S_ASyncXMLRPCConnection::iterator it = added_connections_.begin();
256  S_ASyncXMLRPCConnection::iterator end = added_connections_.end();
257  for (; it != end; ++it)
258  {
259  (*it)->addToDispatch(server_.get_dispatch());
260  connections_.insert(*it);
261  }
262 
263  added_connections_.clear();
264  }
265 
266  // Update the XMLRPC server, blocking for at most 100ms in select()
267  {
268  boost::mutex::scoped_lock lock(functions_mutex_);
269  server_.work(0.1);
270  }
271 
272  while (unbind_requested_)
273  {
274  WallDuration(0.01).sleep();
275  }
276 
277  if (shutting_down_)
278  {
279  return;
280  }
281 
282  {
283  S_ASyncXMLRPCConnection::iterator it = connections_.begin();
284  S_ASyncXMLRPCConnection::iterator end = connections_.end();
285  for (; it != end; ++it)
286  {
287  if ((*it)->check())
288  {
290  }
291  }
292  }
293 
294  {
295  boost::mutex::scoped_lock lock(removed_connections_mutex_);
296  S_ASyncXMLRPCConnection::iterator it = removed_connections_.begin();
297  S_ASyncXMLRPCConnection::iterator end = removed_connections_.end();
298  for (; it != end; ++it)
299  {
300  (*it)->removeFromDispatch(server_.get_dispatch());
301  connections_.erase(*it);
302  }
303 
304  removed_connections_.clear();
305  }
306  }
307 }
308 
309 XmlRpcClient* XMLRPCManager::getXMLRPCClient(const std::string &host, const int port, const std::string &uri)
310 {
311  // go through our vector of clients and grab the first available one
312  XmlRpcClient *c = NULL;
313 
314  boost::mutex::scoped_lock lock(clients_mutex_);
315 
316  for (V_CachedXmlRpcClient::iterator i = clients_.begin();
317  !c && i != clients_.end(); )
318  {
319  if (!i->in_use_)
320  {
321  // see where it's pointing
322  if (i->client_->getHost() == host &&
323  i->client_->getPort() == port &&
324  i->client_->getUri() == uri)
325  {
326  // hooray, it's pointing at our destination. re-use it.
327  c = i->client_;
328  i->in_use_ = true;
329  i->last_use_time_ = SteadyTime::now();
330  break;
331  }
332  else if (i->last_use_time_ + CachedXmlRpcClient::s_zombie_time_ < SteadyTime::now())
333  {
334  // toast this guy. he's dead and nobody is reusing him.
335  delete i->client_;
336  i = clients_.erase(i);
337  }
338  else
339  {
340  ++i; // move along. this guy isn't dead yet.
341  }
342  }
343  else
344  {
345  ++i;
346  }
347  }
348 
349  if (!c)
350  {
351  // allocate a new one
352  c = new XmlRpcClient(host.c_str(), port, uri.c_str());
353  CachedXmlRpcClient mc(c);
354  mc.in_use_ = true;
355  mc.last_use_time_ = SteadyTime::now();
356  clients_.push_back(mc);
357  //ROS_INFO("%d xmlrpc clients allocated\n", xmlrpc_clients.size());
358  }
359  // ONUS IS ON THE RECEIVER TO UNSET THE IN_USE FLAG
360  // by calling releaseXMLRPCClient
361  return c;
362 }
363 
365 {
366  boost::mutex::scoped_lock lock(clients_mutex_);
367 
368  for (V_CachedXmlRpcClient::iterator i = clients_.begin();
369  i != clients_.end(); ++i)
370  {
371  if (c == i->client_)
372  {
373  if (shutting_down_)
374  {
375  // if we are shutting down we won't be re-using the client
376  i->client_->close();
377  delete i->client_;
378  clients_.erase(i);
379  }
380  else
381  {
382  i->in_use_ = false;
383  }
384  break;
385  }
386  }
387 }
388 
390 {
391  boost::mutex::scoped_lock lock(added_connections_mutex_);
392  added_connections_.insert(conn);
393 }
394 
396 {
397  boost::mutex::scoped_lock lock(removed_connections_mutex_);
398  removed_connections_.insert(conn);
399 }
400 
401 bool XMLRPCManager::bind(const std::string& function_name, const XMLRPCFunc& cb)
402 {
403  boost::mutex::scoped_lock lock(functions_mutex_);
404  if (functions_.find(function_name) != functions_.end())
405  {
406  return false;
407  }
408 
409  FunctionInfo info;
410  info.name = function_name;
411  info.function = cb;
412  info.wrapper.reset(new XMLRPCCallWrapper(function_name, cb, &server_));
413  functions_[function_name] = info;
414 
415  return true;
416 }
417 
418 void XMLRPCManager::unbind(const std::string& function_name)
419 {
420  unbind_requested_ = true;
421  boost::mutex::scoped_lock lock(functions_mutex_);
422  functions_.erase(function_name);
423  unbind_requested_ = false;
424 }
425 
426 } // namespace ros
void execute(XmlRpcValue &params, XmlRpcValue &result)
volatile bool unbind_requested_
boost::mutex added_connections_mutex_
XmlRpc::XmlRpcValue responseBool(int code, const std::string &msg, bool response)
M_StringToFuncInfo functions_
void getPid(const XmlRpcValue &params, XmlRpcValue &result)
bool validateXmlrpcResponse(const std::string &method, XmlRpc::XmlRpcValue &response, XmlRpc::XmlRpcValue &payload)
Validate an XML/RPC response.
boost::thread server_thread_
XmlRpc::XmlRpcValue responseInt(int code, const std::string &msg, int response)
int size() const
void work(double msTime)
bool bindAndListen(int port, int backlog=5)
V_CachedXmlRpcClient clients_
XmlRpc::XmlRpcValue responseStr(int code, const std::string &msg, const std::string &response)
boost::mutex functions_mutex_
Type const & getType() const
S_ASyncXMLRPCConnection connections_
void removeASyncConnection(const ASyncXMLRPCConnectionPtr &conn)
XmlRpcDispatch * get_dispatch()
virtual void close()
boost::mutex removed_connections_mutex_
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
bool sleep() const
ROSCPP_DECL const std::string & getHost()
Definition: network.cpp:50
boost::function< void(XmlRpc::XmlRpcValue &, XmlRpc::XmlRpcValue &)> XMLRPCFunc
static SteadyTime now()
boost::mutex clients_mutex_
void addASyncConnection(const ASyncXMLRPCConnectionPtr &conn)
bool bind(const std::string &function_name, const XMLRPCFunc &cb)
XmlRpc::XmlRpcClient * getXMLRPCClient(const std::string &host, const int port, const std::string &uri)
XMLRPCCallWrapper(const std::string &function_name, const XMLRPCFunc &cb, XmlRpcServer *s)
void releaseXMLRPCClient(XmlRpc::XmlRpcClient *c)
#define ROS_ASSERT(cond)
S_ASyncXMLRPCConnection removed_connections_
void unbind(const std::string &function_name)
S_ASyncXMLRPCConnection added_connections_
XmlRpc::XmlRpcServer server_


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Sun Feb 3 2019 03:29:54