xmlrpc_manager.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2009, Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "ros/xmlrpc_manager.h"
29 #include "ros/network.h"
30 #include "ros/param.h"
31 #include "ros/assert.h"
32 #include "ros/common.h"
33 #include "ros/file_log.h"
34 #include "ros/io.h"
35 
36 using namespace XmlRpc;
37 
38 namespace ros
39 {
40 
41 namespace xmlrpc
42 {
43 XmlRpc::XmlRpcValue responseStr(int code, const std::string& msg, const std::string& response)
44 {
46  v[0] = code;
47  v[1] = msg;
48  v[2] = response;
49  return v;
50 }
51 
52 XmlRpc::XmlRpcValue responseInt(int code, const std::string& msg, int response)
53 {
55  v[0] = int(code);
56  v[1] = msg;
57  v[2] = response;
58  return v;
59 }
60 
61 XmlRpc::XmlRpcValue responseBool(int code, const std::string& msg, bool response)
62 {
64  v[0] = int(code);
65  v[1] = msg;
66  v[2] = XmlRpc::XmlRpcValue(response);
67  return v;
68 }
69 }
70 
72 {
73 public:
74  XMLRPCCallWrapper(const std::string& function_name, const XMLRPCFunc& cb, XmlRpcServer *s)
75  : XmlRpcServerMethod(function_name, s)
76  , name_(function_name)
77  , func_(cb)
78  { }
79 
80  void execute(XmlRpcValue &params, XmlRpcValue &result)
81  {
82  func_(params, result);
83  }
84 
85 private:
86  std::string name_;
88 };
89 
90 void getPid(const XmlRpcValue& params, XmlRpcValue& result)
91 {
92  (void)params;
93  result = xmlrpc::responseInt(1, "", (int)getpid());
94 }
95 
96 const ros::WallDuration CachedXmlRpcClient::s_zombie_time_(30.0); // reap after 30 seconds
97 
98 const XMLRPCManagerPtr& XMLRPCManager::instance()
99 {
100  static XMLRPCManagerPtr xmlrpc_manager = boost::make_shared<XMLRPCManager>();
101  return xmlrpc_manager;
102 }
103 
104 XMLRPCManager::XMLRPCManager()
105 : port_(0)
106 , shutting_down_(false)
107 , unbind_requested_(false)
108 {
109 }
110 
112 {
113  shutdown();
114 }
115 
117 {
118  shutting_down_ = false;
119  port_ = 0;
120  bind("getPid", getPid);
121 
122  bool bound = server_.bindAndListen(0);
123  (void) bound;
124  ROS_ASSERT(bound);
125  port_ = server_.get_port();
126  ROS_ASSERT(port_ != 0);
127 
128  std::stringstream ss;
129  ss << "http://" << network::getHost() << ":" << port_ << "/";
130  uri_ = ss.str();
131 
132  server_thread_ = boost::thread(boost::bind(&XMLRPCManager::serverThreadFunc, this));
133 }
134 
136 {
137  if (shutting_down_)
138  {
139  return;
140  }
141 
142  // before shutting down, unsubscribe all the cached parameter
144 
145  shutting_down_ = true;
146  server_thread_.join();
147 
148  server_.close();
149 
150  // kill the last few clients that were started in the shutdown process
151  {
152  boost::mutex::scoped_lock lock(clients_mutex_);
153 
154  for (V_CachedXmlRpcClient::iterator i = clients_.begin();
155  i != clients_.end();)
156  {
157  if (!i->in_use_)
158  {
159  i->client_->close();
160  delete i->client_;
161  i = clients_.erase(i);
162  }
163  else
164  {
165  ++i;
166  }
167  }
168  }
169 
170  // Wait for the clients that are in use to finish and remove themselves from clients_
171  for (int wait_count = 0; !clients_.empty() && wait_count < 10; wait_count++)
172  {
173  ROSCPP_LOG_DEBUG("waiting for xmlrpc connection to finish...");
174  ros::WallDuration(0.01).sleep();
175  }
176 
177  boost::mutex::scoped_lock lock(functions_mutex_);
178  functions_.clear();
179 
180  {
181  S_ASyncXMLRPCConnection::iterator it = connections_.begin();
182  S_ASyncXMLRPCConnection::iterator end = connections_.end();
183  for (; it != end; ++it)
184  {
185  (*it)->removeFromDispatch(server_.get_dispatch());
186  }
187  }
188 
189  connections_.clear();
190 
191  {
192  boost::mutex::scoped_lock lock(added_connections_mutex_);
193  added_connections_.clear();
194  }
195 
196  {
197  boost::mutex::scoped_lock lock(removed_connections_mutex_);
198  removed_connections_.clear();
199  }
200 }
201 
202 bool XMLRPCManager::validateXmlrpcResponse(const std::string& method, XmlRpcValue &response,
203  XmlRpcValue &payload)
204 {
205  if (response.getType() != XmlRpcValue::TypeArray)
206  {
207  ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return an array",
208  method.c_str());
209  return false;
210  }
211  if (response.size() != 2 && response.size() != 3)
212  {
213  ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a 2 or 3-element array",
214  method.c_str());
215  return false;
216  }
217  if (response[0].getType() != XmlRpcValue::TypeInt)
218  {
219  ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a int as the 1st element",
220  method.c_str());
221  return false;
222  }
223  int status_code = response[0];
224  if (response[1].getType() != XmlRpcValue::TypeString)
225  {
226  ROSCPP_LOG_DEBUG("XML-RPC call [%s] didn't return a string as the 2nd element",
227  method.c_str());
228  return false;
229  }
230  std::string status_string = response[1];
231  if (status_code != 1)
232  {
233  ROSCPP_LOG_DEBUG("XML-RPC call [%s] returned an error (%d): [%s]",
234  method.c_str(), status_code, status_string.c_str());
235  return false;
236  }
237  if (response.size() > 2)
238  {
239  payload = response[2];
240  }
241  else
242  {
243  std::string empty_array = "<value><array><data></data></array></value>";
244  int offset = 0;
245  payload = XmlRpcValue(empty_array, &offset);
246  }
247  return true;
248 }
249 
251 {
252  disableAllSignalsInThisThread();
253 
254  while(!shutting_down_)
255  {
256  {
257  boost::mutex::scoped_lock lock(added_connections_mutex_);
258  S_ASyncXMLRPCConnection::iterator it = added_connections_.begin();
259  S_ASyncXMLRPCConnection::iterator end = added_connections_.end();
260  for (; it != end; ++it)
261  {
262  (*it)->addToDispatch(server_.get_dispatch());
263  connections_.insert(*it);
264  }
265 
266  added_connections_.clear();
267  }
268 
269  // Update the XMLRPC server, blocking for at most 100ms in select()
270  {
271  boost::mutex::scoped_lock lock(functions_mutex_);
272  server_.work(0.1);
273  }
274 
275  while (unbind_requested_)
276  {
277  WallDuration(0.01).sleep();
278  }
279 
280  if (shutting_down_)
281  {
282  return;
283  }
284 
285  {
286  S_ASyncXMLRPCConnection::iterator it = connections_.begin();
287  S_ASyncXMLRPCConnection::iterator end = connections_.end();
288  for (; it != end; ++it)
289  {
290  if ((*it)->check())
291  {
293  }
294  }
295  }
296 
297  {
298  boost::mutex::scoped_lock lock(removed_connections_mutex_);
299  S_ASyncXMLRPCConnection::iterator it = removed_connections_.begin();
300  S_ASyncXMLRPCConnection::iterator end = removed_connections_.end();
301  for (; it != end; ++it)
302  {
303  (*it)->removeFromDispatch(server_.get_dispatch());
304  connections_.erase(*it);
305  }
306 
307  removed_connections_.clear();
308  }
309  }
310 }
311 
312 XmlRpcClient* XMLRPCManager::getXMLRPCClient(const std::string &host, const int port, const std::string &uri)
313 {
314  // go through our vector of clients and grab the first available one
315  XmlRpcClient *c = NULL;
316 
317  boost::mutex::scoped_lock lock(clients_mutex_);
318 
319  for (V_CachedXmlRpcClient::iterator i = clients_.begin();
320  !c && i != clients_.end(); )
321  {
322  if (!i->in_use_)
323  {
324  // see where it's pointing
325  if (i->client_->getHost() == host &&
326  i->client_->getPort() == port &&
327  i->client_->getUri() == uri)
328  {
329  // hooray, it's pointing at our destination. re-use it.
330  c = i->client_;
331  i->in_use_ = true;
332  i->last_use_time_ = SteadyTime::now();
333  break;
334  }
335  else if (i->last_use_time_ + CachedXmlRpcClient::s_zombie_time_ < SteadyTime::now())
336  {
337  // toast this guy. he's dead and nobody is reusing him.
338  delete i->client_;
339  i = clients_.erase(i);
340  }
341  else
342  {
343  ++i; // move along. this guy isn't dead yet.
344  }
345  }
346  else
347  {
348  ++i;
349  }
350  }
351 
352  if (!c)
353  {
354  // allocate a new one
355  c = new XmlRpcClient(host.c_str(), port, uri.c_str());
356  CachedXmlRpcClient mc(c);
357  mc.in_use_ = true;
358  mc.last_use_time_ = SteadyTime::now();
359  clients_.push_back(mc);
360  //ROS_INFO("%d xmlrpc clients allocated\n", xmlrpc_clients.size());
361  }
362  // ONUS IS ON THE RECEIVER TO UNSET THE IN_USE FLAG
363  // by calling releaseXMLRPCClient
364  return c;
365 }
366 
368 {
369  boost::mutex::scoped_lock lock(clients_mutex_);
370 
371  for (V_CachedXmlRpcClient::iterator i = clients_.begin();
372  i != clients_.end(); ++i)
373  {
374  if (c == i->client_)
375  {
376  if (shutting_down_ || !c->getKeepOpen())
377  {
378  // if we are shutting down we won't be re-using the client
379  i->client_->close();
380  delete i->client_;
381  clients_.erase(i);
382  }
383  else
384  {
385  i->in_use_ = false;
386  }
387  break;
388  }
389  }
390 }
391 
393 {
394  boost::mutex::scoped_lock lock(added_connections_mutex_);
395  added_connections_.insert(conn);
396 }
397 
399 {
400  boost::mutex::scoped_lock lock(removed_connections_mutex_);
401  removed_connections_.insert(conn);
402 }
403 
404 bool XMLRPCManager::bind(const std::string& function_name, const XMLRPCFunc& cb)
405 {
406  boost::mutex::scoped_lock lock(functions_mutex_);
407  if (functions_.find(function_name) != functions_.end())
408  {
409  return false;
410  }
411 
412  FunctionInfo info;
413  info.name = function_name;
414  info.function = cb;
415  info.wrapper.reset(new XMLRPCCallWrapper(function_name, cb, &server_));
416  functions_[function_name] = info;
417 
418  return true;
419 }
420 
421 void XMLRPCManager::unbind(const std::string& function_name)
422 {
423  unbind_requested_ = true;
424  boost::mutex::scoped_lock lock(functions_mutex_);
425  functions_.erase(function_name);
426  unbind_requested_ = false;
427 }
428 
429 } // namespace ros
void execute(XmlRpcValue &params, XmlRpcValue &result)
volatile bool unbind_requested_
bool getKeepOpen() const
boost::mutex added_connections_mutex_
XmlRpc::XmlRpcValue responseBool(int code, const std::string &msg, bool response)
M_StringToFuncInfo functions_
void getPid(const XmlRpcValue &params, XmlRpcValue &result)
bool validateXmlrpcResponse(const std::string &method, XmlRpc::XmlRpcValue &response, XmlRpc::XmlRpcValue &payload)
Validate an XML/RPC response.
boost::thread server_thread_
XmlRpc::XmlRpcValue responseInt(int code, const std::string &msg, int response)
int size() const
void work(double msTime)
bool bindAndListen(int port, int backlog=5)
V_CachedXmlRpcClient clients_
XmlRpc::XmlRpcValue responseStr(int code, const std::string &msg, const std::string &response)
boost::mutex functions_mutex_
Type const & getType() const
S_ASyncXMLRPCConnection connections_
void removeASyncConnection(const ASyncXMLRPCConnectionPtr &conn)
XmlRpcDispatch * get_dispatch()
virtual void close()
boost::mutex removed_connections_mutex_
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
bool sleep() const
ROSCPP_DECL const std::string & getHost()
Definition: network.cpp:50
boost::function< void(XmlRpc::XmlRpcValue &, XmlRpc::XmlRpcValue &)> XMLRPCFunc
static SteadyTime now()
boost::mutex clients_mutex_
void addASyncConnection(const ASyncXMLRPCConnectionPtr &conn)
bool bind(const std::string &function_name, const XMLRPCFunc &cb)
XmlRpc::XmlRpcClient * getXMLRPCClient(const std::string &host, const int port, const std::string &uri)
XMLRPCCallWrapper(const std::string &function_name, const XMLRPCFunc &cb, XmlRpcServer *s)
ROSCPP_DECL void unsubscribeCachedParam(const std::string &key)
Unsubscribe cached parameter from the master.
Definition: param.cpp:814
void releaseXMLRPCClient(XmlRpc::XmlRpcClient *c)
#define ROS_ASSERT(cond)
S_ASyncXMLRPCConnection removed_connections_
void unbind(const std::string &function_name)
S_ASyncXMLRPCConnection added_connections_
XmlRpc::XmlRpcServer server_


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:26