connection.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "ros/connection.h"
37 #include "ros/file_log.h"
38 
39 #include <ros/assert.h>
40 
41 #include <boost/shared_array.hpp>
42 #include <boost/bind.hpp>
43 
44 namespace ros
45 {
46 
48 : is_server_(false)
49 , dropped_(false)
50 , read_filled_(0)
51 , read_size_(0)
52 , reading_(false)
53 , has_read_callback_(0)
54 , write_sent_(0)
55 , write_size_(0)
56 , writing_(false)
57 , has_write_callback_(0)
58 , sending_header_error_(false)
59 {
60 }
61 
63 {
64  ROS_DEBUG_NAMED("superdebug", "Connection destructing, dropped=%s", dropped_ ? "true" : "false");
65 
67 }
68 
69 void Connection::initialize(const TransportPtr& transport, bool is_server, const HeaderReceivedFunc& header_func)
70 {
71  ROS_ASSERT(transport);
72 
73  transport_ = transport;
74  header_func_ = header_func;
75  is_server_ = is_server;
76 
77  transport_->setReadCallback(boost::bind(&Connection::onReadable, this, boost::placeholders::_1));
78  transport_->setWriteCallback(boost::bind(&Connection::onWriteable, this, boost::placeholders::_1));
79  transport_->setDisconnectCallback(boost::bind(&Connection::onDisconnect, this, boost::placeholders::_1));
80 
81  if (header_func)
82  {
83  read(4, boost::bind(&Connection::onHeaderLengthRead, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, boost::placeholders::_4));
84  }
85 }
86 
87 boost::signals2::connection Connection::addDropListener(const DropFunc& slot)
88 {
89  boost::recursive_mutex::scoped_lock lock(drop_mutex_);
90  return drop_signal_.connect(slot);
91 }
92 
93 void Connection::removeDropListener(const boost::signals2::connection& c)
94 {
95  boost::recursive_mutex::scoped_lock lock(drop_mutex_);
96  c.disconnect();
97 }
98 
99 void Connection::onReadable(const TransportPtr& transport)
100 {
101  (void)transport;
102  ROS_ASSERT(transport == transport_);
103 
104  readTransport();
105 }
106 
108 {
109  boost::recursive_mutex::scoped_try_lock lock(read_mutex_);
110 
111  if (!lock.owns_lock() || dropped_ || reading_)
112  {
113  return;
114  }
115 
116  reading_ = true;
117 
118  while (!dropped_ && has_read_callback_)
119  {
121  uint32_t to_read = read_size_ - read_filled_;
122  if (to_read > 0)
123  {
124  int32_t bytes_read = transport_->read(read_buffer_.get() + read_filled_, to_read);
125  ROS_DEBUG_NAMED("superdebug", "Connection read %d bytes", bytes_read);
126  if (dropped_)
127  {
128  return;
129  }
130  else if (bytes_read < 0)
131  {
132  // Bad read, throw away results and report error
133  ReadFinishedFunc callback;
134  callback = read_callback_;
135  read_callback_.clear();
136  read_buffer_.reset();
137  uint32_t size = read_size_;
138  read_size_ = 0;
139  read_filled_ = 0;
140  has_read_callback_ = 0;
141 
142  if (callback)
143  {
144  callback(shared_from_this(), read_buffer_, size, false);
145  }
146 
147  break;
148  }
149 
150  read_filled_ += bytes_read;
151  }
152 
153  ROS_ASSERT((int32_t)read_size_ >= 0);
154  ROS_ASSERT((int32_t)read_filled_ >= 0);
155  ROS_ASSERT_MSG(read_filled_ <= read_size_, "read_filled_ = %d, read_size_ = %d", read_filled_, read_size_);
156 
157  if (read_filled_ == read_size_ && !dropped_)
158  {
159  ReadFinishedFunc callback;
160  uint32_t size;
162 
164 
165  // store off the read info in case another read() call is made from within the callback
166  callback = read_callback_;
167  size = read_size_;
168  buffer = read_buffer_;
169  read_callback_.clear();
170  read_buffer_.reset();
171  read_size_ = 0;
172  read_filled_ = 0;
173  has_read_callback_ = 0;
174 
175  ROS_DEBUG_NAMED("superdebug", "Calling read callback");
176  callback(shared_from_this(), buffer, size, true);
177  }
178  else
179  {
180  break;
181  }
182  }
183 
184  if (!has_read_callback_)
185  {
186  transport_->disableRead();
187  }
188 
189  reading_ = false;
190 }
191 
193 {
194  boost::recursive_mutex::scoped_try_lock lock(write_mutex_);
195 
196  if (!lock.owns_lock() || dropped_ || writing_)
197  {
198  return;
199  }
200 
201  writing_ = true;
202  bool can_write_more = true;
203 
204  while (has_write_callback_ && can_write_more && !dropped_)
205  {
206  uint32_t to_write = write_size_ - write_sent_;
207  ROS_DEBUG_NAMED("superdebug", "Connection writing %d bytes", to_write);
208  int32_t bytes_sent = transport_->write(write_buffer_.get() + write_sent_, to_write);
209  ROS_DEBUG_NAMED("superdebug", "Connection wrote %d bytes", bytes_sent);
210 
211  if (bytes_sent < 0)
212  {
213  writing_ = false;
214  return;
215  }
216 
217  write_sent_ += bytes_sent;
218 
219  if (bytes_sent < (int)write_size_ - (int)write_sent_)
220  {
221  can_write_more = false;
222  }
223 
224  if (write_sent_ == write_size_ && !dropped_)
225  {
226  WriteFinishedFunc callback;
227 
228  {
229  boost::mutex::scoped_lock lock(write_callback_mutex_);
231  // Store off a copy of the callback in case another write() call happens in it
232  callback = write_callback_;
235  write_sent_ = 0;
236  write_size_ = 0;
238  }
239 
240  ROS_DEBUG_NAMED("superdebug", "Calling write callback");
241  callback(shared_from_this());
242  }
243  }
244 
245  {
246  boost::mutex::scoped_lock lock(write_callback_mutex_);
247  if (!has_write_callback_)
248  {
249  transport_->disableWrite();
250  }
251  }
252 
253  writing_ = false;
254 }
255 
256 void Connection::onWriteable(const TransportPtr& transport)
257 {
258  (void)transport;
259  ROS_ASSERT(transport == transport_);
260 
261  writeTransport();
262 }
263 
264 void Connection::read(uint32_t size, const ReadFinishedFunc& callback)
265 {
267  {
268  return;
269  }
270 
271  {
272  boost::recursive_mutex::scoped_lock lock(read_mutex_);
273 
275 
276  read_callback_ = callback;
277  read_buffer_ = boost::shared_array<uint8_t>(new uint8_t[size]);
278  read_size_ = size;
279  read_filled_ = 0;
280  has_read_callback_ = 1;
281  }
282 
283  transport_->enableRead();
284 
285  // read immediately if possible
286  readTransport();
287 }
288 
289 void Connection::write(const boost::shared_array<uint8_t>& buffer, uint32_t size, const WriteFinishedFunc& callback, bool immediate)
290 {
292  {
293  return;
294  }
295 
296  {
297  boost::mutex::scoped_lock lock(write_callback_mutex_);
298 
300 
301  write_callback_ = callback;
302  write_buffer_ = buffer;
303  write_size_ = size;
304  write_sent_ = 0;
306  }
307 
308  transport_->enableWrite();
309 
310  if (immediate)
311  {
312  // write immediately if possible
313  writeTransport();
314  }
315 }
316 
318 {
319  (void)transport;
320  ROS_ASSERT(transport == transport_);
321 
323 }
324 
326 {
327  ROSCPP_LOG_DEBUG("Connection::drop(%u)", reason);
328  bool did_drop = false;
329  {
330  boost::recursive_mutex::scoped_lock lock(drop_mutex_);
331  if (!dropped_)
332  {
333  dropped_ = true;
334  did_drop = true;
335  }
336  }
337 
338  if (did_drop)
339  {
340  transport_->close();
341  {
342  boost::recursive_mutex::scoped_lock lock(drop_mutex_);
343  drop_signal_(shared_from_this(), reason);
344  }
345  }
346 }
347 
349 {
350  boost::recursive_mutex::scoped_lock lock(drop_mutex_);
351  return dropped_;
352 }
353 
354 void Connection::writeHeader(const M_string& key_vals, const WriteFinishedFunc& finished_callback)
355 {
357  header_written_callback_ = finished_callback;
358 
359  if (!transport_->requiresHeader())
360  {
361  onHeaderWritten(shared_from_this());
362  return;
363  }
364 
366  uint32_t len;
367  Header::write(key_vals, buffer, len);
368 
369  uint32_t msg_len = len + 4;
370  boost::shared_array<uint8_t> full_msg(new uint8_t[msg_len]);
371  memcpy(full_msg.get() + 4, buffer.get(), len);
372  *((uint32_t*)full_msg.get()) = len;
373 
374  write(full_msg, msg_len, boost::bind(&Connection::onHeaderWritten, this, boost::placeholders::_1), false);
375 }
376 
377 void Connection::sendHeaderError(const std::string& error_msg)
378 {
379  M_string m;
380  m["error"] = error_msg;
381 
382  writeHeader(m, boost::bind(&Connection::onErrorHeaderWritten, this, boost::placeholders::_1));
383  sending_header_error_ = true;
384 }
385 
386 void Connection::onHeaderLengthRead(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
387 {
388  (void)size;
389  ROS_ASSERT(conn.get() == this);
390  ROS_ASSERT(size == 4);
391 
392  if (!success)
393  return;
394 
395  uint32_t len = *((uint32_t*)buffer.get());
396 
397  if (len > 1000000000)
398  {
399  ROS_ERROR("a header of over a gigabyte was " \
400  "predicted in tcpros. that seems highly " \
401  "unlikely, so I'll assume protocol " \
402  "synchronization is lost.");
403  conn->drop(HeaderError);
404  }
405 
406  read(len, boost::bind(&Connection::onHeaderRead, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, boost::placeholders::_4));
407 }
408 
409 void Connection::onHeaderRead(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
410 {
411  ROS_ASSERT(conn.get() == this);
412 
413  if (!success)
414  return;
415 
416  std::string error_msg;
417  if (!header_.parse(buffer, size, error_msg))
418  {
419  drop(HeaderError);
420  }
421  else
422  {
423  std::string error_val;
424  if (header_.getValue("error", error_val))
425  {
426  ROSCPP_LOG_DEBUG("Received error message in header for connection to [%s]: [%s]", transport_->getTransportInfo().c_str(), error_val.c_str());
427  drop(HeaderError);
428  }
429  else
430  {
432 
433  transport_->parseHeader(header_);
434 
435  header_func_(conn, header_);
436  }
437  }
438 
439 }
440 
442 {
443  ROS_ASSERT(conn.get() == this);
445 
448 }
449 
451 {
452  (void)conn;
453  drop(HeaderError);
454 }
455 
457 {
458  header_func_ = func;
459 
460  if (transport_->requiresHeader())
461  read(4, boost::bind(&Connection::onHeaderLengthRead, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, boost::placeholders::_4));
462 }
463 
465 {
466  std::string callerid;
467  if (header_.getValue("callerid", callerid))
468  {
469  return callerid;
470  }
471 
472  return std::string("unknown");
473 }
474 
476 {
477  std::stringstream ss;
478  ss << "callerid=[" << getCallerId() << "] address=[" << transport_->getTransportInfo() << "]";
479  return ss.str();
480 }
481 
482 }
ros::Connection::writeHeader
void writeHeader(const M_string &key_vals, const WriteFinishedFunc &finished_callback)
Send a list of string key/value pairs as a header message.
Definition: connection.cpp:354
ros::Connection::onErrorHeaderWritten
void onErrorHeaderWritten(const ConnectionPtr &conn)
Definition: connection.cpp:450
ros::Connection::read_size_
uint32_t read_size_
Size of the read buffer, in bytes.
Definition: connection.h:225
ros::Connection::addDropListener
boost::signals2::connection addDropListener(const DropFunc &slot)
Add a callback to be called when this connection has dropped.
Definition: connection.cpp:87
ros::WriteFinishedFunc
boost::function< void(const ConnectionPtr &)> WriteFinishedFunc
Definition: connection.h:60
ros::Connection::read_buffer_
boost::shared_array< uint8_t > read_buffer_
Read buffer that ends up being passed to the read callback.
Definition: connection.h:221
ros::Connection::has_write_callback_
volatile uint32_t has_write_callback_
Definition: connection.h:253
ros::ReadFinishedFunc
boost::function< void(const ConnectionPtr &, const boost::shared_array< uint8_t > &, uint32_t, bool)> ReadFinishedFunc
Definition: connection.h:59
boost::shared_ptr< Transport >
ros::Connection::reading_
bool reading_
Flag telling us if we're in the middle of a read (mostly to avoid recursive deadlocking)
Definition: connection.h:231
ros::Connection::HeaderError
@ HeaderError
Definition: connection.h:76
ros::Connection::drop_signal_
DropSignal drop_signal_
Signal raised when this connection is dropped.
Definition: connection.h:259
ros::Connection::~Connection
~Connection()
Definition: connection.cpp:62
ros::Connection::Connection
Connection()
Definition: connection.cpp:47
ros
ros::Connection::write_callback_mutex_
boost::mutex write_callback_mutex_
Definition: connection.h:245
ros::Connection::writeTransport
void writeTransport()
Write data to our transport. Also manages calling the write callback.
Definition: connection.cpp:192
ros::Connection::readTransport
void readTransport()
Read data off our transport. Also manages calling the read callback. If there is any data to be read,...
Definition: connection.cpp:107
ros::Connection::read
void read(uint32_t size, const ReadFinishedFunc &finished_callback)
Read a number of bytes, calling a callback when finished.
Definition: connection.cpp:264
ros::Connection::TransportDisconnect
@ TransportDisconnect
Definition: connection.h:75
ros::Connection::write_sent_
uint32_t write_sent_
Amount of data we've written from the write buffer.
Definition: connection.h:240
ros::Connection::is_server_
bool is_server_
Are we a server? Servers wait for clients to send a header and then send a header in response.
Definition: connection.h:210
ros::Connection::setHeaderReceivedCallback
void setHeaderReceivedCallback(const HeaderReceivedFunc &func)
Set the header receipt callback.
Definition: connection.cpp:456
ros::Connection::read_callback_
ReadFinishedFunc read_callback_
Function to call when the read is finished.
Definition: connection.h:227
ros::Header::write
static void write(const M_string &key_vals, boost::shared_array< uint8_t > &buffer, uint32_t &size)
ros::Connection::read_filled_
uint32_t read_filled_
Amount of data currently in the read buffer, in bytes.
Definition: connection.h:223
ros::Connection::DropFunc
boost::function< void(const ConnectionPtr &, DropReason reason)> DropFunc
Definition: connection.h:148
ros::Connection::initialize
void initialize(const TransportPtr &transport, bool is_server, const HeaderReceivedFunc &header_func)
Initialize this connection.
Definition: connection.cpp:69
ros::Connection::write_size_
uint32_t write_size_
Size of the write buffer.
Definition: connection.h:242
ros::Connection::sending_header_error_
bool sending_header_error_
If we're sending a header error we disable most other calls.
Definition: connection.h:265
ros::Connection::sendHeaderError
void sendHeaderError(const std::string &error_message)
Send a header error message, of the form "error=<message>". Drops the connection once the data has wr...
Definition: connection.cpp:377
ros::Connection::onHeaderWritten
void onHeaderWritten(const ConnectionPtr &conn)
Definition: connection.cpp:441
ros::Connection::getCallerId
std::string getCallerId()
Definition: connection.cpp:464
ros::Connection::writing_
bool writing_
Flag telling us if we're in the middle of a write (mostly used to avoid recursive deadlocking)
Definition: connection.h:249
ROS_DEBUG_NAMED
#define ROS_DEBUG_NAMED(name,...)
ros::Connection::dropped_
bool dropped_
Have we dropped?
Definition: connection.h:212
ros::Connection::onHeaderLengthRead
void onHeaderLengthRead(const ConnectionPtr &conn, const boost::shared_array< uint8_t > &buffer, uint32_t size, bool success)
Definition: connection.cpp:386
ros::Connection::drop_mutex_
boost::recursive_mutex drop_mutex_
Synchronizes drop() calls.
Definition: connection.h:262
ros::Header::parse
bool parse(const boost::shared_array< uint8_t > &buffer, uint32_t size, std::string &error_msg)
boost::shared_array< uint8_t >
ROS_ASSERT_MSG
#define ROS_ASSERT_MSG(cond,...)
ros::Connection::onDisconnect
void onDisconnect(const TransportPtr &transport)
Called by the Transport when it has been disconnected, either through a call to close() or through an...
Definition: connection.cpp:317
ros::Connection::header_func_
HeaderReceivedFunc header_func_
Function that handles the incoming header.
Definition: connection.h:218
ros::Connection::read_mutex_
boost::recursive_mutex read_mutex_
Mutex used for protecting reading. Recursive because a read can immediately cause another read throug...
Definition: connection.h:229
ros::Connection::DropReason
DropReason
Definition: connection.h:73
ros::Connection::Destructing
@ Destructing
Definition: connection.h:77
ros::Connection::write_buffer_
boost::shared_array< uint8_t > write_buffer_
Buffer to write from.
Definition: connection.h:238
connection.h
ros::Connection::has_read_callback_
volatile uint32_t has_read_callback_
Definition: connection.h:235
ros::Connection::write_callback_
WriteFinishedFunc write_callback_
Function to call when the current write is finished.
Definition: connection.h:244
ros::Connection::transport_
TransportPtr transport_
Transport associated with us.
Definition: connection.h:216
ros::Connection::header_written_callback_
WriteFinishedFunc header_written_callback_
Function to call when the outgoing header has finished writing.
Definition: connection.h:256
ros::Header::getValue
bool getValue(const std::string &key, std::string &value) const
ros::Connection::getRemoteString
std::string getRemoteString()
Definition: connection.cpp:475
ros::HeaderReceivedFunc
boost::function< bool(const ConnectionPtr &, const Header &)> HeaderReceivedFunc
Definition: connection.h:62
ROS_ERROR
#define ROS_ERROR(...)
ros::Connection::onHeaderRead
void onHeaderRead(const ConnectionPtr &conn, const boost::shared_array< uint8_t > &buffer, uint32_t size, bool success)
Definition: connection.cpp:409
ros::Connection::onReadable
void onReadable(const TransportPtr &transport)
Called by the Transport when there is data available to be read.
Definition: connection.cpp:99
transport.h
ros::Connection::onWriteable
void onWriteable(const TransportPtr &transport)
Called by the Transport when it is possible to write data.
Definition: connection.cpp:256
ros::Connection::drop
void drop(DropReason reason)
Drop this connection. Anything added as a drop listener through addDropListener will get called back ...
Definition: connection.cpp:325
ros::Connection::write
void write(const boost::shared_array< uint8_t > &buffer, uint32_t size, const WriteFinishedFunc &finished_callback, bool immedate=true)
Write a buffer of bytes, calling a callback when finished.
Definition: connection.cpp:289
ros::Connection::header_
Header header_
Incoming header.
Definition: connection.h:214
assert.h
ros::Connection::write_mutex_
boost::recursive_mutex write_mutex_
Mutex used for protecting writing. Recursive because a write can immediately cause another write thro...
Definition: connection.h:247
ROS_ASSERT
#define ROS_ASSERT(cond)
ros::Connection::removeDropListener
void removeDropListener(const boost::signals2::connection &c)
Definition: connection.cpp:93
ROSCPP_LOG_DEBUG
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
ros::Connection::isDropped
bool isDropped()
Returns whether or not this connection has been dropped.
Definition: connection.cpp:348
file_log.h
ros::M_string
std::map< std::string, std::string > M_string


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas , Jacob Perron
autogenerated on Thu Nov 23 2023 04:01:44