service_server_link.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
36 #include "ros/header.h"
37 #include "ros/connection.h"
38 #include "ros/service_manager.h"
40 #include "ros/this_node.h"
41 #include "ros/file_log.h"
42 
43 #include <boost/bind.hpp>
44 
45 #include <sstream>
46 
47 namespace ros
48 {
49 
50 ServiceServerLink::ServiceServerLink(const std::string& service_name, bool persistent, const std::string& request_md5sum,
51  const std::string& response_md5sum, const M_string& header_values)
52 : service_name_(service_name)
53 , persistent_(persistent)
54 , request_md5sum_(request_md5sum)
55 , response_md5sum_(response_md5sum)
56 , extra_outgoing_header_values_(header_values)
57 , header_written_(false)
58 , header_read_(false)
59 , dropped_(false)
60 {
61 }
62 
64 {
65  ROS_ASSERT(connection_->isDropped());
66 
67  clearCalls();
68 }
69 
71 {
72  CallInfoPtr local = info;
73  {
74  boost::mutex::scoped_lock lock(local->finished_mutex_);
75  local->finished_ = true;
76  local->finished_condition_.notify_all();
77  }
78 
79  if (boost::this_thread::get_id() != info->caller_thread_id_)
80  {
81  while (!local->call_finished_)
82  {
83  boost::this_thread::yield();
84  }
85  }
86 }
87 
89 {
90  CallInfoPtr local_current;
91 
92  {
93  boost::mutex::scoped_lock lock(call_queue_mutex_);
94  local_current = current_call_;
95  }
96 
97  if (local_current)
98  {
99  cancelCall(local_current);
100  }
101 
102  boost::mutex::scoped_lock lock(call_queue_mutex_);
103 
104  while (!call_queue_.empty())
105  {
106  CallInfoPtr info = call_queue_.front();
107 
108  cancelCall(info);
109 
110  call_queue_.pop();
111  }
112 }
113 
115 {
116  connection_ = connection;
117  connection_->addDropListener(boost::bind(&ServiceServerLink::onConnectionDropped, this, _1));
118  connection_->setHeaderReceivedCallback(boost::bind(&ServiceServerLink::onHeaderReceived, this, _1, _2));
119 
121  header["service"] = service_name_;
122  header["md5sum"] = request_md5sum_;
123  header["callerid"] = this_node::getName();
124  header["persistent"] = persistent_ ? "1" : "0";
125  header.insert(extra_outgoing_header_values_.begin(), extra_outgoing_header_values_.end());
126 
127  connection_->writeHeader(header, boost::bind(&ServiceServerLink::onHeaderWritten, this, _1));
128 
129  return true;
130 }
131 
133 {
134  (void)conn;
135  header_written_ = true;
136 }
137 
139 {
140  (void)conn;
141  std::string md5sum, type;
142  if (!header.getValue("md5sum", md5sum))
143  {
144  ROS_ERROR("TCPROS header from service server did not have required element: md5sum");
145  return false;
146  }
147 
148  bool empty = false;
149  {
150  boost::mutex::scoped_lock lock(call_queue_mutex_);
151  empty = call_queue_.empty();
152 
153  if (empty)
154  {
155  header_read_ = true;
156  }
157  }
158 
159  if (!empty)
160  {
161  processNextCall();
162 
163  header_read_ = true;
164  }
165 
166  return true;
167 }
168 
170 {
171  ROS_ASSERT(conn == connection_);
172  ROSCPP_LOG_DEBUG("Service client from [%s] for [%s] dropped", conn->getRemoteString().c_str(), service_name_.c_str());
173 
174  dropped_ = true;
175  clearCalls();
176 
177  ServiceManager::instance()->removeServiceServerLink(shared_from_this());
178 }
179 
181 {
182  (void)conn;
183  //ros::WallDuration(0.1).sleep();
184  connection_->read(5, boost::bind(&ServiceServerLink::onResponseOkAndLength, this, _1, _2, _3, _4));
185 }
186 
187 void ServiceServerLink::onResponseOkAndLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
188 {
189  (void)size;
190  ROS_ASSERT(conn == connection_);
191  ROS_ASSERT(size == 5);
192 
193  if (!success)
194  return;
195 
196  uint8_t ok = buffer[0];
197  uint32_t len = *((uint32_t*)(buffer.get() + 1));
198 
199  if (len > 1000000000)
200  {
201  ROS_ERROR("a message of over a gigabyte was " \
202  "predicted in tcpros. that seems highly " \
203  "unlikely, so I'll assume protocol " \
204  "synchronization is lost.");
205  conn->drop(Connection::Destructing);
206 
207  return;
208  }
209 
210  {
211  boost::mutex::scoped_lock lock(call_queue_mutex_);
212  if ( ok != 0 ) {
213  current_call_->success_ = true;
214  } else {
215  current_call_->success_ = false;
216  }
217  }
218 
219  if (len > 0)
220  {
221  connection_->read(len, boost::bind(&ServiceServerLink::onResponse, this, _1, _2, _3, _4));
222  }
223  else
224  {
225  onResponse(conn, boost::shared_array<uint8_t>(), 0, true);
226  }
227 }
228 
229 void ServiceServerLink::onResponse(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
230 {
231  (void)conn;
232  ROS_ASSERT(conn == connection_);
233 
234  if (!success)
235  return;
236 
237  {
238  boost::mutex::scoped_lock queue_lock(call_queue_mutex_);
239 
240  if (current_call_->success_)
241  {
242  *current_call_->resp_ = SerializedMessage(buffer, size);
243  }
244  else
245  {
246  current_call_->exception_string_ = std::string(reinterpret_cast<char*>(buffer.get()), size);
247  }
248  }
249 
250  callFinished();
251 }
252 
254 {
255  CallInfoPtr saved_call;
257  {
258  boost::mutex::scoped_lock queue_lock(call_queue_mutex_);
259  boost::mutex::scoped_lock finished_lock(current_call_->finished_mutex_);
260 
261  ROS_DEBUG_NAMED("superdebug", "Client to service [%s] call finished with success=[%s]", service_name_.c_str(), current_call_->success_ ? "true" : "false");
262 
263  current_call_->finished_ = true;
264  current_call_->finished_condition_.notify_all();
265 
266  saved_call = current_call_;
268 
269  // If the call queue is empty here, we may be deleted as soon as we release these locks, so keep a shared pointer to ourselves until we return
270  // ugly
271  // jfaust TODO there's got to be a better way
272  self = shared_from_this();
273  }
274 
275  saved_call = CallInfoPtr();
276 
277  processNextCall();
278 }
279 
281 {
282  bool empty = false;
283  {
284  boost::mutex::scoped_lock lock(call_queue_mutex_);
285 
286  if (current_call_)
287  {
288  return;
289  }
290 
291  if (!call_queue_.empty())
292  {
293  ROS_DEBUG_NAMED("superdebug", "[%s] Client to service [%s] processing next service call", persistent_ ? "persistent" : "non-persistent", service_name_.c_str());
294 
295  current_call_ = call_queue_.front();
296  call_queue_.pop();
297  }
298  else
299  {
300  empty = true;
301  }
302  }
303 
304  if (empty)
305  {
306  if (!persistent_)
307  {
308  ROS_DEBUG_NAMED("superdebug", "Dropping non-persistent client to service [%s]", service_name_.c_str());
310  }
311  else
312  {
313  ROS_DEBUG_NAMED("superdebug", "Keeping persistent client to service [%s]", service_name_.c_str());
314  }
315  }
316  else
317  {
318  SerializedMessage request;
319 
320  {
321  boost::mutex::scoped_lock lock(call_queue_mutex_);
322  request = current_call_->req_;
323  }
324 
325  connection_->write(request.buf, request.num_bytes, boost::bind(&ServiceServerLink::onRequestWritten, this, _1));
326  }
327 }
328 
330 {
331  CallInfoPtr info(boost::make_shared<CallInfo>());
332  info->req_ = req;
333  info->resp_ = &resp;
334  info->success_ = false;
335  info->finished_ = false;
336  info->call_finished_ = false;
337  info->caller_thread_id_ = boost::this_thread::get_id();
338 
339  //ros::WallDuration(0.1).sleep();
340 
341  bool immediate = false;
342  {
343  boost::mutex::scoped_lock lock(call_queue_mutex_);
344 
345  if (connection_->isDropped())
346  {
347  ROSCPP_LOG_DEBUG("ServiceServerLink::call called on dropped connection for service [%s]", service_name_.c_str());
348  info->call_finished_ = true;
349  return false;
350  }
351 
352  if (call_queue_.empty() && header_written_ && header_read_)
353  {
354  immediate = true;
355  }
356 
357  call_queue_.push(info);
358  }
359 
360  if (immediate)
361  {
362  processNextCall();
363  }
364 
365  {
366  boost::mutex::scoped_lock lock(info->finished_mutex_);
367 
368  while (!info->finished_)
369  {
370  info->finished_condition_.wait(lock);
371  }
372  }
373 
374  info->call_finished_ = true;
375 
376  if (info->exception_string_.length() > 0)
377  {
378  ROS_ERROR("Service call failed: service [%s] responded with an error: %s", service_name_.c_str(), info->exception_string_.c_str());
379  }
380 
381  return info->success_;
382 }
383 
385 {
386  return !dropped_;
387 }
388 
389 } // namespace ros
390 
ROSCPP_DECL const std::string & getName()
Returns the name of the current node.
Definition: this_node.cpp:81
std_msgs::Header * header(M &m)
static const ServiceManagerPtr & instance()
#define ROS_DEBUG_NAMED(name,...)
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
ROSCPP_DECL bool ok()
Check whether it&#39;s time to exit.
Definition: init.cpp:573
bool getValue(const std::string &key, std::string &value) const
boost::shared_array< uint8_t > buf
const char * md5sum()
#define ROS_ASSERT(cond)
#define ROS_ERROR(...)


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Sun Aug 26 2018 03:03:33