$search
00001 /* 00002 * Software License Agreement (BSD License) 00003 * 00004 * Copyright (c) 2008, Willow Garage, Inc. 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions 00009 * are met: 00010 * 00011 * * Redistributions of source code must retain the above copyright 00012 * notice, this list of conditions and the following disclaimer. 00013 * * Redistributions in binary form must reproduce the above 00014 * copyright notice, this list of conditions and the following 00015 * disclaimer in the documentation and/or other materials provided 00016 * with the distribution. 00017 * * Neither the name of Willow Garage, Inc. nor the names of its 00018 * contributors may be used to endorse or promote products derived 00019 * from this software without specific prior written permission. 00020 * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 * POSSIBILITY OF SUCH DAMAGE. 00033 */ 00034 00035 #include "ros/connection.h" 00036 #include "ros/transport/transport.h" 00037 #include "ros/file_log.h" 00038 00039 #include <ros/assert.h> 00040 00041 #include <boost/shared_array.hpp> 00042 #include <boost/bind.hpp> 00043 00044 namespace ros 00045 { 00046 00047 Connection::Connection() 00048 : is_server_(false) 00049 , dropped_(false) 00050 , read_filled_(0) 00051 , read_size_(0) 00052 , reading_(false) 00053 , has_read_callback_(0) 00054 , write_sent_(0) 00055 , write_size_(0) 00056 , writing_(false) 00057 , has_write_callback_(0) 00058 , sending_header_error_(false) 00059 { 00060 } 00061 00062 Connection::~Connection() 00063 { 00064 ROS_DEBUG_NAMED("superdebug", "Connection destructing, dropped=%s", dropped_ ? "true" : "false"); 00065 00066 drop(Destructing); 00067 } 00068 00069 void Connection::initialize(const TransportPtr& transport, bool is_server, const HeaderReceivedFunc& header_func) 00070 { 00071 ROS_ASSERT(transport); 00072 00073 transport_ = transport; 00074 header_func_ = header_func; 00075 is_server_ = is_server; 00076 00077 transport_->setReadCallback(boost::bind(&Connection::onReadable, this, _1)); 00078 transport_->setWriteCallback(boost::bind(&Connection::onWriteable, this, _1)); 00079 transport_->setDisconnectCallback(boost::bind(&Connection::onDisconnect, this, _1)); 00080 00081 if (header_func) 00082 { 00083 read(4, boost::bind(&Connection::onHeaderLengthRead, this, _1, _2, _3, _4)); 00084 } 00085 } 00086 00087 boost::signals::connection Connection::addDropListener(const DropFunc& slot) 00088 { 00089 boost::recursive_mutex::scoped_lock lock(drop_mutex_); 00090 return drop_signal_.connect(slot); 00091 } 00092 00093 void Connection::removeDropListener(const boost::signals::connection& c) 00094 { 00095 boost::recursive_mutex::scoped_lock lock(drop_mutex_); 00096 c.disconnect(); 00097 } 00098 00099 void Connection::onReadable(const TransportPtr& transport) 00100 { 00101 ROS_ASSERT(transport == transport_); 00102 00103 readTransport(); 00104 } 00105 00106 void Connection::readTransport() 00107 { 00108 boost::recursive_mutex::scoped_try_lock lock(read_mutex_); 00109 00110 if (!lock.owns_lock() || dropped_ || reading_) 00111 { 00112 return; 00113 } 00114 00115 reading_ = true; 00116 00117 while (!dropped_ && has_read_callback_) 00118 { 00119 ROS_ASSERT(read_buffer_); 00120 uint32_t to_read = read_size_ - read_filled_; 00121 if (to_read > 0) 00122 { 00123 int32_t bytes_read = transport_->read(read_buffer_.get() + read_filled_, to_read); 00124 ROS_DEBUG_NAMED("superdebug", "Connection read %d bytes", bytes_read); 00125 if (dropped_) 00126 { 00127 return; 00128 } 00129 else if (bytes_read < 0) 00130 { 00131 // Bad read, throw away results and report error 00132 ReadFinishedFunc callback; 00133 callback = read_callback_; 00134 read_callback_.clear(); 00135 read_buffer_.reset(); 00136 uint32_t size = read_size_; 00137 read_size_ = 0; 00138 read_filled_ = 0; 00139 has_read_callback_ = 0; 00140 00141 if (callback) 00142 { 00143 callback(shared_from_this(), read_buffer_, size, false); 00144 } 00145 00146 break; 00147 } 00148 00149 read_filled_ += bytes_read; 00150 } 00151 00152 ROS_ASSERT((int32_t)read_size_ >= 0); 00153 ROS_ASSERT((int32_t)read_filled_ >= 0); 00154 ROS_ASSERT_MSG(read_filled_ <= read_size_, "read_filled_ = %d, read_size_ = %d", read_filled_, read_size_); 00155 00156 if (read_filled_ == read_size_ && !dropped_) 00157 { 00158 ReadFinishedFunc callback; 00159 uint32_t size; 00160 boost::shared_array<uint8_t> buffer; 00161 00162 ROS_ASSERT(has_read_callback_); 00163 00164 // store off the read info in case another read() call is made from within the callback 00165 callback = read_callback_; 00166 size = read_size_; 00167 buffer = read_buffer_; 00168 read_callback_.clear(); 00169 read_buffer_.reset(); 00170 read_size_ = 0; 00171 read_filled_ = 0; 00172 has_read_callback_ = 0; 00173 00174 ROS_DEBUG_NAMED("superdebug", "Calling read callback"); 00175 callback(shared_from_this(), buffer, size, true); 00176 } 00177 else 00178 { 00179 break; 00180 } 00181 } 00182 00183 if (!has_read_callback_) 00184 { 00185 transport_->disableRead(); 00186 } 00187 00188 reading_ = false; 00189 } 00190 00191 void Connection::writeTransport() 00192 { 00193 boost::recursive_mutex::scoped_try_lock lock(write_mutex_); 00194 00195 if (!lock.owns_lock() || dropped_ || writing_) 00196 { 00197 return; 00198 } 00199 00200 writing_ = true; 00201 bool can_write_more = true; 00202 00203 while (has_write_callback_ && can_write_more && !dropped_) 00204 { 00205 uint32_t to_write = write_size_ - write_sent_; 00206 ROS_DEBUG_NAMED("superdebug", "Connection writing %d bytes", to_write); 00207 int32_t bytes_sent = transport_->write(write_buffer_.get() + write_sent_, to_write); 00208 ROS_DEBUG_NAMED("superdebug", "Connection wrote %d bytes", bytes_sent); 00209 00210 if (bytes_sent < 0) 00211 { 00212 writing_ = false; 00213 return; 00214 } 00215 00216 write_sent_ += bytes_sent; 00217 00218 if (bytes_sent < (int)write_size_ - (int)write_sent_) 00219 { 00220 can_write_more = false; 00221 } 00222 00223 if (write_sent_ == write_size_ && !dropped_) 00224 { 00225 WriteFinishedFunc callback; 00226 00227 { 00228 boost::mutex::scoped_lock lock(write_callback_mutex_); 00229 ROS_ASSERT(has_write_callback_); 00230 // Store off a copy of the callback in case another write() call happens in it 00231 callback = write_callback_; 00232 write_callback_ = WriteFinishedFunc(); 00233 write_buffer_ = boost::shared_array<uint8_t>(); 00234 write_sent_ = 0; 00235 write_size_ = 0; 00236 has_write_callback_ = 0; 00237 } 00238 00239 ROS_DEBUG_NAMED("superdebug", "Calling write callback"); 00240 callback(shared_from_this()); 00241 } 00242 } 00243 00244 { 00245 boost::mutex::scoped_lock lock(write_callback_mutex_); 00246 if (!has_write_callback_) 00247 { 00248 transport_->disableWrite(); 00249 } 00250 } 00251 00252 writing_ = false; 00253 } 00254 00255 void Connection::onWriteable(const TransportPtr& transport) 00256 { 00257 ROS_ASSERT(transport == transport_); 00258 00259 writeTransport(); 00260 } 00261 00262 void Connection::read(uint32_t size, const ReadFinishedFunc& callback) 00263 { 00264 if (dropped_ || sending_header_error_) 00265 { 00266 return; 00267 } 00268 00269 { 00270 boost::recursive_mutex::scoped_lock lock(read_mutex_); 00271 00272 ROS_ASSERT(!read_callback_); 00273 00274 read_callback_ = callback; 00275 read_buffer_ = boost::shared_array<uint8_t>(new uint8_t[size]); 00276 read_size_ = size; 00277 read_filled_ = 0; 00278 has_read_callback_ = 1; 00279 } 00280 00281 transport_->enableRead(); 00282 00283 // read immediately if possible 00284 readTransport(); 00285 } 00286 00287 void Connection::write(const boost::shared_array<uint8_t>& buffer, uint32_t size, const WriteFinishedFunc& callback, bool immediate) 00288 { 00289 if (dropped_ || sending_header_error_) 00290 { 00291 return; 00292 } 00293 00294 { 00295 boost::mutex::scoped_lock lock(write_callback_mutex_); 00296 00297 ROS_ASSERT(!write_callback_); 00298 00299 write_callback_ = callback; 00300 write_buffer_ = buffer; 00301 write_size_ = size; 00302 write_sent_ = 0; 00303 has_write_callback_ = 1; 00304 } 00305 00306 transport_->enableWrite(); 00307 00308 if (immediate) 00309 { 00310 // write immediately if possible 00311 writeTransport(); 00312 } 00313 } 00314 00315 void Connection::onDisconnect(const TransportPtr& transport) 00316 { 00317 ROS_ASSERT(transport == transport_); 00318 00319 drop(TransportDisconnect); 00320 } 00321 00322 void Connection::drop(DropReason reason) 00323 { 00324 ROSCPP_LOG_DEBUG("Connection::drop(%u)", reason); 00325 bool did_drop = false; 00326 { 00327 boost::recursive_mutex::scoped_lock lock(drop_mutex_); 00328 if (!dropped_) 00329 { 00330 dropped_ = true; 00331 did_drop = true; 00332 00333 drop_signal_(shared_from_this(), reason); 00334 } 00335 } 00336 00337 if (did_drop) 00338 { 00339 transport_->close(); 00340 } 00341 } 00342 00343 bool Connection::isDropped() 00344 { 00345 boost::recursive_mutex::scoped_lock lock(drop_mutex_); 00346 return dropped_; 00347 } 00348 00349 void Connection::writeHeader(const M_string& key_vals, const WriteFinishedFunc& finished_callback) 00350 { 00351 ROS_ASSERT(!header_written_callback_); 00352 header_written_callback_ = finished_callback; 00353 00354 if (!transport_->requiresHeader()) 00355 { 00356 onHeaderWritten(shared_from_this()); 00357 return; 00358 } 00359 00360 boost::shared_array<uint8_t> buffer; 00361 uint32_t len; 00362 Header::write(key_vals, buffer, len); 00363 00364 uint32_t msg_len = len + 4; 00365 boost::shared_array<uint8_t> full_msg(new uint8_t[msg_len]); 00366 memcpy(full_msg.get() + 4, buffer.get(), len); 00367 *((uint32_t*)full_msg.get()) = len; 00368 00369 write(full_msg, msg_len, boost::bind(&Connection::onHeaderWritten, this, _1), false); 00370 } 00371 00372 void Connection::sendHeaderError(const std::string& error_msg) 00373 { 00374 M_string m; 00375 m["error"] = error_msg; 00376 00377 writeHeader(m, boost::bind(&Connection::onErrorHeaderWritten, this, _1)); 00378 sending_header_error_ = true; 00379 } 00380 00381 void Connection::onHeaderLengthRead(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success) 00382 { 00383 ROS_ASSERT(conn.get() == this); 00384 ROS_ASSERT(size == 4); 00385 00386 if (!success) 00387 return; 00388 00389 uint32_t len = *((uint32_t*)buffer.get()); 00390 00391 if (len > 1000000000) 00392 { 00393 ROS_ERROR("a header of over a gigabyte was " \ 00394 "predicted in tcpros. that seems highly " \ 00395 "unlikely, so I'll assume protocol " \ 00396 "synchronization is lost."); 00397 conn->drop(HeaderError); 00398 } 00399 00400 read(len, boost::bind(&Connection::onHeaderRead, this, _1, _2, _3, _4)); 00401 } 00402 00403 void Connection::onHeaderRead(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success) 00404 { 00405 ROS_ASSERT(conn.get() == this); 00406 00407 if (!success) 00408 return; 00409 00410 std::string error_msg; 00411 if (!header_.parse(buffer, size, error_msg)) 00412 { 00413 drop(HeaderError); 00414 } 00415 else 00416 { 00417 std::string error_val; 00418 if (header_.getValue("error", error_val)) 00419 { 00420 ROSCPP_LOG_DEBUG("Received error message in header for connection to [%s]: [%s]", transport_->getTransportInfo().c_str(), error_val.c_str()); 00421 drop(HeaderError); 00422 } 00423 else 00424 { 00425 ROS_ASSERT(header_func_); 00426 00427 transport_->parseHeader(header_); 00428 00429 header_func_(conn, header_); 00430 } 00431 } 00432 00433 } 00434 00435 void Connection::onHeaderWritten(const ConnectionPtr& conn) 00436 { 00437 ROS_ASSERT(conn.get() == this); 00438 ROS_ASSERT(header_written_callback_); 00439 00440 header_written_callback_(conn); 00441 header_written_callback_ = WriteFinishedFunc(); 00442 } 00443 00444 void Connection::onErrorHeaderWritten(const ConnectionPtr& conn) 00445 { 00446 drop(HeaderError); 00447 } 00448 00449 void Connection::setHeaderReceivedCallback(const HeaderReceivedFunc& func) 00450 { 00451 header_func_ = func; 00452 00453 if (transport_->requiresHeader()) 00454 read(4, boost::bind(&Connection::onHeaderLengthRead, this, _1, _2, _3, _4)); 00455 } 00456 00457 std::string Connection::getCallerId() 00458 { 00459 std::string callerid; 00460 if (header_.getValue("callerid", callerid)) 00461 { 00462 return callerid; 00463 } 00464 00465 return std::string("unknown"); 00466 } 00467 00468 std::string Connection::getRemoteString() 00469 { 00470 std::stringstream ss; 00471 ss << "callerid=[" << getCallerId() << "] address=[" << transport_->getTransportInfo() << "]"; 00472 return ss.str(); 00473 } 00474 00475 }