00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/connection.h"
00036 #include "ros/transport/transport.h"
00037 #include "ros/file_log.h"
00038
00039 #include <ros/assert.h>
00040
00041 #include <boost/shared_array.hpp>
00042 #include <boost/bind.hpp>
00043
00044 namespace ros
00045 {
00046
00047 Connection::Connection()
00048 : is_server_(false)
00049 , dropped_(false)
00050 , read_filled_(0)
00051 , read_size_(0)
00052 , reading_(false)
00053 , has_read_callback_(0)
00054 , write_sent_(0)
00055 , write_size_(0)
00056 , writing_(false)
00057 , has_write_callback_(0)
00058 , sending_header_error_(false)
00059 {
00060 }
00061
00062 Connection::~Connection()
00063 {
00064 ROS_DEBUG_NAMED("superdebug", "Connection destructing, dropped=%s", dropped_ ? "true" : "false");
00065
00066 drop(Destructing);
00067 }
00068
00069 void Connection::initialize(const TransportPtr& transport, bool is_server, const HeaderReceivedFunc& header_func)
00070 {
00071 ROS_ASSERT(transport);
00072
00073 transport_ = transport;
00074 header_func_ = header_func;
00075 is_server_ = is_server;
00076
00077 transport_->setReadCallback(boost::bind(&Connection::onReadable, this, _1));
00078 transport_->setWriteCallback(boost::bind(&Connection::onWriteable, this, _1));
00079 transport_->setDisconnectCallback(boost::bind(&Connection::onDisconnect, this, _1));
00080
00081 if (header_func)
00082 {
00083 read(4, boost::bind(&Connection::onHeaderLengthRead, this, _1, _2, _3, _4));
00084 }
00085 }
00086
00087 boost::signals2::connection Connection::addDropListener(const DropFunc& slot)
00088 {
00089 boost::recursive_mutex::scoped_lock lock(drop_mutex_);
00090 return drop_signal_.connect(slot);
00091 }
00092
00093 void Connection::removeDropListener(const boost::signals2::connection& c)
00094 {
00095 boost::recursive_mutex::scoped_lock lock(drop_mutex_);
00096 c.disconnect();
00097 }
00098
00099 void Connection::onReadable(const TransportPtr& transport)
00100 {
00101 ROS_ASSERT(transport == transport_);
00102
00103 readTransport();
00104 }
00105
00106 void Connection::readTransport()
00107 {
00108 boost::recursive_mutex::scoped_try_lock lock(read_mutex_);
00109
00110 if (!lock.owns_lock() || dropped_ || reading_)
00111 {
00112 return;
00113 }
00114
00115 reading_ = true;
00116
00117 while (!dropped_ && has_read_callback_)
00118 {
00119 ROS_ASSERT(read_buffer_);
00120 uint32_t to_read = read_size_ - read_filled_;
00121 if (to_read > 0)
00122 {
00123 int32_t bytes_read = transport_->read(read_buffer_.get() + read_filled_, to_read);
00124 ROS_DEBUG_NAMED("superdebug", "Connection read %d bytes", bytes_read);
00125 if (dropped_)
00126 {
00127 return;
00128 }
00129 else if (bytes_read < 0)
00130 {
00131
00132 ReadFinishedFunc callback;
00133 callback = read_callback_;
00134 read_callback_.clear();
00135 read_buffer_.reset();
00136 uint32_t size = read_size_;
00137 read_size_ = 0;
00138 read_filled_ = 0;
00139 has_read_callback_ = 0;
00140
00141 if (callback)
00142 {
00143 callback(shared_from_this(), read_buffer_, size, false);
00144 }
00145
00146 break;
00147 }
00148
00149 read_filled_ += bytes_read;
00150 }
00151
00152 ROS_ASSERT((int32_t)read_size_ >= 0);
00153 ROS_ASSERT((int32_t)read_filled_ >= 0);
00154 ROS_ASSERT_MSG(read_filled_ <= read_size_, "read_filled_ = %d, read_size_ = %d", read_filled_, read_size_);
00155
00156 if (read_filled_ == read_size_ && !dropped_)
00157 {
00158 ReadFinishedFunc callback;
00159 uint32_t size;
00160 boost::shared_array<uint8_t> buffer;
00161
00162 ROS_ASSERT(has_read_callback_);
00163
00164
00165 callback = read_callback_;
00166 size = read_size_;
00167 buffer = read_buffer_;
00168 read_callback_.clear();
00169 read_buffer_.reset();
00170 read_size_ = 0;
00171 read_filled_ = 0;
00172 has_read_callback_ = 0;
00173
00174 ROS_DEBUG_NAMED("superdebug", "Calling read callback");
00175 callback(shared_from_this(), buffer, size, true);
00176 }
00177 else
00178 {
00179 break;
00180 }
00181 }
00182
00183 if (!has_read_callback_)
00184 {
00185 transport_->disableRead();
00186 }
00187
00188 reading_ = false;
00189 }
00190
00191 void Connection::writeTransport()
00192 {
00193 boost::recursive_mutex::scoped_try_lock lock(write_mutex_);
00194
00195 if (!lock.owns_lock() || dropped_ || writing_)
00196 {
00197 return;
00198 }
00199
00200 writing_ = true;
00201 bool can_write_more = true;
00202
00203 while (has_write_callback_ && can_write_more && !dropped_)
00204 {
00205 uint32_t to_write = write_size_ - write_sent_;
00206 ROS_DEBUG_NAMED("superdebug", "Connection writing %d bytes", to_write);
00207 int32_t bytes_sent = transport_->write(write_buffer_.get() + write_sent_, to_write);
00208 ROS_DEBUG_NAMED("superdebug", "Connection wrote %d bytes", bytes_sent);
00209
00210 if (bytes_sent < 0)
00211 {
00212 writing_ = false;
00213 return;
00214 }
00215
00216 write_sent_ += bytes_sent;
00217
00218 if (bytes_sent < (int)write_size_ - (int)write_sent_)
00219 {
00220 can_write_more = false;
00221 }
00222
00223 if (write_sent_ == write_size_ && !dropped_)
00224 {
00225 WriteFinishedFunc callback;
00226
00227 {
00228 boost::mutex::scoped_lock lock(write_callback_mutex_);
00229 ROS_ASSERT(has_write_callback_);
00230
00231 callback = write_callback_;
00232 write_callback_ = WriteFinishedFunc();
00233 write_buffer_ = boost::shared_array<uint8_t>();
00234 write_sent_ = 0;
00235 write_size_ = 0;
00236 has_write_callback_ = 0;
00237 }
00238
00239 ROS_DEBUG_NAMED("superdebug", "Calling write callback");
00240 callback(shared_from_this());
00241 }
00242 }
00243
00244 {
00245 boost::mutex::scoped_lock lock(write_callback_mutex_);
00246 if (!has_write_callback_)
00247 {
00248 transport_->disableWrite();
00249 }
00250 }
00251
00252 writing_ = false;
00253 }
00254
00255 void Connection::onWriteable(const TransportPtr& transport)
00256 {
00257 ROS_ASSERT(transport == transport_);
00258
00259 writeTransport();
00260 }
00261
00262 void Connection::read(uint32_t size, const ReadFinishedFunc& callback)
00263 {
00264 if (dropped_ || sending_header_error_)
00265 {
00266 return;
00267 }
00268
00269 {
00270 boost::recursive_mutex::scoped_lock lock(read_mutex_);
00271
00272 ROS_ASSERT(!read_callback_);
00273
00274 read_callback_ = callback;
00275 read_buffer_ = boost::shared_array<uint8_t>(new uint8_t[size]);
00276 read_size_ = size;
00277 read_filled_ = 0;
00278 has_read_callback_ = 1;
00279 }
00280
00281 transport_->enableRead();
00282
00283
00284 readTransport();
00285 }
00286
00287 void Connection::write(const boost::shared_array<uint8_t>& buffer, uint32_t size, const WriteFinishedFunc& callback, bool immediate)
00288 {
00289 if (dropped_ || sending_header_error_)
00290 {
00291 return;
00292 }
00293
00294 {
00295 boost::mutex::scoped_lock lock(write_callback_mutex_);
00296
00297 ROS_ASSERT(!write_callback_);
00298
00299 write_callback_ = callback;
00300 write_buffer_ = buffer;
00301 write_size_ = size;
00302 write_sent_ = 0;
00303 has_write_callback_ = 1;
00304 }
00305
00306 transport_->enableWrite();
00307
00308 if (immediate)
00309 {
00310
00311 writeTransport();
00312 }
00313 }
00314
00315 void Connection::onDisconnect(const TransportPtr& transport)
00316 {
00317 ROS_ASSERT(transport == transport_);
00318
00319 drop(TransportDisconnect);
00320 }
00321
00322 void Connection::drop(DropReason reason)
00323 {
00324 ROSCPP_LOG_DEBUG("Connection::drop(%u)", reason);
00325 bool did_drop = false;
00326 {
00327 boost::recursive_mutex::scoped_lock lock(drop_mutex_);
00328 if (!dropped_)
00329 {
00330 dropped_ = true;
00331 did_drop = true;
00332 }
00333 }
00334
00335 if (did_drop)
00336 {
00337 drop_signal_(shared_from_this(), reason);
00338 transport_->close();
00339 }
00340 }
00341
00342 bool Connection::isDropped()
00343 {
00344 boost::recursive_mutex::scoped_lock lock(drop_mutex_);
00345 return dropped_;
00346 }
00347
00348 void Connection::writeHeader(const M_string& key_vals, const WriteFinishedFunc& finished_callback)
00349 {
00350 ROS_ASSERT(!header_written_callback_);
00351 header_written_callback_ = finished_callback;
00352
00353 if (!transport_->requiresHeader())
00354 {
00355 onHeaderWritten(shared_from_this());
00356 return;
00357 }
00358
00359 boost::shared_array<uint8_t> buffer;
00360 uint32_t len;
00361 Header::write(key_vals, buffer, len);
00362
00363 uint32_t msg_len = len + 4;
00364 boost::shared_array<uint8_t> full_msg(new uint8_t[msg_len]);
00365 memcpy(full_msg.get() + 4, buffer.get(), len);
00366 *((uint32_t*)full_msg.get()) = len;
00367
00368 write(full_msg, msg_len, boost::bind(&Connection::onHeaderWritten, this, _1), false);
00369 }
00370
00371 void Connection::sendHeaderError(const std::string& error_msg)
00372 {
00373 M_string m;
00374 m["error"] = error_msg;
00375
00376 writeHeader(m, boost::bind(&Connection::onErrorHeaderWritten, this, _1));
00377 sending_header_error_ = true;
00378 }
00379
00380 void Connection::onHeaderLengthRead(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00381 {
00382 ROS_ASSERT(conn.get() == this);
00383 ROS_ASSERT(size == 4);
00384
00385 if (!success)
00386 return;
00387
00388 uint32_t len = *((uint32_t*)buffer.get());
00389
00390 if (len > 1000000000)
00391 {
00392 ROS_ERROR("a header of over a gigabyte was " \
00393 "predicted in tcpros. that seems highly " \
00394 "unlikely, so I'll assume protocol " \
00395 "synchronization is lost.");
00396 conn->drop(HeaderError);
00397 }
00398
00399 read(len, boost::bind(&Connection::onHeaderRead, this, _1, _2, _3, _4));
00400 }
00401
00402 void Connection::onHeaderRead(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00403 {
00404 ROS_ASSERT(conn.get() == this);
00405
00406 if (!success)
00407 return;
00408
00409 std::string error_msg;
00410 if (!header_.parse(buffer, size, error_msg))
00411 {
00412 drop(HeaderError);
00413 }
00414 else
00415 {
00416 std::string error_val;
00417 if (header_.getValue("error", error_val))
00418 {
00419 ROSCPP_LOG_DEBUG("Received error message in header for connection to [%s]: [%s]", transport_->getTransportInfo().c_str(), error_val.c_str());
00420 drop(HeaderError);
00421 }
00422 else
00423 {
00424 ROS_ASSERT(header_func_);
00425
00426 transport_->parseHeader(header_);
00427
00428 header_func_(conn, header_);
00429 }
00430 }
00431
00432 }
00433
00434 void Connection::onHeaderWritten(const ConnectionPtr& conn)
00435 {
00436 ROS_ASSERT(conn.get() == this);
00437 ROS_ASSERT(header_written_callback_);
00438
00439 header_written_callback_(conn);
00440 header_written_callback_ = WriteFinishedFunc();
00441 }
00442
00443 void Connection::onErrorHeaderWritten(const ConnectionPtr& conn)
00444 {
00445 (void)conn;
00446 drop(HeaderError);
00447 }
00448
00449 void Connection::setHeaderReceivedCallback(const HeaderReceivedFunc& func)
00450 {
00451 header_func_ = func;
00452
00453 if (transport_->requiresHeader())
00454 read(4, boost::bind(&Connection::onHeaderLengthRead, this, _1, _2, _3, _4));
00455 }
00456
00457 std::string Connection::getCallerId()
00458 {
00459 std::string callerid;
00460 if (header_.getValue("callerid", callerid))
00461 {
00462 return callerid;
00463 }
00464
00465 return std::string("unknown");
00466 }
00467
00468 std::string Connection::getRemoteString()
00469 {
00470 std::stringstream ss;
00471 ss << "callerid=[" << getCallerId() << "] address=[" << transport_->getTransportInfo() << "]";
00472 return ss.str();
00473 }
00474
00475 }