Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "ros/transport_subscriber_link.h"
00030 #include "ros/publication.h"
00031 #include "ros/header.h"
00032 #include "ros/connection.h"
00033 #include "ros/transport/transport.h"
00034 #include "ros/this_node.h"
00035 #include "ros/connection_manager.h"
00036 #include "ros/topic_manager.h"
00037 #include "ros/file_log.h"
00038
00039 #include <boost/bind.hpp>
00040
00041 namespace ros
00042 {
00043
00044 TransportSubscriberLink::TransportSubscriberLink()
00045 : writing_message_(false)
00046 , header_written_(false)
00047 , queue_full_(false)
00048 {
00049
00050 }
00051
00052 TransportSubscriberLink::~TransportSubscriberLink()
00053 {
00054 drop();
00055 }
00056
00057 bool TransportSubscriberLink::initialize(const ConnectionPtr& connection)
00058 {
00059 connection_ = connection;
00060 dropped_conn_ = connection_->addDropListener(boost::bind(&TransportSubscriberLink::onConnectionDropped, this, _1));
00061
00062 return true;
00063 }
00064
00065 bool TransportSubscriberLink::handleHeader(const Header& header)
00066 {
00067 std::string topic;
00068 if (!header.getValue("topic", topic))
00069 {
00070 std::string msg("Header from subscriber did not have the required element: topic");
00071
00072 ROS_ERROR("%s", msg.c_str());
00073 connection_->sendHeaderError(msg);
00074
00075 return false;
00076 }
00077
00078
00079 std::string client_callerid;
00080 header.getValue("callerid", client_callerid);
00081 PublicationPtr pt = TopicManager::instance()->lookupPublication(topic);
00082 if (!pt)
00083 {
00084 std::string msg = std::string("received a connection for a nonexistent topic [") +
00085 topic + std::string("] from [" + connection_->getTransport()->getTransportInfo() + "] [" + client_callerid +"].");
00086
00087 ROSCPP_LOG_DEBUG("%s", msg.c_str());
00088 connection_->sendHeaderError(msg);
00089
00090 return false;
00091 }
00092
00093 std::string error_msg;
00094 if (!pt->validateHeader(header, error_msg))
00095 {
00096 ROSCPP_LOG_DEBUG("%s", error_msg.c_str());
00097 connection_->sendHeaderError(error_msg);
00098
00099 return false;
00100 }
00101
00102 destination_caller_id_ = client_callerid;
00103 connection_id_ = ConnectionManager::instance()->getNewConnectionID();
00104 topic_ = pt->getName();
00105 parent_ = PublicationWPtr(pt);
00106
00107
00108 M_string m;
00109 m["type"] = pt->getDataType();
00110 m["md5sum"] = pt->getMD5Sum();
00111 m["message_definition"] = pt->getMessageDefinition();
00112 m["callerid"] = this_node::getName();
00113 m["latching"] = pt->isLatching() ? "1" : "0";
00114 m["topic"] = topic_;
00115 connection_->writeHeader(m, boost::bind(&TransportSubscriberLink::onHeaderWritten, this, _1));
00116
00117 pt->addSubscriberLink(shared_from_this());
00118
00119 return true;
00120 }
00121
00122 void TransportSubscriberLink::onConnectionDropped(const ConnectionPtr& conn)
00123 {
00124 ROS_ASSERT(conn == connection_);
00125
00126 PublicationPtr parent = parent_.lock();
00127
00128 if (parent)
00129 {
00130 ROSCPP_LOG_DEBUG("Connection to subscriber [%s] to topic [%s] dropped", connection_->getRemoteString().c_str(), topic_.c_str());
00131
00132 parent->removeSubscriberLink(shared_from_this());
00133 }
00134 }
00135
00136 void TransportSubscriberLink::onHeaderWritten(const ConnectionPtr& conn)
00137 {
00138 (void)conn;
00139 header_written_ = true;
00140 startMessageWrite(true);
00141 }
00142
00143 void TransportSubscriberLink::onMessageWritten(const ConnectionPtr& conn)
00144 {
00145 (void)conn;
00146 writing_message_ = false;
00147 startMessageWrite(true);
00148 }
00149
00150 void TransportSubscriberLink::startMessageWrite(bool immediate_write)
00151 {
00152 boost::shared_array<uint8_t> dummy;
00153 SerializedMessage m(dummy, (uint32_t)0);
00154
00155 {
00156 boost::mutex::scoped_lock lock(outbox_mutex_);
00157 if (writing_message_ || !header_written_)
00158 {
00159 return;
00160 }
00161
00162 if (!outbox_.empty())
00163 {
00164 writing_message_ = true;
00165 m = outbox_.front();
00166 outbox_.pop();
00167 }
00168 }
00169
00170 if (m.num_bytes > 0)
00171 {
00172 connection_->write(m.buf, m.num_bytes, boost::bind(&TransportSubscriberLink::onMessageWritten, this, _1), immediate_write);
00173 }
00174 }
00175
00176 void TransportSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy)
00177 {
00178 (void)nocopy;
00179 if (!ser)
00180 {
00181 return;
00182 }
00183
00184 {
00185 boost::mutex::scoped_lock lock(outbox_mutex_);
00186
00187 int max_queue = 0;
00188 if (PublicationPtr parent = parent_.lock())
00189 {
00190 max_queue = parent->getMaxQueue();
00191 }
00192
00193 ROS_DEBUG_NAMED("superdebug", "TransportSubscriberLink on topic [%s] to caller [%s], queueing message (queue size [%d])", topic_.c_str(), destination_caller_id_.c_str(), (int)outbox_.size());
00194
00195 if (max_queue > 0 && (int)outbox_.size() >= max_queue)
00196 {
00197 if (!queue_full_)
00198 {
00199 ROS_DEBUG("Outgoing queue full for topic [%s]. "
00200 "Discarding oldest message\n",
00201 topic_.c_str());
00202 }
00203
00204 outbox_.pop();
00205 queue_full_ = true;
00206 }
00207 else
00208 {
00209 queue_full_ = false;
00210 }
00211
00212 outbox_.push(m);
00213 }
00214
00215 startMessageWrite(false);
00216
00217 stats_.messages_sent_++;
00218 stats_.bytes_sent_ += m.num_bytes;
00219 stats_.message_data_sent_ += m.num_bytes;
00220 }
00221
00222 std::string TransportSubscriberLink::getTransportType()
00223 {
00224 return connection_->getTransport()->getType();
00225 }
00226
00227 std::string TransportSubscriberLink::getTransportInfo()
00228 {
00229 return connection_->getTransport()->getTransportInfo();
00230 }
00231
00232 void TransportSubscriberLink::drop()
00233 {
00234
00235
00236 if (connection_->isSendingHeaderError())
00237 {
00238 connection_->removeDropListener(dropped_conn_);
00239 }
00240 else
00241 {
00242 connection_->drop(Connection::Destructing);
00243 }
00244 }
00245
00246 }
roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Thu Jun 6 2019 21:10:05