transport_subscriber_link.cpp
Go to the documentation of this file.
00001 
00002 /*
00003  * Copyright (C) 2008, Morgan Quigley and Willow Garage, Inc.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *   * Redistributions of source code must retain the above copyright notice,
00008  *     this list of conditions and the following disclaimer.
00009  *   * Redistributions in binary form must reproduce the above copyright
00010  *     notice, this list of conditions and the following disclaimer in the
00011  *     documentation and/or other materials provided with the distribution.
00012  *   * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
00013  *     contributors may be used to endorse or promote products derived from
00014  *     this software without specific prior written permission.
00015  *
00016  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00017  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00018  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00019  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00020  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00021  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00022  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00023  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00024  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00025  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00026  * POSSIBILITY OF SUCH DAMAGE.
00027  */
00028 
00029 #include "ros/transport_subscriber_link.h"
00030 #include "ros/publication.h"
00031 #include "ros/header.h"
00032 #include "ros/connection.h"
00033 #include "ros/transport/transport.h"
00034 #include "ros/this_node.h"
00035 #include "ros/connection_manager.h"
00036 #include "ros/topic_manager.h"
00037 #include "ros/file_log.h"
00038 
00039 #include <boost/bind.hpp>
00040 
00041 namespace ros
00042 {
00043 
00044 TransportSubscriberLink::TransportSubscriberLink()
00045 : writing_message_(false)
00046 , header_written_(false)
00047 , queue_full_(false)
00048 {
00049 
00050 }
00051 
00052 TransportSubscriberLink::~TransportSubscriberLink()
00053 {
00054   drop();
00055 }
00056 
00057 bool TransportSubscriberLink::initialize(const ConnectionPtr& connection)
00058 {
00059   connection_ = connection;
00060   dropped_conn_ = connection_->addDropListener(boost::bind(&TransportSubscriberLink::onConnectionDropped, this, _1));
00061 
00062   return true;
00063 }
00064 
00065 bool TransportSubscriberLink::handleHeader(const Header& header)
00066 {
00067   std::string topic;
00068   if (!header.getValue("topic", topic))
00069   {
00070     std::string msg("Header from subscriber did not have the required element: topic");
00071 
00072     ROS_ERROR("%s", msg.c_str());
00073     connection_->sendHeaderError(msg);
00074 
00075     return false;
00076   }
00077 
00078   // This will get validated by validateHeader below
00079   std::string client_callerid;
00080   header.getValue("callerid", client_callerid);
00081   PublicationPtr pt = TopicManager::instance()->lookupPublication(topic);
00082   if (!pt)
00083   {
00084     std::string msg = std::string("received a connection for a nonexistent topic [") +
00085                     topic + std::string("] from [" + connection_->getTransport()->getTransportInfo() + "] [" + client_callerid +"].");
00086 
00087     ROSCPP_LOG_DEBUG("%s", msg.c_str());
00088     connection_->sendHeaderError(msg);
00089 
00090     return false;
00091   }
00092 
00093   std::string error_msg;
00094   if (!pt->validateHeader(header, error_msg))
00095   {
00096     ROSCPP_LOG_DEBUG("%s", error_msg.c_str());
00097     connection_->sendHeaderError(error_msg);
00098 
00099     return false;
00100   }
00101 
00102   destination_caller_id_ = client_callerid;
00103   connection_id_ = ConnectionManager::instance()->getNewConnectionID();
00104   topic_ = pt->getName();
00105   parent_ = PublicationWPtr(pt);
00106 
00107   // Send back a success, with info
00108   M_string m;
00109   m["type"] = pt->getDataType();
00110   m["md5sum"] = pt->getMD5Sum();
00111   m["message_definition"] = pt->getMessageDefinition();
00112   m["callerid"] = this_node::getName();
00113   m["latching"] = pt->isLatching() ? "1" : "0";
00114   m["topic"] = topic_;
00115   connection_->writeHeader(m, boost::bind(&TransportSubscriberLink::onHeaderWritten, this, _1));
00116 
00117   pt->addSubscriberLink(shared_from_this());
00118 
00119   return true;
00120 }
00121 
00122 void TransportSubscriberLink::onConnectionDropped(const ConnectionPtr& conn)
00123 {
00124   ROS_ASSERT(conn == connection_);
00125 
00126   PublicationPtr parent = parent_.lock();
00127 
00128   if (parent)
00129   {
00130     ROSCPP_LOG_DEBUG("Connection to subscriber [%s] to topic [%s] dropped", connection_->getRemoteString().c_str(), topic_.c_str());
00131 
00132     parent->removeSubscriberLink(shared_from_this());
00133   }
00134 }
00135 
00136 void TransportSubscriberLink::onHeaderWritten(const ConnectionPtr& conn)
00137 {
00138   (void)conn;
00139   header_written_ = true;
00140   startMessageWrite(true);
00141 }
00142 
00143 void TransportSubscriberLink::onMessageWritten(const ConnectionPtr& conn)
00144 {
00145   (void)conn;
00146   writing_message_ = false;
00147   startMessageWrite(true);
00148 }
00149 
00150 void TransportSubscriberLink::startMessageWrite(bool immediate_write)
00151 {
00152   boost::shared_array<uint8_t> dummy;
00153   SerializedMessage m(dummy, (uint32_t)0);
00154 
00155   {
00156     boost::mutex::scoped_lock lock(outbox_mutex_);
00157     if (writing_message_ || !header_written_)
00158     {
00159       return;
00160     }
00161 
00162     if (!outbox_.empty())
00163     {
00164       writing_message_ = true;
00165       m = outbox_.front();
00166       outbox_.pop();
00167     }
00168   }
00169 
00170   if (m.num_bytes > 0)
00171   {
00172     connection_->write(m.buf, m.num_bytes, boost::bind(&TransportSubscriberLink::onMessageWritten, this, _1), immediate_write);
00173   }
00174 }
00175 
00176 void TransportSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy)
00177 {
00178   (void)nocopy;
00179   if (!ser)
00180   {
00181     return;
00182   }
00183 
00184   {
00185     boost::mutex::scoped_lock lock(outbox_mutex_);
00186 
00187     int max_queue = 0;
00188     if (PublicationPtr parent = parent_.lock())
00189     {
00190       max_queue = parent->getMaxQueue();
00191     }
00192 
00193     ROS_DEBUG_NAMED("superdebug", "TransportSubscriberLink on topic [%s] to caller [%s], queueing message (queue size [%d])", topic_.c_str(), destination_caller_id_.c_str(), (int)outbox_.size());
00194 
00195     if (max_queue > 0 && (int)outbox_.size() >= max_queue)
00196     {
00197       if (!queue_full_)
00198       {
00199         ROS_DEBUG("Outgoing queue full for topic [%s].  "
00200                "Discarding oldest message\n",
00201                topic_.c_str());
00202       }
00203 
00204       outbox_.pop(); // toss out the oldest thing in the queue to make room for us
00205       queue_full_ = true;
00206     }
00207     else
00208     {
00209       queue_full_ = false;
00210     }
00211 
00212     outbox_.push(m);
00213   }
00214 
00215   startMessageWrite(false);
00216 
00217   stats_.messages_sent_++;
00218   stats_.bytes_sent_ += m.num_bytes;
00219   stats_.message_data_sent_ += m.num_bytes;
00220 }
00221 
00222 std::string TransportSubscriberLink::getTransportType()
00223 {
00224   return connection_->getTransport()->getType();
00225 }
00226 
00227 std::string TransportSubscriberLink::getTransportInfo()
00228 {
00229   return connection_->getTransport()->getTransportInfo();
00230 }
00231 
00232 void TransportSubscriberLink::drop()
00233 {
00234   // Only drop the connection if it's not already sending a header error
00235   // If it is, it will automatically drop itself
00236   if (connection_->isSendingHeaderError())
00237   {
00238     connection_->removeDropListener(dropped_conn_);
00239   }
00240   else
00241   {
00242     connection_->drop(Connection::Destructing);
00243   }
00244 }
00245 
00246 } // namespace ros


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Thu Jun 6 2019 21:10:05