connection.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "ros/connection.h"
37 #include "ros/file_log.h"
38 
39 #include <ros/assert.h>
40 
41 #include <boost/shared_array.hpp>
42 #include <boost/bind.hpp>
43 
44 namespace ros
45 {
46 
48 : is_server_(false)
49 , dropped_(false)
50 , read_filled_(0)
51 , read_size_(0)
52 , reading_(false)
53 , has_read_callback_(0)
54 , write_sent_(0)
55 , write_size_(0)
56 , writing_(false)
57 , has_write_callback_(0)
58 , sending_header_error_(false)
59 {
60 }
61 
63 {
64  ROS_DEBUG_NAMED("superdebug", "Connection destructing, dropped=%s", dropped_ ? "true" : "false");
65 
67 }
68 
69 void Connection::initialize(const TransportPtr& transport, bool is_server, const HeaderReceivedFunc& header_func)
70 {
71  ROS_ASSERT(transport);
72 
73  transport_ = transport;
74  header_func_ = header_func;
75  is_server_ = is_server;
76 
77  transport_->setReadCallback(boost::bind(&Connection::onReadable, this, _1));
78  transport_->setWriteCallback(boost::bind(&Connection::onWriteable, this, _1));
79  transport_->setDisconnectCallback(boost::bind(&Connection::onDisconnect, this, _1));
80 
81  if (header_func)
82  {
83  read(4, boost::bind(&Connection::onHeaderLengthRead, this, _1, _2, _3, _4));
84  }
85 }
86 
87 boost::signals2::connection Connection::addDropListener(const DropFunc& slot)
88 {
89  boost::recursive_mutex::scoped_lock lock(drop_mutex_);
90  return drop_signal_.connect(slot);
91 }
92 
93 void Connection::removeDropListener(const boost::signals2::connection& c)
94 {
95  boost::recursive_mutex::scoped_lock lock(drop_mutex_);
96  c.disconnect();
97 }
98 
99 void Connection::onReadable(const TransportPtr& transport)
100 {
101  (void)transport;
102  ROS_ASSERT(transport == transport_);
103 
104  readTransport();
105 }
106 
108 {
109  boost::recursive_mutex::scoped_try_lock lock(read_mutex_);
110 
111  if (!lock.owns_lock() || dropped_ || reading_)
112  {
113  return;
114  }
115 
116  reading_ = true;
117 
118  while (!dropped_ && has_read_callback_)
119  {
121  uint32_t to_read = read_size_ - read_filled_;
122  if (to_read > 0)
123  {
124  int32_t bytes_read = transport_->read(read_buffer_.get() + read_filled_, to_read);
125  ROS_DEBUG_NAMED("superdebug", "Connection read %d bytes", bytes_read);
126  if (dropped_)
127  {
128  return;
129  }
130  else if (bytes_read < 0)
131  {
132  // Bad read, throw away results and report error
133  ReadFinishedFunc callback;
134  callback = read_callback_;
135  read_callback_.clear();
136  read_buffer_.reset();
137  uint32_t size = read_size_;
138  read_size_ = 0;
139  read_filled_ = 0;
140  has_read_callback_ = 0;
141 
142  if (callback)
143  {
144  callback(shared_from_this(), read_buffer_, size, false);
145  }
146 
147  break;
148  }
149 
150  read_filled_ += bytes_read;
151  }
152 
153  ROS_ASSERT((int32_t)read_size_ >= 0);
154  ROS_ASSERT((int32_t)read_filled_ >= 0);
155  ROS_ASSERT_MSG(read_filled_ <= read_size_, "read_filled_ = %d, read_size_ = %d", read_filled_, read_size_);
156 
157  if (read_filled_ == read_size_ && !dropped_)
158  {
159  ReadFinishedFunc callback;
160  uint32_t size;
162 
164 
165  // store off the read info in case another read() call is made from within the callback
166  callback = read_callback_;
167  size = read_size_;
168  buffer = read_buffer_;
169  read_callback_.clear();
170  read_buffer_.reset();
171  read_size_ = 0;
172  read_filled_ = 0;
173  has_read_callback_ = 0;
174 
175  ROS_DEBUG_NAMED("superdebug", "Calling read callback");
176  callback(shared_from_this(), buffer, size, true);
177  }
178  else
179  {
180  break;
181  }
182  }
183 
184  if (!has_read_callback_)
185  {
186  transport_->disableRead();
187  }
188 
189  reading_ = false;
190 }
191 
193 {
194  boost::recursive_mutex::scoped_try_lock lock(write_mutex_);
195 
196  if (!lock.owns_lock() || dropped_ || writing_)
197  {
198  return;
199  }
200 
201  writing_ = true;
202  bool can_write_more = true;
203 
204  while (has_write_callback_ && can_write_more && !dropped_)
205  {
206  uint32_t to_write = write_size_ - write_sent_;
207  ROS_DEBUG_NAMED("superdebug", "Connection writing %d bytes", to_write);
208  int32_t bytes_sent = transport_->write(write_buffer_.get() + write_sent_, to_write);
209  ROS_DEBUG_NAMED("superdebug", "Connection wrote %d bytes", bytes_sent);
210 
211  if (bytes_sent < 0)
212  {
213  writing_ = false;
214  return;
215  }
216 
217  write_sent_ += bytes_sent;
218 
219  if (bytes_sent < (int)write_size_ - (int)write_sent_)
220  {
221  can_write_more = false;
222  }
223 
224  if (write_sent_ == write_size_ && !dropped_)
225  {
226  WriteFinishedFunc callback;
227 
228  {
229  boost::mutex::scoped_lock lock(write_callback_mutex_);
231  // Store off a copy of the callback in case another write() call happens in it
232  callback = write_callback_;
235  write_sent_ = 0;
236  write_size_ = 0;
238  }
239 
240  ROS_DEBUG_NAMED("superdebug", "Calling write callback");
241  callback(shared_from_this());
242  }
243  }
244 
245  {
246  boost::mutex::scoped_lock lock(write_callback_mutex_);
247  if (!has_write_callback_)
248  {
249  transport_->disableWrite();
250  }
251  }
252 
253  writing_ = false;
254 }
255 
256 void Connection::onWriteable(const TransportPtr& transport)
257 {
258  (void)transport;
259  ROS_ASSERT(transport == transport_);
260 
261  writeTransport();
262 }
263 
264 void Connection::read(uint32_t size, const ReadFinishedFunc& callback)
265 {
267  {
268  return;
269  }
270 
271  {
272  boost::recursive_mutex::scoped_lock lock(read_mutex_);
273 
275 
276  read_callback_ = callback;
277  read_buffer_ = boost::shared_array<uint8_t>(new uint8_t[size]);
278  read_size_ = size;
279  read_filled_ = 0;
280  has_read_callback_ = 1;
281  }
282 
283  transport_->enableRead();
284 
285  // read immediately if possible
286  readTransport();
287 }
288 
289 void Connection::write(const boost::shared_array<uint8_t>& buffer, uint32_t size, const WriteFinishedFunc& callback, bool immediate)
290 {
292  {
293  return;
294  }
295 
296  {
297  boost::mutex::scoped_lock lock(write_callback_mutex_);
298 
300 
301  write_callback_ = callback;
302  write_buffer_ = buffer;
303  write_size_ = size;
304  write_sent_ = 0;
306  }
307 
308  transport_->enableWrite();
309 
310  if (immediate)
311  {
312  // write immediately if possible
313  writeTransport();
314  }
315 }
316 
318 {
319  (void)transport;
320  ROS_ASSERT(transport == transport_);
321 
323 }
324 
326 {
327  ROSCPP_LOG_DEBUG("Connection::drop(%u)", reason);
328  bool did_drop = false;
329  {
330  boost::recursive_mutex::scoped_lock lock(drop_mutex_);
331  if (!dropped_)
332  {
333  dropped_ = true;
334  did_drop = true;
335  }
336  }
337 
338  if (did_drop)
339  {
340  drop_signal_(shared_from_this(), reason);
341  transport_->close();
342  }
343 }
344 
346 {
347  boost::recursive_mutex::scoped_lock lock(drop_mutex_);
348  return dropped_;
349 }
350 
351 void Connection::writeHeader(const M_string& key_vals, const WriteFinishedFunc& finished_callback)
352 {
354  header_written_callback_ = finished_callback;
355 
356  if (!transport_->requiresHeader())
357  {
358  onHeaderWritten(shared_from_this());
359  return;
360  }
361 
363  uint32_t len;
364  Header::write(key_vals, buffer, len);
365 
366  uint32_t msg_len = len + 4;
367  boost::shared_array<uint8_t> full_msg(new uint8_t[msg_len]);
368  memcpy(full_msg.get() + 4, buffer.get(), len);
369  *((uint32_t*)full_msg.get()) = len;
370 
371  write(full_msg, msg_len, boost::bind(&Connection::onHeaderWritten, this, _1), false);
372 }
373 
374 void Connection::sendHeaderError(const std::string& error_msg)
375 {
376  M_string m;
377  m["error"] = error_msg;
378 
379  writeHeader(m, boost::bind(&Connection::onErrorHeaderWritten, this, _1));
380  sending_header_error_ = true;
381 }
382 
383 void Connection::onHeaderLengthRead(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
384 {
385  (void)size;
386  ROS_ASSERT(conn.get() == this);
387  ROS_ASSERT(size == 4);
388 
389  if (!success)
390  return;
391 
392  uint32_t len = *((uint32_t*)buffer.get());
393 
394  if (len > 1000000000)
395  {
396  ROS_ERROR("a header of over a gigabyte was " \
397  "predicted in tcpros. that seems highly " \
398  "unlikely, so I'll assume protocol " \
399  "synchronization is lost.");
400  conn->drop(HeaderError);
401  }
402 
403  read(len, boost::bind(&Connection::onHeaderRead, this, _1, _2, _3, _4));
404 }
405 
406 void Connection::onHeaderRead(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
407 {
408  ROS_ASSERT(conn.get() == this);
409 
410  if (!success)
411  return;
412 
413  std::string error_msg;
414  if (!header_.parse(buffer, size, error_msg))
415  {
416  drop(HeaderError);
417  }
418  else
419  {
420  std::string error_val;
421  if (header_.getValue("error", error_val))
422  {
423  ROSCPP_LOG_DEBUG("Received error message in header for connection to [%s]: [%s]", transport_->getTransportInfo().c_str(), error_val.c_str());
424  drop(HeaderError);
425  }
426  else
427  {
429 
430  transport_->parseHeader(header_);
431 
432  header_func_(conn, header_);
433  }
434  }
435 
436 }
437 
439 {
440  ROS_ASSERT(conn.get() == this);
442 
445 }
446 
448 {
449  (void)conn;
450  drop(HeaderError);
451 }
452 
454 {
455  header_func_ = func;
456 
457  if (transport_->requiresHeader())
458  read(4, boost::bind(&Connection::onHeaderLengthRead, this, _1, _2, _3, _4));
459 }
460 
462 {
463  std::string callerid;
464  if (header_.getValue("callerid", callerid))
465  {
466  return callerid;
467  }
468 
469  return std::string("unknown");
470 }
471 
473 {
474  std::stringstream ss;
475  ss << "callerid=[" << getCallerId() << "] address=[" << transport_->getTransportInfo() << "]";
476  return ss.str();
477 }
478 
479 }
boost::signals2::connection addDropListener(const DropFunc &slot)
Add a callback to be called when this connection has dropped.
Definition: connection.cpp:87
void removeDropListener(const boost::signals2::connection &c)
Definition: connection.cpp:93
bool is_server_
Are we a server? Servers wait for clients to send a header and then send a header in response...
Definition: connection.h:210
void readTransport()
Read data off our transport. Also manages calling the read callback. If there is any data to be read...
Definition: connection.cpp:107
volatile uint32_t has_write_callback_
Definition: connection.h:253
void read(uint32_t size, const ReadFinishedFunc &finished_callback)
Read a number of bytes, calling a callback when finished.
Definition: connection.cpp:264
void writeHeader(const M_string &key_vals, const WriteFinishedFunc &finished_callback)
Send a list of string key/value pairs as a header message.
Definition: connection.cpp:351
uint32_t write_sent_
Amount of data we&#39;ve written from the write buffer.
Definition: connection.h:240
void onErrorHeaderWritten(const ConnectionPtr &conn)
Definition: connection.cpp:447
bool sending_header_error_
If we&#39;re sending a header error we disable most other calls.
Definition: connection.h:265
bool reading_
Flag telling us if we&#39;re in the middle of a read (mostly to avoid recursive deadlocking) ...
Definition: connection.h:231
boost::function< void(const ConnectionPtr &, DropReason reason)> DropFunc
Definition: connection.h:148
ReadFinishedFunc read_callback_
Function to call when the read is finished.
Definition: connection.h:227
void setHeaderReceivedCallback(const HeaderReceivedFunc &func)
Set the header receipt callback.
Definition: connection.cpp:453
DropSignal drop_signal_
Signal raised when this connection is dropped.
Definition: connection.h:259
boost::mutex write_callback_mutex_
Definition: connection.h:245
boost::function< bool(const ConnectionPtr &, const Header &)> HeaderReceivedFunc
Definition: connection.h:62
void initialize(const TransportPtr &transport, bool is_server, const HeaderReceivedFunc &header_func)
Initialize this connection.
Definition: connection.cpp:69
void writeTransport()
Write data to our transport. Also manages calling the write callback.
Definition: connection.cpp:192
boost::recursive_mutex drop_mutex_
Synchronizes drop() calls.
Definition: connection.h:262
std::string getCallerId()
Definition: connection.cpp:461
std::map< std::string, std::string > M_string
void onHeaderWritten(const ConnectionPtr &conn)
Definition: connection.cpp:438
#define ROS_DEBUG_NAMED(name,...)
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
boost::recursive_mutex read_mutex_
Mutex used for protecting reading. Recursive because a read can immediately cause another read throug...
Definition: connection.h:229
void sendHeaderError(const std::string &error_message)
Send a header error message, of the form "error=<message>". Drops the connection once the data has wr...
Definition: connection.cpp:374
uint32_t read_filled_
Amount of data currently in the read buffer, in bytes.
Definition: connection.h:223
uint32_t write_size_
Size of the write buffer.
Definition: connection.h:242
bool parse(const boost::shared_array< uint8_t > &buffer, uint32_t size, std::string &error_msg)
bool dropped_
Have we dropped?
Definition: connection.h:212
void onHeaderLengthRead(const ConnectionPtr &conn, const boost::shared_array< uint8_t > &buffer, uint32_t size, bool success)
Definition: connection.cpp:383
boost::function< void(const ConnectionPtr &, const boost::shared_array< uint8_t > &, uint32_t, bool)> ReadFinishedFunc
Definition: connection.h:59
#define ROS_ASSERT_MSG(cond,...)
static void write(const M_string &key_vals, boost::shared_array< uint8_t > &buffer, uint32_t &size)
std::string getRemoteString()
Definition: connection.cpp:472
bool getValue(const std::string &key, std::string &value) const
bool writing_
Flag telling us if we&#39;re in the middle of a write (mostly used to avoid recursive deadlocking) ...
Definition: connection.h:249
void onDisconnect(const TransportPtr &transport)
Called by the Transport when it has been disconnected, either through a call to close() or through an...
Definition: connection.cpp:317
void drop(DropReason reason)
Drop this connection. Anything added as a drop listener through addDropListener will get called back ...
Definition: connection.cpp:325
WriteFinishedFunc write_callback_
Function to call when the current write is finished.
Definition: connection.h:244
WriteFinishedFunc header_written_callback_
Function to call when the outgoing header has finished writing.
Definition: connection.h:256
Header header_
Incoming header.
Definition: connection.h:214
boost::shared_array< uint8_t > write_buffer_
Buffer to write from.
Definition: connection.h:238
void write(const boost::shared_array< uint8_t > &buffer, uint32_t size, const WriteFinishedFunc &finished_callback, bool immedate=true)
Write a buffer of bytes, calling a callback when finished.
Definition: connection.cpp:289
boost::recursive_mutex write_mutex_
Mutex used for protecting writing. Recursive because a write can immediately cause another write thro...
Definition: connection.h:247
HeaderReceivedFunc header_func_
Function that handles the incoming header.
Definition: connection.h:218
boost::function< void(const ConnectionPtr &)> WriteFinishedFunc
Definition: connection.h:60
TransportPtr transport_
Transport associated with us.
Definition: connection.h:216
bool isDropped()
Returns whether or not this connection has been dropped.
Definition: connection.cpp:345
uint32_t read_size_
Size of the read buffer, in bytes.
Definition: connection.h:225
#define ROS_ASSERT(cond)
volatile uint32_t has_read_callback_
Definition: connection.h:235
void onHeaderRead(const ConnectionPtr &conn, const boost::shared_array< uint8_t > &buffer, uint32_t size, bool success)
Definition: connection.cpp:406
#define ROS_ERROR(...)
void onWriteable(const TransportPtr &transport)
Called by the Transport when it is possible to write data.
Definition: connection.cpp:256
void onReadable(const TransportPtr &transport)
Called by the Transport when there is data available to be read.
Definition: connection.cpp:99
boost::shared_array< uint8_t > read_buffer_
Read buffer that ends up being passed to the read callback.
Definition: connection.h:221


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Sun Aug 26 2018 03:03:32