xmlrpc_manager.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2009, Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "ros/xmlrpc_manager.h"
29 #include "ros/network.h"
30 #include "ros/param.h"
31 #include "ros/assert.h"
32 #include "ros/common.h"
33 #include "ros/file_log.h"
34 #include "ros/io.h"
35 
36 using namespace XmlRpc;
37 
38 namespace ros
39 {
40 
41 namespace xmlrpc
42 {
43 XmlRpc::XmlRpcValue responseStr(int code, const std::string& msg, const std::string& response)
44 {
46  v[0] = code;
47  v[1] = msg;
48  v[2] = response;
49  return v;
50 }
51 
52 XmlRpc::XmlRpcValue responseInt(int code, const std::string& msg, int response)
53 {
55  v[0] = int(code);
56  v[1] = msg;
57  v[2] = response;
58  return v;
59 }
60 
61 XmlRpc::XmlRpcValue responseBool(int code, const std::string& msg, bool response)
62 {
64  v[0] = int(code);
65  v[1] = msg;
66  v[2] = XmlRpc::XmlRpcValue(response);
67  return v;
68 }
69 }
70 
72 {
73 public:
74  XMLRPCCallWrapper(const std::string& function_name, const XMLRPCFunc& cb, XmlRpcServer *s)
75  : XmlRpcServerMethod(function_name, s)
76  , name_(function_name)
77  , func_(cb)
78  { }
79 
80  void execute(XmlRpcValue &params, XmlRpcValue &result)
81  {
82  func_(params, result);
83  }
84 
85 private:
86  std::string name_;
88 };
89 
90 void getPid(const XmlRpcValue& params, XmlRpcValue& result)
91 {
92  (void)params;
93  result = xmlrpc::responseInt(1, "", (int)getpid());
94 }
95 
96 const ros::WallDuration CachedXmlRpcClient::s_zombie_time_(30.0); // reap after 30 seconds
97 
100 const XMLRPCManagerPtr& XMLRPCManager::instance()
101 {
102  if (!g_xmlrpc_manager)
103  {
104  boost::mutex::scoped_lock lock(g_xmlrpc_manager_mutex);
105  if (!g_xmlrpc_manager)
106  {
107  g_xmlrpc_manager.reset(new XMLRPCManager);
108  }
109  }
110 
111  return g_xmlrpc_manager;
112 }
113 
114 XMLRPCManager::XMLRPCManager()
115 : port_(0)
116 , shutting_down_(false)
117 , unbind_requested_(false)
118 {
119 }
120 
122 {
123  shutdown();
124 }
125 
127 {
128  shutting_down_ = false;
129  port_ = 0;
130  bind("getPid", getPid);
131 
132  bool bound = server_.bindAndListen(0);
133  (void) bound;
134  ROS_ASSERT(bound);
135  port_ = server_.get_port();
136  ROS_ASSERT(port_ != 0);
137 
138  std::stringstream ss;
139  ss << "http://" << network::getHost() << ":" << port_ << "/";
140  uri_ = ss.str();
141 
142  server_thread_ = boost::thread(boost::bind(&XMLRPCManager::serverThreadFunc, this));
143 }
144 
146 {
147  if (shutting_down_)
148  {
149  return;
150  }
151 
152  shutting_down_ = true;
153  server_thread_.join();
154 
155  server_.close();
156 
157  // kill the last few clients that were started in the shutdown process
158  {
159  boost::mutex::scoped_lock lock(clients_mutex_);
160 
161  for (V_CachedXmlRpcClient::iterator i = clients_.begin();
162  i != clients_.end();)
163  {
164  if (!i->in_use_)
165  {
166  i->client_->close();
167  delete i->client_;
168  i = clients_.erase(i);
169  }
170  else
171  {
172  ++i;
173  }
174  }
175  }
176 
177  // Wait for the clients that are in use to finish and remove themselves from clients_
178  for (int wait_count = 0; !clients_.empty() && wait_count < 10; wait_count++)
179  {
180  ROSCPP_LOG_DEBUG("waiting for xmlrpc connection to finish...");
181  ros::WallDuration(0.01).sleep();
182  }
183 
184  boost::mutex::scoped_lock lock(functions_mutex_);
185  functions_.clear();
186 
187  {
188  S_ASyncXMLRPCConnection::iterator it = connections_.begin();
189  S_ASyncXMLRPCConnection::iterator end = connections_.end();
190  for (; it != end; ++it)
191  {
192  (*it)->removeFromDispatch(server_.get_dispatch());
193  }
194  }
195 
196  connections_.clear();
197 
198  {
199  boost::mutex::scoped_lock lock(added_connections_mutex_);
200  added_connections_.clear();
201  }
202 
203  {
204  boost::mutex::scoped_lock lock(removed_connections_mutex_);
205  removed_connections_.clear();
206  }
207 }
208 
209 bool XMLRPCManager::validateXmlrpcResponse(const std::string& method, XmlRpcValue &response,
210  XmlRpcValue &payload)
211 {
212  if (response.getType() != XmlRpcValue::TypeArray)
213  {
214  ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return an array",
215  method.c_str());
216  return false;
217  }
218  if (response.size() != 2 && response.size() != 3)
219  {
220  ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a 2 or 3-element array",
221  method.c_str());
222  return false;
223  }
224  if (response[0].getType() != XmlRpcValue::TypeInt)
225  {
226  ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a int as the 1st element",
227  method.c_str());
228  return false;
229  }
230  int status_code = response[0];
231  if (response[1].getType() != XmlRpcValue::TypeString)
232  {
233  ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a string as the 2nd element",
234  method.c_str());
235  return false;
236  }
237  std::string status_string = response[1];
238  if (status_code != 1)
239  {
240  ROSCPP_LOG_DEBUG("XML-RPC call [%s] returned an error (%d): [%s]",
241  method.c_str(), status_code, status_string.c_str());
242  return false;
243  }
244  if (response.size() > 2)
245  {
246  payload = response[2];
247  }
248  else
249  {
250  std::string empty_array = "<value><array><data></data></array></value>";
251  int offset = 0;
252  payload = XmlRpcValue(empty_array, &offset);
253  }
254  return true;
255 }
256 
258 {
259  disableAllSignalsInThisThread();
260 
261  while(!shutting_down_)
262  {
263  {
264  boost::mutex::scoped_lock lock(added_connections_mutex_);
265  S_ASyncXMLRPCConnection::iterator it = added_connections_.begin();
266  S_ASyncXMLRPCConnection::iterator end = added_connections_.end();
267  for (; it != end; ++it)
268  {
269  (*it)->addToDispatch(server_.get_dispatch());
270  connections_.insert(*it);
271  }
272 
273  added_connections_.clear();
274  }
275 
276  // Update the XMLRPC server, blocking for at most 100ms in select()
277  {
278  boost::mutex::scoped_lock lock(functions_mutex_);
279  server_.work(0.1);
280  }
281 
282  while (unbind_requested_)
283  {
284  WallDuration(0.01).sleep();
285  }
286 
287  if (shutting_down_)
288  {
289  return;
290  }
291 
292  {
293  S_ASyncXMLRPCConnection::iterator it = connections_.begin();
294  S_ASyncXMLRPCConnection::iterator end = connections_.end();
295  for (; it != end; ++it)
296  {
297  if ((*it)->check())
298  {
300  }
301  }
302  }
303 
304  {
305  boost::mutex::scoped_lock lock(removed_connections_mutex_);
306  S_ASyncXMLRPCConnection::iterator it = removed_connections_.begin();
307  S_ASyncXMLRPCConnection::iterator end = removed_connections_.end();
308  for (; it != end; ++it)
309  {
310  (*it)->removeFromDispatch(server_.get_dispatch());
311  connections_.erase(*it);
312  }
313 
314  removed_connections_.clear();
315  }
316  }
317 }
318 
319 XmlRpcClient* XMLRPCManager::getXMLRPCClient(const std::string &host, const int port, const std::string &uri)
320 {
321  // go through our vector of clients and grab the first available one
322  XmlRpcClient *c = NULL;
323 
324  boost::mutex::scoped_lock lock(clients_mutex_);
325 
326  for (V_CachedXmlRpcClient::iterator i = clients_.begin();
327  !c && i != clients_.end(); )
328  {
329  if (!i->in_use_)
330  {
331  // see where it's pointing
332  if (i->client_->getHost() == host &&
333  i->client_->getPort() == port &&
334  i->client_->getUri() == uri)
335  {
336  // hooray, it's pointing at our destination. re-use it.
337  c = i->client_;
338  i->in_use_ = true;
339  i->last_use_time_ = SteadyTime::now();
340  break;
341  }
342  else if (i->last_use_time_ + CachedXmlRpcClient::s_zombie_time_ < SteadyTime::now())
343  {
344  // toast this guy. he's dead and nobody is reusing him.
345  delete i->client_;
346  i = clients_.erase(i);
347  }
348  else
349  {
350  ++i; // move along. this guy isn't dead yet.
351  }
352  }
353  else
354  {
355  ++i;
356  }
357  }
358 
359  if (!c)
360  {
361  // allocate a new one
362  c = new XmlRpcClient(host.c_str(), port, uri.c_str());
363  CachedXmlRpcClient mc(c);
364  mc.in_use_ = true;
365  mc.last_use_time_ = SteadyTime::now();
366  clients_.push_back(mc);
367  //ROS_INFO("%d xmlrpc clients allocated\n", xmlrpc_clients.size());
368  }
369  // ONUS IS ON THE RECEIVER TO UNSET THE IN_USE FLAG
370  // by calling releaseXMLRPCClient
371  return c;
372 }
373 
375 {
376  boost::mutex::scoped_lock lock(clients_mutex_);
377 
378  for (V_CachedXmlRpcClient::iterator i = clients_.begin();
379  i != clients_.end(); ++i)
380  {
381  if (c == i->client_)
382  {
383  if (shutting_down_)
384  {
385  // if we are shutting down we won't be re-using the client
386  i->client_->close();
387  delete i->client_;
388  clients_.erase(i);
389  }
390  else
391  {
392  i->in_use_ = false;
393  }
394  break;
395  }
396  }
397 }
398 
400 {
401  boost::mutex::scoped_lock lock(added_connections_mutex_);
402  added_connections_.insert(conn);
403 }
404 
406 {
407  boost::mutex::scoped_lock lock(removed_connections_mutex_);
408  removed_connections_.insert(conn);
409 }
410 
411 bool XMLRPCManager::bind(const std::string& function_name, const XMLRPCFunc& cb)
412 {
413  boost::mutex::scoped_lock lock(functions_mutex_);
414  if (functions_.find(function_name) != functions_.end())
415  {
416  return false;
417  }
418 
419  FunctionInfo info;
420  info.name = function_name;
421  info.function = cb;
422  info.wrapper.reset(new XMLRPCCallWrapper(function_name, cb, &server_));
423  functions_[function_name] = info;
424 
425  return true;
426 }
427 
428 void XMLRPCManager::unbind(const std::string& function_name)
429 {
430  unbind_requested_ = true;
431  boost::mutex::scoped_lock lock(functions_mutex_);
432  functions_.erase(function_name);
433  unbind_requested_ = false;
434 }
435 
436 } // namespace ros
void execute(XmlRpcValue &params, XmlRpcValue &result)
volatile bool unbind_requested_
boost::mutex added_connections_mutex_
XmlRpc::XmlRpcValue responseBool(int code, const std::string &msg, bool response)
M_StringToFuncInfo functions_
void getPid(const XmlRpcValue &params, XmlRpcValue &result)
bool validateXmlrpcResponse(const std::string &method, XmlRpc::XmlRpcValue &response, XmlRpc::XmlRpcValue &payload)
Validate an XML/RPC response.
boost::thread server_thread_
XmlRpc::XmlRpcValue responseInt(int code, const std::string &msg, int response)
int size() const
void work(double msTime)
bool bindAndListen(int port, int backlog=5)
boost::mutex g_xmlrpc_manager_mutex
V_CachedXmlRpcClient clients_
XmlRpc::XmlRpcValue responseStr(int code, const std::string &msg, const std::string &response)
boost::mutex functions_mutex_
Type const & getType() const
S_ASyncXMLRPCConnection connections_
void removeASyncConnection(const ASyncXMLRPCConnectionPtr &conn)
XmlRpcDispatch * get_dispatch()
XMLRPCManagerPtr g_xmlrpc_manager
virtual void close()
boost::mutex removed_connections_mutex_
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
bool sleep() const
ROSCPP_DECL const std::string & getHost()
Definition: network.cpp:50
boost::function< void(XmlRpc::XmlRpcValue &, XmlRpc::XmlRpcValue &)> XMLRPCFunc
static SteadyTime now()
boost::mutex clients_mutex_
void addASyncConnection(const ASyncXMLRPCConnectionPtr &conn)
bool bind(const std::string &function_name, const XMLRPCFunc &cb)
XmlRpc::XmlRpcClient * getXMLRPCClient(const std::string &host, const int port, const std::string &uri)
XMLRPCCallWrapper(const std::string &function_name, const XMLRPCFunc &cb, XmlRpcServer *s)
void releaseXMLRPCClient(XmlRpc::XmlRpcClient *c)
#define ROS_ASSERT(cond)
S_ASyncXMLRPCConnection removed_connections_
void unbind(const std::string &function_name)
S_ASyncXMLRPCConnection added_connections_
XmlRpc::XmlRpcServer server_


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Wed Mar 21 2018 07:13:27