Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "ros/transport_subscriber_link.h"
00030 #include "ros/publication.h"
00031 #include "ros/header.h"
00032 #include "ros/connection.h"
00033 #include "ros/transport/transport.h"
00034 #include "ros/this_node.h"
00035 #include "ros/connection_manager.h"
00036 #include "ros/topic_manager.h"
00037 #include "ros/file_log.h"
00038
00039 #include <boost/bind.hpp>
00040
00041 namespace ros
00042 {
00043
00044 TransportSubscriberLink::TransportSubscriberLink()
00045 : writing_message_(false)
00046 , header_written_(false)
00047 , queue_full_(false)
00048 {
00049
00050 }
00051
00052 TransportSubscriberLink::~TransportSubscriberLink()
00053 {
00054 drop();
00055 }
00056
00057 bool TransportSubscriberLink::initialize(const ConnectionPtr& connection)
00058 {
00059 connection_ = connection;
00060 dropped_conn_ = connection_->addDropListener(boost::bind(&TransportSubscriberLink::onConnectionDropped, this, _1));
00061
00062 return true;
00063 }
00064
00065 bool TransportSubscriberLink::handleHeader(const Header& header)
00066 {
00067 std::string topic;
00068 if (!header.getValue("topic", topic))
00069 {
00070 std::string msg("Header from subscriber did not have the required element: topic");
00071
00072 ROS_ERROR("%s", msg.c_str());
00073 connection_->sendHeaderError(msg);
00074
00075 return false;
00076 }
00077
00078
00079 std::string client_callerid;
00080 header.getValue("callerid", client_callerid);
00081 PublicationPtr pt = TopicManager::instance()->lookupPublication(topic);
00082 if (!pt)
00083 {
00084 std::string msg = std::string("received a connection for a nonexistent topic [") +
00085 topic + std::string("] from [" + connection_->getTransport()->getTransportInfo() + "] [" + client_callerid +"].");
00086
00087 ROSCPP_LOG_DEBUG("%s", msg.c_str());
00088 connection_->sendHeaderError(msg);
00089
00090 return false;
00091 }
00092
00093 std::string error_msg;
00094 if (!pt->validateHeader(header, error_msg))
00095 {
00096 ROSCPP_LOG_DEBUG("%s", error_msg.c_str());
00097 connection_->sendHeaderError(error_msg);
00098
00099 return false;
00100 }
00101
00102 destination_caller_id_ = client_callerid;
00103 connection_id_ = ConnectionManager::instance()->getNewConnectionID();
00104 topic_ = pt->getName();
00105 parent_ = PublicationWPtr(pt);
00106
00107
00108 M_string m;
00109 m["type"] = pt->getDataType();
00110 m["md5sum"] = pt->getMD5Sum();
00111 m["message_definition"] = pt->getMessageDefinition();
00112 m["callerid"] = this_node::getName();
00113 m["latching"] = pt->isLatching() ? "1" : "0";
00114 connection_->writeHeader(m, boost::bind(&TransportSubscriberLink::onHeaderWritten, this, _1));
00115
00116 pt->addSubscriberLink(shared_from_this());
00117
00118 return true;
00119 }
00120
00121 void TransportSubscriberLink::onConnectionDropped(const ConnectionPtr& conn)
00122 {
00123 ROS_ASSERT(conn == connection_);
00124
00125 PublicationPtr parent = parent_.lock();
00126
00127 if (parent)
00128 {
00129 ROSCPP_LOG_DEBUG("Connection to subscriber [%s] to topic [%s] dropped", connection_->getRemoteString().c_str(), topic_.c_str());
00130
00131 parent->removeSubscriberLink(shared_from_this());
00132 }
00133 }
00134
00135 void TransportSubscriberLink::onHeaderWritten(const ConnectionPtr& conn)
00136 {
00137 header_written_ = true;
00138 startMessageWrite(true);
00139 }
00140
00141 void TransportSubscriberLink::onMessageWritten(const ConnectionPtr& conn)
00142 {
00143 writing_message_ = false;
00144 startMessageWrite(true);
00145 }
00146
00147 void TransportSubscriberLink::startMessageWrite(bool immediate_write)
00148 {
00149 boost::shared_array<uint8_t> dummy;
00150 SerializedMessage m(dummy, (uint32_t)0);
00151
00152 {
00153 boost::mutex::scoped_lock lock(outbox_mutex_);
00154 if (writing_message_ || !header_written_)
00155 {
00156 return;
00157 }
00158
00159 if (!outbox_.empty())
00160 {
00161 writing_message_ = true;
00162 m = outbox_.front();
00163 outbox_.pop();
00164 }
00165 }
00166
00167 if (m.num_bytes > 0)
00168 {
00169 connection_->write(m.buf, m.num_bytes, boost::bind(&TransportSubscriberLink::onMessageWritten, this, _1), immediate_write);
00170 }
00171 }
00172
00173 void TransportSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy)
00174 {
00175 if (!ser)
00176 {
00177 return;
00178 }
00179
00180 {
00181 boost::mutex::scoped_lock lock(outbox_mutex_);
00182
00183 int max_queue = 0;
00184 if (PublicationPtr parent = parent_.lock())
00185 {
00186 max_queue = parent->getMaxQueue();
00187 }
00188
00189 ROS_DEBUG_NAMED("superdebug", "TransportSubscriberLink on topic [%s] to caller [%s], queueing message (queue size [%d])", topic_.c_str(), destination_caller_id_.c_str(), (int)outbox_.size());
00190
00191 if (max_queue > 0 && (int)outbox_.size() >= max_queue)
00192 {
00193 if (!queue_full_)
00194 {
00195 ROS_DEBUG("Outgoing queue full for topic [%s]. "
00196 "Discarding oldest message\n",
00197 topic_.c_str());
00198 }
00199
00200 outbox_.pop();
00201 queue_full_ = true;
00202 }
00203 else
00204 {
00205 queue_full_ = false;
00206 }
00207
00208 outbox_.push(m);
00209 }
00210
00211 startMessageWrite(false);
00212
00213 stats_.messages_sent_++;
00214 stats_.bytes_sent_ += m.num_bytes;
00215 stats_.message_data_sent_ += m.num_bytes;
00216 }
00217
00218 std::string TransportSubscriberLink::getTransportType()
00219 {
00220 return connection_->getTransport()->getType();
00221 }
00222
00223 void TransportSubscriberLink::drop()
00224 {
00225
00226
00227 if (connection_->isSendingHeaderError())
00228 {
00229 connection_->removeDropListener(dropped_conn_);
00230 }
00231 else
00232 {
00233 connection_->drop(Connection::Destructing);
00234 }
00235 }
00236
00237 }
roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Fri Aug 28 2015 12:33:11