00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/connection.h"
00036 #include "ros/transport/transport.h"
00037 #include "ros/file_log.h"
00038
00039 #include <ros/assert.h>
00040
00041 #include <boost/shared_array.hpp>
00042 #include <boost/bind.hpp>
00043
00044 namespace ros
00045 {
00046
00047 Connection::Connection()
00048 : is_server_(false)
00049 , dropped_(false)
00050 , read_filled_(0)
00051 , read_size_(0)
00052 , reading_(false)
00053 , has_read_callback_(0)
00054 , write_sent_(0)
00055 , write_size_(0)
00056 , writing_(false)
00057 , has_write_callback_(0)
00058 , sending_header_error_(false)
00059 {
00060 }
00061
00062 Connection::~Connection()
00063 {
00064 ROS_DEBUG_NAMED("superdebug", "Connection destructing, dropped=%s", dropped_ ? "true" : "false");
00065
00066 drop(Destructing);
00067 }
00068
00069 void Connection::initialize(const TransportPtr& transport, bool is_server, const HeaderReceivedFunc& header_func)
00070 {
00071 ROS_ASSERT(transport);
00072
00073 transport_ = transport;
00074 header_func_ = header_func;
00075 is_server_ = is_server;
00076
00077 transport_->setReadCallback(boost::bind(&Connection::onReadable, this, _1));
00078 transport_->setWriteCallback(boost::bind(&Connection::onWriteable, this, _1));
00079 transport_->setDisconnectCallback(boost::bind(&Connection::onDisconnect, this, _1));
00080
00081 if (header_func)
00082 {
00083 read(4, boost::bind(&Connection::onHeaderLengthRead, this, _1, _2, _3, _4));
00084 }
00085 }
00086
00087 boost::signals::connection Connection::addDropListener(const DropFunc& slot)
00088 {
00089 boost::recursive_mutex::scoped_lock lock(drop_mutex_);
00090 return drop_signal_.connect(slot);
00091 }
00092
00093 void Connection::removeDropListener(const boost::signals::connection& c)
00094 {
00095 boost::recursive_mutex::scoped_lock lock(drop_mutex_);
00096 c.disconnect();
00097 }
00098
00099 void Connection::onReadable(const TransportPtr& transport)
00100 {
00101 ROS_ASSERT(transport == transport_);
00102
00103 readTransport();
00104 }
00105
00106 void Connection::readTransport()
00107 {
00108 boost::recursive_mutex::scoped_try_lock lock(read_mutex_);
00109
00110 if (!lock.owns_lock() || dropped_ || reading_)
00111 {
00112 return;
00113 }
00114
00115 reading_ = true;
00116
00117 while (!dropped_ && has_read_callback_)
00118 {
00119 ROS_ASSERT(read_buffer_);
00120 uint32_t to_read = read_size_ - read_filled_;
00121 if (to_read > 0)
00122 {
00123 int32_t bytes_read = transport_->read(read_buffer_.get() + read_filled_, to_read);
00124 ROS_DEBUG_NAMED("superdebug", "Connection read %d bytes", bytes_read);
00125 if (dropped_)
00126 {
00127 return;
00128 }
00129 else if (bytes_read < 0)
00130 {
00131
00132 ReadFinishedFunc callback;
00133 callback = read_callback_;
00134 read_callback_.clear();
00135 read_buffer_.reset();
00136 uint32_t size = read_size_;
00137 read_size_ = 0;
00138 read_filled_ = 0;
00139 has_read_callback_ = 0;
00140
00141 if (callback)
00142 {
00143 callback(shared_from_this(), read_buffer_, size, false);
00144 }
00145
00146 break;
00147 }
00148
00149 read_filled_ += bytes_read;
00150 }
00151
00152 ROS_ASSERT((int32_t)read_size_ >= 0);
00153 ROS_ASSERT((int32_t)read_filled_ >= 0);
00154 ROS_ASSERT_MSG(read_filled_ <= read_size_, "read_filled_ = %d, read_size_ = %d", read_filled_, read_size_);
00155
00156 if (read_filled_ == read_size_ && !dropped_)
00157 {
00158 ReadFinishedFunc callback;
00159 uint32_t size;
00160 boost::shared_array<uint8_t> buffer;
00161
00162 ROS_ASSERT(has_read_callback_);
00163
00164
00165 callback = read_callback_;
00166 size = read_size_;
00167 buffer = read_buffer_;
00168 read_callback_.clear();
00169 read_buffer_.reset();
00170 read_size_ = 0;
00171 read_filled_ = 0;
00172 has_read_callback_ = 0;
00173
00174 ROS_DEBUG_NAMED("superdebug", "Calling read callback");
00175 callback(shared_from_this(), buffer, size, true);
00176 }
00177 else
00178 {
00179 break;
00180 }
00181 }
00182
00183 if (!has_read_callback_)
00184 {
00185 transport_->disableRead();
00186 }
00187
00188 reading_ = false;
00189 }
00190
00191 void Connection::writeTransport()
00192 {
00193 boost::recursive_mutex::scoped_try_lock lock(write_mutex_);
00194
00195 if (!lock.owns_lock() || dropped_ || writing_)
00196 {
00197 return;
00198 }
00199
00200 writing_ = true;
00201 bool can_write_more = true;
00202
00203 while (has_write_callback_ && can_write_more && !dropped_)
00204 {
00205 uint32_t to_write = write_size_ - write_sent_;
00206 ROS_DEBUG_NAMED("superdebug", "Connection writing %d bytes", to_write);
00207 int32_t bytes_sent = transport_->write(write_buffer_.get() + write_sent_, to_write);
00208 ROS_DEBUG_NAMED("superdebug", "Connection wrote %d bytes", bytes_sent);
00209
00210 if (bytes_sent < 0)
00211 {
00212 writing_ = false;
00213 return;
00214 }
00215
00216 write_sent_ += bytes_sent;
00217
00218 if (bytes_sent < (int)write_size_ - (int)write_sent_)
00219 {
00220 can_write_more = false;
00221 }
00222
00223 if (write_sent_ == write_size_ && !dropped_)
00224 {
00225 WriteFinishedFunc callback;
00226
00227 {
00228 boost::mutex::scoped_lock lock(write_callback_mutex_);
00229 ROS_ASSERT(has_write_callback_);
00230
00231 callback = write_callback_;
00232 write_callback_ = WriteFinishedFunc();
00233 write_buffer_ = boost::shared_array<uint8_t>();
00234 write_sent_ = 0;
00235 write_size_ = 0;
00236 has_write_callback_ = 0;
00237 }
00238
00239 ROS_DEBUG_NAMED("superdebug", "Calling write callback");
00240 callback(shared_from_this());
00241 }
00242 }
00243
00244 {
00245 boost::mutex::scoped_lock lock(write_callback_mutex_);
00246 if (!has_write_callback_)
00247 {
00248 transport_->disableWrite();
00249 }
00250 }
00251
00252 writing_ = false;
00253 }
00254
00255 void Connection::onWriteable(const TransportPtr& transport)
00256 {
00257 ROS_ASSERT(transport == transport_);
00258
00259 writeTransport();
00260 }
00261
00262 void Connection::read(uint32_t size, const ReadFinishedFunc& callback)
00263 {
00264 if (dropped_ || sending_header_error_)
00265 {
00266 return;
00267 }
00268
00269 {
00270 boost::recursive_mutex::scoped_lock lock(read_mutex_);
00271
00272 ROS_ASSERT(!read_callback_);
00273
00274 read_callback_ = callback;
00275 read_buffer_ = boost::shared_array<uint8_t>(new uint8_t[size]);
00276 read_size_ = size;
00277 read_filled_ = 0;
00278 has_read_callback_ = 1;
00279 }
00280
00281 transport_->enableRead();
00282
00283
00284 readTransport();
00285 }
00286
00287 void Connection::write(const boost::shared_array<uint8_t>& buffer, uint32_t size, const WriteFinishedFunc& callback, bool immediate)
00288 {
00289 if (dropped_ || sending_header_error_)
00290 {
00291 return;
00292 }
00293
00294 {
00295 boost::mutex::scoped_lock lock(write_callback_mutex_);
00296
00297 ROS_ASSERT(!write_callback_);
00298
00299 write_callback_ = callback;
00300 write_buffer_ = buffer;
00301 write_size_ = size;
00302 write_sent_ = 0;
00303 has_write_callback_ = 1;
00304 }
00305
00306 transport_->enableWrite();
00307
00308 if (immediate)
00309 {
00310
00311 writeTransport();
00312 }
00313 }
00314
00315 void Connection::onDisconnect(const TransportPtr& transport)
00316 {
00317 ROS_ASSERT(transport == transport_);
00318
00319 drop(TransportDisconnect);
00320 }
00321
00322 void Connection::drop(DropReason reason)
00323 {
00324 ROSCPP_LOG_DEBUG("Connection::drop(%u)", reason);
00325 bool did_drop = false;
00326 {
00327 boost::recursive_mutex::scoped_lock lock(drop_mutex_);
00328 if (!dropped_)
00329 {
00330 dropped_ = true;
00331 did_drop = true;
00332
00333 drop_signal_(shared_from_this(), reason);
00334 }
00335 }
00336
00337 if (did_drop)
00338 {
00339 transport_->close();
00340 }
00341 }
00342
00343 bool Connection::isDropped()
00344 {
00345 boost::recursive_mutex::scoped_lock lock(drop_mutex_);
00346 return dropped_;
00347 }
00348
00349 void Connection::writeHeader(const M_string& key_vals, const WriteFinishedFunc& finished_callback)
00350 {
00351 ROS_ASSERT(!header_written_callback_);
00352 header_written_callback_ = finished_callback;
00353
00354 if (!transport_->requiresHeader())
00355 {
00356 onHeaderWritten(shared_from_this());
00357 return;
00358 }
00359
00360 boost::shared_array<uint8_t> buffer;
00361 uint32_t len;
00362 Header::write(key_vals, buffer, len);
00363
00364 uint32_t msg_len = len + 4;
00365 boost::shared_array<uint8_t> full_msg(new uint8_t[msg_len]);
00366 memcpy(full_msg.get() + 4, buffer.get(), len);
00367 *((uint32_t*)full_msg.get()) = len;
00368
00369 write(full_msg, msg_len, boost::bind(&Connection::onHeaderWritten, this, _1), false);
00370 }
00371
00372 void Connection::sendHeaderError(const std::string& error_msg)
00373 {
00374 M_string m;
00375 m["error"] = error_msg;
00376
00377 writeHeader(m, boost::bind(&Connection::onErrorHeaderWritten, this, _1));
00378 sending_header_error_ = true;
00379 }
00380
00381 void Connection::onHeaderLengthRead(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00382 {
00383 ROS_ASSERT(conn.get() == this);
00384 ROS_ASSERT(size == 4);
00385
00386 if (!success)
00387 return;
00388
00389 uint32_t len = *((uint32_t*)buffer.get());
00390
00391 if (len > 1000000000)
00392 {
00393 ROS_ERROR("a header of over a gigabyte was " \
00394 "predicted in tcpros. that seems highly " \
00395 "unlikely, so I'll assume protocol " \
00396 "synchronization is lost.");
00397 conn->drop(HeaderError);
00398 }
00399
00400 read(len, boost::bind(&Connection::onHeaderRead, this, _1, _2, _3, _4));
00401 }
00402
00403 void Connection::onHeaderRead(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00404 {
00405 ROS_ASSERT(conn.get() == this);
00406
00407 if (!success)
00408 return;
00409
00410 std::string error_msg;
00411 if (!header_.parse(buffer, size, error_msg))
00412 {
00413 drop(HeaderError);
00414 }
00415 else
00416 {
00417 std::string error_val;
00418 if (header_.getValue("error", error_val))
00419 {
00420 ROSCPP_LOG_DEBUG("Received error message in header for connection to [%s]: [%s]", transport_->getTransportInfo().c_str(), error_val.c_str());
00421 drop(HeaderError);
00422 }
00423 else
00424 {
00425 ROS_ASSERT(header_func_);
00426
00427 transport_->parseHeader(header_);
00428
00429 header_func_(conn, header_);
00430 }
00431 }
00432
00433 }
00434
00435 void Connection::onHeaderWritten(const ConnectionPtr& conn)
00436 {
00437 ROS_ASSERT(conn.get() == this);
00438 ROS_ASSERT(header_written_callback_);
00439
00440 header_written_callback_(conn);
00441 header_written_callback_ = WriteFinishedFunc();
00442 }
00443
00444 void Connection::onErrorHeaderWritten(const ConnectionPtr& conn)
00445 {
00446 drop(HeaderError);
00447 }
00448
00449 void Connection::setHeaderReceivedCallback(const HeaderReceivedFunc& func)
00450 {
00451 header_func_ = func;
00452
00453 if (transport_->requiresHeader())
00454 read(4, boost::bind(&Connection::onHeaderLengthRead, this, _1, _2, _3, _4));
00455 }
00456
00457 std::string Connection::getCallerId()
00458 {
00459 std::string callerid;
00460 if (header_.getValue("callerid", callerid))
00461 {
00462 return callerid;
00463 }
00464
00465 return std::string("unknown");
00466 }
00467
00468 std::string Connection::getRemoteString()
00469 {
00470 std::stringstream ss;
00471 ss << "callerid=[" << getCallerId() << "] address=[" << transport_->getTransportInfo() << "]";
00472 return ss.str();
00473 }
00474
00475 }