connection.cpp
Go to the documentation of this file.
00001 /*
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2008, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Willow Garage, Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  */
00034 
00035 #include "ros/connection.h"
00036 #include "ros/transport/transport.h"
00037 #include "ros/file_log.h"
00038 
00039 #include <ros/assert.h>
00040 
00041 #include <boost/shared_array.hpp>
00042 #include <boost/bind.hpp>
00043 
00044 namespace ros
00045 {
00046 
00047 Connection::Connection()
00048 : is_server_(false)
00049 , dropped_(false)
00050 , read_filled_(0)
00051 , read_size_(0)
00052 , reading_(false)
00053 , has_read_callback_(0)
00054 , write_sent_(0)
00055 , write_size_(0)
00056 , writing_(false)
00057 , has_write_callback_(0)
00058 , sending_header_error_(false)
00059 {
00060 }
00061 
00062 Connection::~Connection()
00063 {
00064   ROS_DEBUG_NAMED("superdebug", "Connection destructing, dropped=%s", dropped_ ? "true" : "false");
00065 
00066   drop(Destructing);
00067 }
00068 
00069 void Connection::initialize(const TransportPtr& transport, bool is_server, const HeaderReceivedFunc& header_func)
00070 {
00071   ROS_ASSERT(transport);
00072 
00073   transport_ = transport;
00074   header_func_ = header_func;
00075   is_server_ = is_server;
00076 
00077   transport_->setReadCallback(boost::bind(&Connection::onReadable, this, _1));
00078   transport_->setWriteCallback(boost::bind(&Connection::onWriteable, this, _1));
00079   transport_->setDisconnectCallback(boost::bind(&Connection::onDisconnect, this, _1));
00080 
00081   if (header_func)
00082   {
00083     read(4, boost::bind(&Connection::onHeaderLengthRead, this, _1, _2, _3, _4));
00084   }
00085 }
00086 
00087 boost::signals::connection Connection::addDropListener(const DropFunc& slot)
00088 {
00089   boost::recursive_mutex::scoped_lock lock(drop_mutex_);
00090   return drop_signal_.connect(slot);
00091 }
00092 
00093 void Connection::removeDropListener(const boost::signals::connection& c)
00094 {
00095   boost::recursive_mutex::scoped_lock lock(drop_mutex_);
00096   c.disconnect();
00097 }
00098 
00099 void Connection::onReadable(const TransportPtr& transport)
00100 {
00101   ROS_ASSERT(transport == transport_);
00102 
00103   readTransport();
00104 }
00105 
00106 void Connection::readTransport()
00107 {
00108   boost::recursive_mutex::scoped_try_lock lock(read_mutex_);
00109 
00110   if (!lock.owns_lock() || dropped_ || reading_)
00111   {
00112     return;
00113   }
00114 
00115   reading_ = true;
00116 
00117   while (!dropped_ && has_read_callback_)
00118   {
00119     ROS_ASSERT(read_buffer_);
00120     uint32_t to_read = read_size_ - read_filled_;
00121     if (to_read > 0)
00122     {
00123       int32_t bytes_read = transport_->read(read_buffer_.get() + read_filled_, to_read);
00124       ROS_DEBUG_NAMED("superdebug", "Connection read %d bytes", bytes_read);
00125       if (dropped_)
00126       {
00127         return;
00128       }
00129       else if (bytes_read < 0)
00130       {
00131         // Bad read, throw away results and report error
00132         ReadFinishedFunc callback;
00133         callback = read_callback_;
00134         read_callback_.clear();
00135         read_buffer_.reset();
00136         uint32_t size = read_size_;
00137         read_size_ = 0;
00138         read_filled_ = 0;
00139         has_read_callback_ = 0;
00140 
00141         if (callback)
00142         {
00143           callback(shared_from_this(), read_buffer_, size, false);
00144         }
00145 
00146         break;
00147       }
00148 
00149       read_filled_ += bytes_read;
00150     }
00151 
00152     ROS_ASSERT((int32_t)read_size_ >= 0);
00153     ROS_ASSERT((int32_t)read_filled_ >= 0);
00154     ROS_ASSERT_MSG(read_filled_ <= read_size_, "read_filled_ = %d, read_size_ = %d", read_filled_, read_size_);
00155 
00156     if (read_filled_ == read_size_ && !dropped_)
00157     {
00158       ReadFinishedFunc callback;
00159       uint32_t size;
00160       boost::shared_array<uint8_t> buffer;
00161 
00162       ROS_ASSERT(has_read_callback_);
00163 
00164       // store off the read info in case another read() call is made from within the callback
00165       callback = read_callback_;
00166       size = read_size_;
00167       buffer = read_buffer_;
00168       read_callback_.clear();
00169       read_buffer_.reset();
00170       read_size_ = 0;
00171       read_filled_ = 0;
00172       has_read_callback_ = 0;
00173 
00174       ROS_DEBUG_NAMED("superdebug", "Calling read callback");
00175       callback(shared_from_this(), buffer, size, true);
00176     }
00177     else
00178     {
00179       break;
00180     }
00181   }
00182 
00183   if (!has_read_callback_)
00184   {
00185     transport_->disableRead();
00186   }
00187 
00188   reading_ = false;
00189 }
00190 
00191 void Connection::writeTransport()
00192 {
00193   boost::recursive_mutex::scoped_try_lock lock(write_mutex_);
00194 
00195   if (!lock.owns_lock() || dropped_ || writing_)
00196   {
00197     return;
00198   }
00199 
00200   writing_ = true;
00201   bool can_write_more = true;
00202 
00203   while (has_write_callback_ && can_write_more && !dropped_)
00204   {
00205     uint32_t to_write = write_size_ - write_sent_;
00206     ROS_DEBUG_NAMED("superdebug", "Connection writing %d bytes", to_write);
00207     int32_t bytes_sent = transport_->write(write_buffer_.get() + write_sent_, to_write);
00208     ROS_DEBUG_NAMED("superdebug", "Connection wrote %d bytes", bytes_sent);
00209 
00210     if (bytes_sent < 0)
00211     {
00212       writing_ = false;
00213       return;
00214     }
00215 
00216     write_sent_ += bytes_sent;
00217 
00218     if (bytes_sent < (int)write_size_ - (int)write_sent_)
00219     {
00220       can_write_more = false;
00221     }
00222 
00223     if (write_sent_ == write_size_ && !dropped_)
00224     {
00225       WriteFinishedFunc callback;
00226 
00227       {
00228         boost::mutex::scoped_lock lock(write_callback_mutex_);
00229         ROS_ASSERT(has_write_callback_);
00230         // Store off a copy of the callback in case another write() call happens in it
00231         callback = write_callback_;
00232         write_callback_ = WriteFinishedFunc();
00233         write_buffer_ = boost::shared_array<uint8_t>();
00234         write_sent_ = 0;
00235         write_size_ = 0;
00236         has_write_callback_ = 0;
00237       }
00238 
00239       ROS_DEBUG_NAMED("superdebug", "Calling write callback");
00240       callback(shared_from_this());
00241     }
00242   }
00243 
00244   {
00245     boost::mutex::scoped_lock lock(write_callback_mutex_);
00246     if (!has_write_callback_)
00247     {
00248       transport_->disableWrite();
00249     }
00250   }
00251 
00252   writing_ = false;
00253 }
00254 
00255 void Connection::onWriteable(const TransportPtr& transport)
00256 {
00257   ROS_ASSERT(transport == transport_);
00258 
00259   writeTransport();
00260 }
00261 
00262 void Connection::read(uint32_t size, const ReadFinishedFunc& callback)
00263 {
00264   if (dropped_ || sending_header_error_)
00265   {
00266     return;
00267   }
00268 
00269   {
00270     boost::recursive_mutex::scoped_lock lock(read_mutex_);
00271 
00272     ROS_ASSERT(!read_callback_);
00273 
00274     read_callback_ = callback;
00275     read_buffer_ = boost::shared_array<uint8_t>(new uint8_t[size]);
00276     read_size_ = size;
00277     read_filled_ = 0;
00278     has_read_callback_ = 1;
00279   }
00280 
00281   transport_->enableRead();
00282 
00283   // read immediately if possible
00284   readTransport();
00285 }
00286 
00287 void Connection::write(const boost::shared_array<uint8_t>& buffer, uint32_t size, const WriteFinishedFunc& callback, bool immediate)
00288 {
00289   if (dropped_ || sending_header_error_)
00290   {
00291     return;
00292   }
00293 
00294   {
00295     boost::mutex::scoped_lock lock(write_callback_mutex_);
00296 
00297     ROS_ASSERT(!write_callback_);
00298 
00299     write_callback_ = callback;
00300     write_buffer_ = buffer;
00301     write_size_ = size;
00302     write_sent_ = 0;
00303     has_write_callback_ = 1;
00304   }
00305 
00306   transport_->enableWrite();
00307 
00308   if (immediate)
00309   {
00310     // write immediately if possible
00311     writeTransport();
00312   }
00313 }
00314 
00315 void Connection::onDisconnect(const TransportPtr& transport)
00316 {
00317   ROS_ASSERT(transport == transport_);
00318 
00319   drop(TransportDisconnect);
00320 }
00321 
00322 void Connection::drop(DropReason reason)
00323 {
00324   ROSCPP_LOG_DEBUG("Connection::drop(%u)", reason);
00325   bool did_drop = false;
00326   {
00327     boost::recursive_mutex::scoped_lock lock(drop_mutex_);
00328     if (!dropped_)
00329     {
00330       dropped_ = true;
00331       did_drop = true;
00332 
00333       drop_signal_(shared_from_this(), reason);
00334     }
00335   }
00336 
00337   if (did_drop)
00338   {
00339     transport_->close();
00340   }
00341 }
00342 
00343 bool Connection::isDropped()
00344 {
00345   boost::recursive_mutex::scoped_lock lock(drop_mutex_);
00346   return dropped_;
00347 }
00348 
00349 void Connection::writeHeader(const M_string& key_vals, const WriteFinishedFunc& finished_callback)
00350 {
00351   ROS_ASSERT(!header_written_callback_);
00352   header_written_callback_ = finished_callback;
00353 
00354   if (!transport_->requiresHeader())
00355   {
00356     onHeaderWritten(shared_from_this());
00357     return;
00358   }
00359 
00360   boost::shared_array<uint8_t> buffer;
00361   uint32_t len;
00362   Header::write(key_vals, buffer, len);
00363 
00364   uint32_t msg_len = len + 4;
00365   boost::shared_array<uint8_t> full_msg(new uint8_t[msg_len]);
00366   memcpy(full_msg.get() + 4, buffer.get(), len);
00367   *((uint32_t*)full_msg.get()) = len;
00368 
00369   write(full_msg, msg_len, boost::bind(&Connection::onHeaderWritten, this, _1), false);
00370 }
00371 
00372 void Connection::sendHeaderError(const std::string& error_msg)
00373 {
00374   M_string m;
00375   m["error"] = error_msg;
00376 
00377   writeHeader(m, boost::bind(&Connection::onErrorHeaderWritten, this, _1));
00378   sending_header_error_ = true;
00379 }
00380 
00381 void Connection::onHeaderLengthRead(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00382 {
00383   ROS_ASSERT(conn.get() == this);
00384   ROS_ASSERT(size == 4);
00385 
00386   if (!success)
00387     return;
00388 
00389   uint32_t len = *((uint32_t*)buffer.get());
00390 
00391   if (len > 1000000000)
00392   {
00393     ROS_ERROR("a header of over a gigabyte was " \
00394                 "predicted in tcpros. that seems highly " \
00395                 "unlikely, so I'll assume protocol " \
00396                 "synchronization is lost.");
00397     conn->drop(HeaderError);
00398   }
00399 
00400   read(len, boost::bind(&Connection::onHeaderRead, this, _1, _2, _3, _4));
00401 }
00402 
00403 void Connection::onHeaderRead(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00404 {
00405   ROS_ASSERT(conn.get() == this);
00406 
00407   if (!success)
00408     return;
00409 
00410   std::string error_msg;
00411   if (!header_.parse(buffer, size, error_msg))
00412   {
00413     drop(HeaderError);
00414   }
00415   else
00416   {
00417     std::string error_val;
00418     if (header_.getValue("error", error_val))
00419     {
00420       ROSCPP_LOG_DEBUG("Received error message in header for connection to [%s]: [%s]", transport_->getTransportInfo().c_str(), error_val.c_str());
00421       drop(HeaderError);
00422     }
00423     else
00424     {
00425       ROS_ASSERT(header_func_);
00426 
00427       transport_->parseHeader(header_);
00428 
00429       header_func_(conn, header_);
00430     }
00431   }
00432 
00433 }
00434 
00435 void Connection::onHeaderWritten(const ConnectionPtr& conn)
00436 {
00437   ROS_ASSERT(conn.get() == this);
00438   ROS_ASSERT(header_written_callback_);
00439 
00440   header_written_callback_(conn);
00441   header_written_callback_ = WriteFinishedFunc();
00442 }
00443 
00444 void Connection::onErrorHeaderWritten(const ConnectionPtr& conn)
00445 {
00446   drop(HeaderError);
00447 }
00448 
00449 void Connection::setHeaderReceivedCallback(const HeaderReceivedFunc& func)
00450 {
00451   header_func_ = func;
00452 
00453   if (transport_->requiresHeader())
00454     read(4, boost::bind(&Connection::onHeaderLengthRead, this, _1, _2, _3, _4));
00455 }
00456 
00457 std::string Connection::getCallerId()
00458 {
00459   std::string callerid;
00460   if (header_.getValue("callerid", callerid))
00461   {
00462     return callerid;
00463   }
00464 
00465   return std::string("unknown");
00466 }
00467 
00468 std::string Connection::getRemoteString()
00469 {
00470   std::stringstream ss;
00471   ss << "callerid=[" << getCallerId() << "] address=[" << transport_->getTransportInfo() << "]";
00472   return ss.str();
00473 }
00474 
00475 }


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Fri Aug 28 2015 12:33:10