transport_subscriber_link.cpp
Go to the documentation of this file.
00001 
00002 /*
00003  * Copyright (C) 2008, Morgan Quigley and Willow Garage, Inc.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *   * Redistributions of source code must retain the above copyright notice,
00008  *     this list of conditions and the following disclaimer.
00009  *   * Redistributions in binary form must reproduce the above copyright
00010  *     notice, this list of conditions and the following disclaimer in the
00011  *     documentation and/or other materials provided with the distribution.
00012  *   * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
00013  *     contributors may be used to endorse or promote products derived from
00014  *     this software without specific prior written permission.
00015  *
00016  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00017  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00018  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00019  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00020  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00021  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00022  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00023  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00024  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00025  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00026  * POSSIBILITY OF SUCH DAMAGE.
00027  */
00028 
00029 #include "ros/transport_subscriber_link.h"
00030 #include "ros/publication.h"
00031 #include "ros/header.h"
00032 #include "ros/connection.h"
00033 #include "ros/transport/transport.h"
00034 #include "ros/this_node.h"
00035 #include "ros/connection_manager.h"
00036 #include "ros/topic_manager.h"
00037 #include "ros/file_log.h"
00038 
00039 #include <boost/bind.hpp>
00040 
00041 namespace ros
00042 {
00043 
00044 TransportSubscriberLink::TransportSubscriberLink()
00045 : writing_message_(false)
00046 , header_written_(false)
00047 , queue_full_(false)
00048 {
00049 
00050 }
00051 
00052 TransportSubscriberLink::~TransportSubscriberLink()
00053 {
00054   drop();
00055 }
00056 
00057 bool TransportSubscriberLink::initialize(const ConnectionPtr& connection)
00058 {
00059   connection_ = connection;
00060   dropped_conn_ = connection_->addDropListener(boost::bind(&TransportSubscriberLink::onConnectionDropped, this, _1));
00061 
00062   return true;
00063 }
00064 
00065 bool TransportSubscriberLink::handleHeader(const Header& header)
00066 {
00067   std::string topic;
00068   if (!header.getValue("topic", topic))
00069   {
00070     std::string msg("Header from subscriber did not have the required element: topic");
00071 
00072     ROS_ERROR("%s", msg.c_str());
00073     connection_->sendHeaderError(msg);
00074 
00075     return false;
00076   }
00077 
00078   // This will get validated by validateHeader below
00079   std::string client_callerid;
00080   header.getValue("callerid", client_callerid);
00081   PublicationPtr pt = TopicManager::instance()->lookupPublication(topic);
00082   if (!pt)
00083   {
00084     std::string msg = std::string("received a connection for a nonexistent topic [") +
00085                     topic + std::string("] from [" + connection_->getTransport()->getTransportInfo() + "] [" + client_callerid +"].");
00086 
00087     ROSCPP_LOG_DEBUG("%s", msg.c_str());
00088     connection_->sendHeaderError(msg);
00089 
00090     return false;
00091   }
00092 
00093   std::string error_msg;
00094   if (!pt->validateHeader(header, error_msg))
00095   {
00096     ROSCPP_LOG_DEBUG("%s", error_msg.c_str());
00097     connection_->sendHeaderError(error_msg);
00098 
00099     return false;
00100   }
00101 
00102   destination_caller_id_ = client_callerid;
00103   connection_id_ = ConnectionManager::instance()->getNewConnectionID();
00104   topic_ = pt->getName();
00105   parent_ = PublicationWPtr(pt);
00106 
00107   // Send back a success, with info
00108   M_string m;
00109   m["type"] = pt->getDataType();
00110   m["md5sum"] = pt->getMD5Sum();
00111   m["message_definition"] = pt->getMessageDefinition();
00112   m["callerid"] = this_node::getName();
00113   m["latching"] = pt->isLatching() ? "1" : "0";
00114   connection_->writeHeader(m, boost::bind(&TransportSubscriberLink::onHeaderWritten, this, _1));
00115 
00116   pt->addSubscriberLink(shared_from_this());
00117 
00118   return true;
00119 }
00120 
00121 void TransportSubscriberLink::onConnectionDropped(const ConnectionPtr& conn)
00122 {
00123   ROS_ASSERT(conn == connection_);
00124 
00125   PublicationPtr parent = parent_.lock();
00126 
00127   if (parent)
00128   {
00129     ROSCPP_LOG_DEBUG("Connection to subscriber [%s] to topic [%s] dropped", connection_->getRemoteString().c_str(), topic_.c_str());
00130 
00131     parent->removeSubscriberLink(shared_from_this());
00132   }
00133 }
00134 
00135 void TransportSubscriberLink::onHeaderWritten(const ConnectionPtr& conn)
00136 {
00137   header_written_ = true;
00138   startMessageWrite(true);
00139 }
00140 
00141 void TransportSubscriberLink::onMessageWritten(const ConnectionPtr& conn)
00142 {
00143   writing_message_ = false;
00144   startMessageWrite(true);
00145 }
00146 
00147 void TransportSubscriberLink::startMessageWrite(bool immediate_write)
00148 {
00149   boost::shared_array<uint8_t> dummy;
00150   SerializedMessage m(dummy, (uint32_t)0);
00151 
00152   {
00153     boost::mutex::scoped_lock lock(outbox_mutex_);
00154     if (writing_message_ || !header_written_)
00155     {
00156       return;
00157     }
00158 
00159     if (!outbox_.empty())
00160     {
00161       writing_message_ = true;
00162       m = outbox_.front();
00163       outbox_.pop();
00164     }
00165   }
00166 
00167   if (m.num_bytes > 0)
00168   {
00169     connection_->write(m.buf, m.num_bytes, boost::bind(&TransportSubscriberLink::onMessageWritten, this, _1), immediate_write);
00170   }
00171 }
00172 
00173 void TransportSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy)
00174 {
00175   if (!ser)
00176   {
00177     return;
00178   }
00179 
00180   {
00181     boost::mutex::scoped_lock lock(outbox_mutex_);
00182 
00183     int max_queue = 0;
00184     if (PublicationPtr parent = parent_.lock())
00185     {
00186       max_queue = parent->getMaxQueue();
00187     }
00188 
00189     ROS_DEBUG_NAMED("superdebug", "TransportSubscriberLink on topic [%s] to caller [%s], queueing message (queue size [%d])", topic_.c_str(), destination_caller_id_.c_str(), (int)outbox_.size());
00190 
00191     if (max_queue > 0 && (int)outbox_.size() >= max_queue)
00192     {
00193       if (!queue_full_)
00194       {
00195         ROS_DEBUG("Outgoing queue full for topic [%s].  "
00196                "Discarding oldest message\n",
00197                topic_.c_str());
00198       }
00199 
00200       outbox_.pop(); // toss out the oldest thing in the queue to make room for us
00201       queue_full_ = true;
00202     }
00203     else
00204     {
00205       queue_full_ = false;
00206     }
00207 
00208     outbox_.push(m);
00209   }
00210 
00211   startMessageWrite(false);
00212 
00213   stats_.messages_sent_++;
00214   stats_.bytes_sent_ += m.num_bytes;
00215   stats_.message_data_sent_ += m.num_bytes;
00216 }
00217 
00218 std::string TransportSubscriberLink::getTransportType()
00219 {
00220   return connection_->getTransport()->getType();
00221 }
00222 
00223 void TransportSubscriberLink::drop()
00224 {
00225   // Only drop the connection if it's not already sending a header error
00226   // If it is, it will automatically drop itself
00227   if (connection_->isSendingHeaderError())
00228   {
00229     connection_->removeDropListener(dropped_conn_);
00230   }
00231   else
00232   {
00233     connection_->drop(Connection::Destructing);
00234   }
00235 }
00236 
00237 } // namespace ros


roscpp
Author(s): Morgan Quigley mquigley@cs.stanford.edu, Josh Faust jfaust@willowgarage.com, Brian Gerkey gerkey@willowgarage.com, Troy Straszheim straszheim@willowgarage.com
autogenerated on Sat Dec 28 2013 17:35:52