41 #include <boost/shared_array.hpp> 42 #include <boost/bind.hpp> 53 , has_read_callback_(0)
57 , has_write_callback_(0)
58 , sending_header_error_(false)
89 boost::recursive_mutex::scoped_lock lock(
drop_mutex_);
95 boost::recursive_mutex::scoped_lock lock(
drop_mutex_);
109 boost::recursive_mutex::scoped_try_lock lock(
read_mutex_);
130 else if (bytes_read < 0)
144 callback(shared_from_this(),
read_buffer_, size,
false);
150 read_filled_ += bytes_read;
176 callback(shared_from_this(), buffer, size,
true);
194 boost::recursive_mutex::scoped_try_lock lock(
write_mutex_);
202 bool can_write_more =
true;
207 ROS_DEBUG_NAMED(
"superdebug",
"Connection writing %d bytes", to_write);
209 ROS_DEBUG_NAMED(
"superdebug",
"Connection wrote %d bytes", bytes_sent);
217 write_sent_ += bytes_sent;
219 if (bytes_sent < (
int)
write_size_ - (
int)write_sent_)
221 can_write_more =
false;
241 callback(shared_from_this());
272 boost::recursive_mutex::scoped_lock lock(
read_mutex_);
328 bool did_drop =
false;
330 boost::recursive_mutex::scoped_lock lock(
drop_mutex_);
347 boost::recursive_mutex::scoped_lock lock(
drop_mutex_);
366 uint32_t msg_len = len + 4;
368 memcpy(full_msg.get() + 4, buffer.get(), len);
369 *((uint32_t*)full_msg.get()) = len;
377 m[
"error"] = error_msg;
392 uint32_t len = *((uint32_t*)buffer.get());
394 if (len > 1000000000)
396 ROS_ERROR(
"a header of over a gigabyte was " \
397 "predicted in tcpros. that seems highly " \
398 "unlikely, so I'll assume protocol " \
399 "synchronization is lost.");
413 std::string error_msg;
420 std::string error_val;
423 ROSCPP_LOG_DEBUG(
"Received error message in header for connection to [%s]: [%s]",
transport_->getTransportInfo().c_str(), error_val.c_str());
463 std::string callerid;
469 return std::string(
"unknown");
474 std::stringstream ss;
boost::signals2::connection addDropListener(const DropFunc &slot)
Add a callback to be called when this connection has dropped.
void removeDropListener(const boost::signals2::connection &c)
bool is_server_
Are we a server? Servers wait for clients to send a header and then send a header in response...
void readTransport()
Read data off our transport. Also manages calling the read callback. If there is any data to be read...
volatile uint32_t has_write_callback_
void read(uint32_t size, const ReadFinishedFunc &finished_callback)
Read a number of bytes, calling a callback when finished.
void writeHeader(const M_string &key_vals, const WriteFinishedFunc &finished_callback)
Send a list of string key/value pairs as a header message.
uint32_t write_sent_
Amount of data we've written from the write buffer.
void onErrorHeaderWritten(const ConnectionPtr &conn)
bool sending_header_error_
If we're sending a header error we disable most other calls.
bool reading_
Flag telling us if we're in the middle of a read (mostly to avoid recursive deadlocking) ...
boost::function< void(const ConnectionPtr &, DropReason reason)> DropFunc
ReadFinishedFunc read_callback_
Function to call when the read is finished.
void setHeaderReceivedCallback(const HeaderReceivedFunc &func)
Set the header receipt callback.
DropSignal drop_signal_
Signal raised when this connection is dropped.
boost::mutex write_callback_mutex_
boost::function< bool(const ConnectionPtr &, const Header &)> HeaderReceivedFunc
void initialize(const TransportPtr &transport, bool is_server, const HeaderReceivedFunc &header_func)
Initialize this connection.
void writeTransport()
Write data to our transport. Also manages calling the write callback.
boost::recursive_mutex drop_mutex_
Synchronizes drop() calls.
std::string getCallerId()
std::map< std::string, std::string > M_string
void onHeaderWritten(const ConnectionPtr &conn)
#define ROS_DEBUG_NAMED(name,...)
#define ROSCPP_LOG_DEBUG(...)
boost::recursive_mutex read_mutex_
Mutex used for protecting reading. Recursive because a read can immediately cause another read throug...
void sendHeaderError(const std::string &error_message)
Send a header error message, of the form "error=<message>". Drops the connection once the data has wr...
uint32_t read_filled_
Amount of data currently in the read buffer, in bytes.
uint32_t write_size_
Size of the write buffer.
bool dropped_
Have we dropped?
void onHeaderLengthRead(const ConnectionPtr &conn, const boost::shared_array< uint8_t > &buffer, uint32_t size, bool success)
boost::function< void(const ConnectionPtr &, const boost::shared_array< uint8_t > &, uint32_t, bool)> ReadFinishedFunc
#define ROS_ASSERT_MSG(cond,...)
std::string getRemoteString()
bool writing_
Flag telling us if we're in the middle of a write (mostly used to avoid recursive deadlocking) ...
void onDisconnect(const TransportPtr &transport)
Called by the Transport when it has been disconnected, either through a call to close() or through an...
void drop(DropReason reason)
Drop this connection. Anything added as a drop listener through addDropListener will get called back ...
WriteFinishedFunc write_callback_
Function to call when the current write is finished.
WriteFinishedFunc header_written_callback_
Function to call when the outgoing header has finished writing.
Header header_
Incoming header.
boost::shared_array< uint8_t > write_buffer_
Buffer to write from.
void write(const boost::shared_array< uint8_t > &buffer, uint32_t size, const WriteFinishedFunc &finished_callback, bool immedate=true)
Write a buffer of bytes, calling a callback when finished.
boost::recursive_mutex write_mutex_
Mutex used for protecting writing. Recursive because a write can immediately cause another write thro...
HeaderReceivedFunc header_func_
Function that handles the incoming header.
boost::function< void(const ConnectionPtr &)> WriteFinishedFunc
TransportPtr transport_
Transport associated with us.
bool isDropped()
Returns whether or not this connection has been dropped.
uint32_t read_size_
Size of the read buffer, in bytes.
volatile uint32_t has_read_callback_
void onHeaderRead(const ConnectionPtr &conn, const boost::shared_array< uint8_t > &buffer, uint32_t size, bool success)
void onWriteable(const TransportPtr &transport)
Called by the Transport when it is possible to write data.
void onReadable(const TransportPtr &transport)
Called by the Transport when there is data available to be read.
boost::shared_array< uint8_t > read_buffer_
Read buffer that ends up being passed to the read callback.