service_server_link.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
36 #include "ros/header.h"
37 #include "ros/connection.h"
38 #include "ros/service_manager.h"
40 #include "ros/this_node.h"
41 #include "ros/file_log.h"
42 
43 #include <boost/bind/bind.hpp>
44 
45 #include <sstream>
46 
47 namespace ros
48 {
49 
50 ServiceServerLink::ServiceServerLink(const std::string& service_name, bool persistent, const std::string& request_md5sum,
51  const std::string& response_md5sum, const M_string& header_values)
52 : service_name_(service_name)
53 , persistent_(persistent)
54 , request_md5sum_(request_md5sum)
55 , response_md5sum_(response_md5sum)
56 , extra_outgoing_header_values_(header_values)
57 , header_written_(false)
58 , header_read_(false)
59 , dropped_(false)
60 {
61 }
62 
64 {
65  ROS_ASSERT(connection_->isDropped());
66 
67  clearCalls();
68 }
69 
71 {
72  CallInfoPtr local = info;
73  {
74  boost::mutex::scoped_lock lock(local->finished_mutex_);
75  local->finished_ = true;
76  local->finished_condition_.notify_all();
77  }
78 
79  if (boost::this_thread::get_id() != info->caller_thread_id_)
80  {
81  while (!local->call_finished_)
82  {
83  boost::this_thread::yield();
84  }
85  }
86 }
87 
89 {
90  CallInfoPtr local_current;
91 
92  {
93  boost::mutex::scoped_lock lock(call_queue_mutex_);
94  local_current = current_call_;
95  }
96 
97  if (local_current)
98  {
99  cancelCall(local_current);
100  }
101 
102  boost::mutex::scoped_lock lock(call_queue_mutex_);
103 
104  while (!call_queue_.empty())
105  {
106  CallInfoPtr info = call_queue_.front();
107 
108  cancelCall(info);
109 
110  call_queue_.pop();
111  }
112 }
113 
115 {
116  connection_ = connection;
117  connection_->addDropListener(boost::bind(&ServiceServerLink::onConnectionDropped, this, boost::placeholders::_1));
118  connection_->setHeaderReceivedCallback(boost::bind(&ServiceServerLink::onHeaderReceived, this, boost::placeholders::_1, boost::placeholders::_2));
119 
121  header["service"] = service_name_;
122  header["md5sum"] = request_md5sum_;
123  header["callerid"] = this_node::getName();
124  header["persistent"] = persistent_ ? "1" : "0";
126 
127  connection_->writeHeader(header, boost::bind(&ServiceServerLink::onHeaderWritten, this, boost::placeholders::_1));
128 
129  return true;
130 }
131 
133 {
134  (void)conn;
135  header_written_ = true;
136 }
137 
139 {
140  (void)conn;
141  std::string md5sum, type;
142  if (!header.getValue("md5sum", md5sum))
143  {
144  ROS_ERROR("TCPROS header from service server did not have required element: md5sum");
145  return false;
146  }
147 
148  bool empty = false;
149  {
150  boost::mutex::scoped_lock lock(call_queue_mutex_);
151  empty = call_queue_.empty();
152 
153  if (empty)
154  {
155  header_read_ = true;
156  }
157  }
158 
159  if (!empty)
160  {
161  processNextCall();
162 
163  header_read_ = true;
164  }
165 
166  return true;
167 }
168 
170 {
171  ROS_ASSERT(conn == connection_);
172  ROSCPP_LOG_DEBUG("Service client from [%s] for [%s] dropped", conn->getRemoteString().c_str(), service_name_.c_str());
173 
174  dropped_ = true;
175  clearCalls();
176 
177  ServiceManager::instance()->removeServiceServerLink(shared_from_this());
178 }
179 
181 {
182  (void)conn;
183  //ros::WallDuration(0.1).sleep();
184  connection_->read(5, boost::bind(&ServiceServerLink::onResponseOkAndLength, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, boost::placeholders::_4));
185 }
186 
187 void ServiceServerLink::onResponseOkAndLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
188 {
189  (void)size;
190  ROS_ASSERT(conn == connection_);
191  ROS_ASSERT(size == 5);
192 
193  if (!success)
194  return;
195 
196  uint8_t ok = buffer[0];
197  uint32_t len = *((uint32_t*)(buffer.get() + 1));
198 
199  if (len > 1000000000)
200  {
201  ROS_ERROR("a message of over a gigabyte was " \
202  "predicted in tcpros. that seems highly " \
203  "unlikely, so I'll assume protocol " \
204  "synchronization is lost.");
205  conn->drop(Connection::Destructing);
206 
207  return;
208  }
209 
210  {
211  boost::mutex::scoped_lock lock(call_queue_mutex_);
212  if ( ok != 0 ) {
213  current_call_->success_ = true;
214  } else {
215  current_call_->success_ = false;
216  }
217  }
218 
219  if (len > 0)
220  {
221  connection_->read(len, boost::bind(&ServiceServerLink::onResponse, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, boost::placeholders::_4));
222  }
223  else
224  {
225  onResponse(conn, boost::shared_array<uint8_t>(), 0, true);
226  }
227 }
228 
229 void ServiceServerLink::onResponse(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
230 {
231  (void)conn;
232  ROS_ASSERT(conn == connection_);
233 
234  if (!success)
235  return;
236 
237  {
238  boost::mutex::scoped_lock queue_lock(call_queue_mutex_);
239 
240  if (current_call_->success_)
241  {
242  *current_call_->resp_ = SerializedMessage(buffer, size);
243  }
244  else
245  {
246  current_call_->exception_string_ = std::string(reinterpret_cast<char*>(buffer.get()), size);
247  }
248  }
249 
250  callFinished();
251 }
252 
254 {
255  CallInfoPtr saved_call;
257  {
258  boost::mutex::scoped_lock queue_lock(call_queue_mutex_);
259  boost::mutex::scoped_lock finished_lock(current_call_->finished_mutex_);
260 
261  ROS_DEBUG_NAMED("superdebug", "Client to service [%s] call finished with success=[%s]", service_name_.c_str(), current_call_->success_ ? "true" : "false");
262 
263  current_call_->finished_ = true;
264  current_call_->finished_condition_.notify_all();
265  current_call_->call_finished_ = true;
266 
267  saved_call = current_call_;
269 
270  // If the call queue is empty here, we may be deleted as soon as we release these locks, so keep a shared pointer to ourselves until we return
271  // ugly
272  // jfaust TODO there's got to be a better way
273  self = shared_from_this();
274  }
275 
276  saved_call = CallInfoPtr();
277 
278  processNextCall();
279 }
280 
282 {
283  bool empty = false;
284  {
285  boost::mutex::scoped_lock lock(call_queue_mutex_);
286 
287  if (current_call_)
288  {
289  return;
290  }
291 
292  if (!call_queue_.empty())
293  {
294  ROS_DEBUG_NAMED("superdebug", "[%s] Client to service [%s] processing next service call", persistent_ ? "persistent" : "non-persistent", service_name_.c_str());
295 
296  current_call_ = call_queue_.front();
297  call_queue_.pop();
298  }
299  else
300  {
301  empty = true;
302  }
303  }
304 
305  if (empty)
306  {
307  if (!persistent_)
308  {
309  ROS_DEBUG_NAMED("superdebug", "Dropping non-persistent client to service [%s]", service_name_.c_str());
311  }
312  else
313  {
314  ROS_DEBUG_NAMED("superdebug", "Keeping persistent client to service [%s]", service_name_.c_str());
315  }
316  }
317  else
318  {
319  SerializedMessage request;
320 
321  {
322  boost::mutex::scoped_lock lock(call_queue_mutex_);
323  request = current_call_->req_;
324  }
325 
326  connection_->write(request.buf, request.num_bytes, boost::bind(&ServiceServerLink::onRequestWritten, this, boost::placeholders::_1));
327  }
328 }
329 
331 {
332  CallInfoPtr info(boost::make_shared<CallInfo>());
333  info->req_ = req;
334  info->resp_ = &resp;
335  info->success_ = false;
336  info->finished_ = false;
337  info->call_finished_ = false;
338  info->caller_thread_id_ = boost::this_thread::get_id();
339 
340  //ros::WallDuration(0.1).sleep();
341 
342  bool immediate = false;
343  {
344  if (connection_->isDropped())
345  {
346  ROSCPP_LOG_DEBUG("ServiceServerLink::call called on dropped connection for service [%s]", service_name_.c_str());
347  info->call_finished_ = true;
348  return false;
349  }
350 
351  boost::mutex::scoped_lock lock(call_queue_mutex_);
352 
353  if (call_queue_.empty() && header_written_ && header_read_)
354  {
355  immediate = true;
356  }
357 
358  call_queue_.push(info);
359  }
360 
361  if (immediate)
362  {
363  processNextCall();
364  }
365 
366  {
367  boost::mutex::scoped_lock lock(info->finished_mutex_);
368 
369  while (!info->finished_)
370  {
371  info->finished_condition_.wait(lock);
372  }
373  }
374 
375  info->call_finished_ = true;
376 
377  if (info->exception_string_.length() > 0)
378  {
379  ROS_ERROR("Service call failed: service [%s] responded with an error: %s", service_name_.c_str(), info->exception_string_.c_str());
380  }
381 
382  return info->success_;
383 }
384 
386 {
387  return !dropped_;
388 }
389 
390 } // namespace ros
this_node.h
ros::SerializedMessage
md5sum
const char * md5sum()
boost::shared_ptr< CallInfo >
ros
ros::Header
service_manager.h
ros::ServiceManager::instance
static const ServiceManagerPtr & instance()
Definition: service_manager.cpp:55
ros::ok
ROSCPP_DECL bool ok()
Check whether it's time to exit.
Definition: init.cpp:603
ROS_DEBUG_NAMED
#define ROS_DEBUG_NAMED(name,...)
boost::shared_array< uint8_t >
ros::SerializedMessage::num_bytes
size_t num_bytes
ros::Connection::Destructing
@ Destructing
Definition: connection.h:77
ros::this_node::getName
const ROSCPP_DECL std::string & getName()
Returns the name of the current node.
Definition: this_node.cpp:74
connection.h
ROS_ERROR
#define ROS_ERROR(...)
header.h
transport.h
header
const std::string header
ROS_ASSERT
#define ROS_ASSERT(cond)
ROSCPP_LOG_DEBUG
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
ros::SerializedMessage::buf
boost::shared_array< uint8_t > buf
file_log.h


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas , Jacob Perron
autogenerated on Sat Sep 14 2024 02:59:35