00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "ros/transport_subscriber_link.h"
00030 #include "ros/publication.h"
00031 #include "ros/header.h"
00032 #include "ros/connection.h"
00033 #include "ros/transport/transport.h"
00034 #include "ros/this_node.h"
00035 #include "ros/connection_manager.h"
00036 #include "ros/topic_manager.h"
00037 #include "ros/file_log.h"
00038
00039 #include <boost/bind.hpp>
00040
00041 namespace ros
00042 {
00043
00044 TransportSubscriberLink::TransportSubscriberLink()
00045 : writing_message_(false)
00046 , header_written_(false)
00047 , queue_full_(false)
00048 {
00049
00050 }
00051
00052 TransportSubscriberLink::~TransportSubscriberLink()
00053 {
00054 drop();
00055 }
00056
00057 bool TransportSubscriberLink::initialize(const ConnectionPtr& connection)
00058 {
00059 connection_ = connection;
00060 dropped_conn_ = connection_->addDropListener(boost::bind(&TransportSubscriberLink::onConnectionDropped, this, _1));
00061
00062 return true;
00063 }
00064
00065 bool TransportSubscriberLink::handleHeader(const Header& header)
00066 {
00067 std::string topic;
00068 if (!header.getValue("topic", topic))
00069 {
00070 std::string msg("Header from subscriber did not have the required element: topic");
00071
00072 ROS_ERROR("%s", msg.c_str());
00073 connection_->sendHeaderError(msg);
00074
00075 return false;
00076 }
00077
00078
00079 std::string client_callerid;
00080 header.getValue("callerid", client_callerid);
00081 PublicationPtr pt = TopicManager::instance()->lookupPublication(topic);
00082 if (!pt)
00083 {
00084 std::string msg = std::string("received a connection for a nonexistent topic [") +
00085 topic + std::string("] from [" + connection_->getTransport()->getTransportInfo() + "] [" + client_callerid +"].");
00086
00087 ROSCPP_LOG_DEBUG("%s", msg.c_str());
00088 connection_->sendHeaderError(msg);
00089
00090 return false;
00091 }
00092
00093 std::string error_msg;
00094 if (!pt->validateHeader(header, error_msg))
00095 {
00096 ROSCPP_LOG_DEBUG("%s", error_msg.c_str());
00097 connection_->sendHeaderError(error_msg);
00098
00099 return false;
00100 }
00101
00102 destination_caller_id_ = client_callerid;
00103 connection_id_ = ConnectionManager::instance()->getNewConnectionID();
00104 topic_ = pt->getName();
00105 parent_ = PublicationWPtr(pt);
00106
00107
00108 M_string m;
00109 m["type"] = pt->getDataType();
00110 m["md5sum"] = pt->getMD5Sum();
00111 m["message_definition"] = pt->getMessageDefinition();
00112 m["callerid"] = this_node::getName();
00113 m["latching"] = pt->isLatching() ? "1" : "0";
00114 connection_->writeHeader(m, boost::bind(&TransportSubscriberLink::onHeaderWritten, this, _1));
00115
00116 pt->addSubscriberLink(shared_from_this());
00117
00118 return true;
00119 }
00120
00121 void TransportSubscriberLink::onConnectionDropped(const ConnectionPtr& conn)
00122 {
00123 ROS_ASSERT(conn == connection_);
00124
00125 PublicationPtr parent = parent_.lock();
00126
00127 if (parent)
00128 {
00129 ROSCPP_LOG_DEBUG("Connection to subscriber [%s] to topic [%s] dropped", connection_->getRemoteString().c_str(), topic_.c_str());
00130
00131 parent->removeSubscriberLink(shared_from_this());
00132 }
00133 }
00134
00135 void TransportSubscriberLink::onHeaderWritten(const ConnectionPtr& conn)
00136 {
00137 header_written_ = true;
00138 startMessageWrite(true);
00139 }
00140
00141 void TransportSubscriberLink::onMessageWritten(const ConnectionPtr& conn)
00142 {
00143 writing_message_ = false;
00144 startMessageWrite(true);
00145 }
00146
00147 void TransportSubscriberLink::startMessageWrite(bool immediate_write)
00148 {
00149 boost::shared_array<uint8_t> dummy;
00150 SerializedMessage m(dummy, (uint32_t)0);
00151
00152 {
00153 boost::mutex::scoped_lock lock(outbox_mutex_);
00154 if (writing_message_ || !header_written_)
00155 {
00156 return;
00157 }
00158
00159 if (!outbox_.empty())
00160 {
00161 writing_message_ = true;
00162 m = outbox_.front();
00163 outbox_.pop();
00164 }
00165 }
00166
00167 if (m.num_bytes > 0)
00168 {
00169 connection_->write(m.buf, m.num_bytes, boost::bind(&TransportSubscriberLink::onMessageWritten, this, _1), immediate_write);
00170 }
00171 }
00172
00173 void TransportSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy)
00174 {
00175 if (!ser)
00176 {
00177 return;
00178 }
00179
00180 {
00181 boost::mutex::scoped_lock lock(outbox_mutex_);
00182
00183 int max_queue = 0;
00184 if (PublicationPtr parent = parent_.lock())
00185 {
00186 max_queue = parent->getMaxQueue();
00187 }
00188
00189 ROS_DEBUG_NAMED("superdebug", "TransportSubscriberLink on topic [%s] to caller [%s], queueing message (queue size [%d])", topic_.c_str(), destination_caller_id_.c_str(), (int)outbox_.size());
00190
00191 if (max_queue > 0 && (int)outbox_.size() >= max_queue)
00192 {
00193 if (!queue_full_)
00194 {
00195 ROS_DEBUG("Outgoing queue full for topic [%s]. "
00196 "Discarding oldest message\n",
00197 topic_.c_str());
00198 }
00199
00200 outbox_.pop();
00201 queue_full_ = true;
00202 }
00203 else
00204 {
00205 queue_full_ = false;
00206 }
00207
00208 outbox_.push(m);
00209 }
00210
00211 startMessageWrite(false);
00212
00213 stats_.messages_sent_++;
00214 stats_.bytes_sent_ += m.num_bytes;
00215 stats_.message_data_sent_ += m.num_bytes;
00216 }
00217
00218 std::string TransportSubscriberLink::getTransportType()
00219 {
00220 return connection_->getTransport()->getType();
00221 }
00222
00223 void TransportSubscriberLink::drop()
00224 {
00225
00226
00227 if (connection_->isSendingHeaderError())
00228 {
00229 connection_->removeDropListener(dropped_conn_);
00230 }
00231 else
00232 {
00233 connection_->drop(Connection::Destructing);
00234 }
00235 }
00236
00237 }