$search
00001 00002 /* 00003 * Copyright (C) 2008, Morgan Quigley and Willow Garage, Inc. 00004 * 00005 * Redistribution and use in source and binary forms, with or without 00006 * modification, are permitted provided that the following conditions are met: 00007 * * Redistributions of source code must retain the above copyright notice, 00008 * this list of conditions and the following disclaimer. 00009 * * Redistributions in binary form must reproduce the above copyright 00010 * notice, this list of conditions and the following disclaimer in the 00011 * documentation and/or other materials provided with the distribution. 00012 * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its 00013 * contributors may be used to endorse or promote products derived from 00014 * this software without specific prior written permission. 00015 * 00016 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00017 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00018 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00019 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00020 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00021 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00022 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00023 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00024 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00025 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00026 * POSSIBILITY OF SUCH DAMAGE. 00027 */ 00028 00029 #include "ros/transport_subscriber_link.h" 00030 #include "ros/publication.h" 00031 #include "ros/header.h" 00032 #include "ros/connection.h" 00033 #include "ros/transport/transport.h" 00034 #include "ros/this_node.h" 00035 #include "ros/connection_manager.h" 00036 #include "ros/topic_manager.h" 00037 #include "ros/file_log.h" 00038 00039 #include <boost/bind.hpp> 00040 00041 namespace ros 00042 { 00043 00044 TransportSubscriberLink::TransportSubscriberLink() 00045 : writing_message_(false) 00046 , header_written_(false) 00047 , queue_full_(false) 00048 { 00049 00050 } 00051 00052 TransportSubscriberLink::~TransportSubscriberLink() 00053 { 00054 drop(); 00055 } 00056 00057 bool TransportSubscriberLink::initialize(const ConnectionPtr& connection) 00058 { 00059 connection_ = connection; 00060 dropped_conn_ = connection_->addDropListener(boost::bind(&TransportSubscriberLink::onConnectionDropped, this, _1)); 00061 00062 return true; 00063 } 00064 00065 bool TransportSubscriberLink::handleHeader(const Header& header) 00066 { 00067 std::string topic; 00068 if (!header.getValue("topic", topic)) 00069 { 00070 std::string msg("Header from subscriber did not have the required element: topic"); 00071 00072 ROS_ERROR("%s", msg.c_str()); 00073 connection_->sendHeaderError(msg); 00074 00075 return false; 00076 } 00077 00078 // This will get validated by validateHeader below 00079 std::string client_callerid; 00080 header.getValue("callerid", client_callerid); 00081 PublicationPtr pt = TopicManager::instance()->lookupPublication(topic); 00082 if (!pt) 00083 { 00084 std::string msg = std::string("received a connection for a nonexistent topic [") + 00085 topic + std::string("] from [" + connection_->getTransport()->getTransportInfo() + "] [" + client_callerid +"]."); 00086 00087 ROSCPP_LOG_DEBUG("%s", msg.c_str()); 00088 connection_->sendHeaderError(msg); 00089 00090 return false; 00091 } 00092 00093 std::string error_msg; 00094 if (!pt->validateHeader(header, error_msg)) 00095 { 00096 ROSCPP_LOG_DEBUG("%s", error_msg.c_str()); 00097 connection_->sendHeaderError(error_msg); 00098 00099 return false; 00100 } 00101 00102 destination_caller_id_ = client_callerid; 00103 connection_id_ = ConnectionManager::instance()->getNewConnectionID(); 00104 topic_ = pt->getName(); 00105 parent_ = PublicationWPtr(pt); 00106 00107 // Send back a success, with info 00108 M_string m; 00109 m["type"] = pt->getDataType(); 00110 m["md5sum"] = pt->getMD5Sum(); 00111 m["message_definition"] = pt->getMessageDefinition(); 00112 m["callerid"] = this_node::getName(); 00113 m["latching"] = pt->isLatching() ? "1" : "0"; 00114 connection_->writeHeader(m, boost::bind(&TransportSubscriberLink::onHeaderWritten, this, _1)); 00115 00116 pt->addSubscriberLink(shared_from_this()); 00117 00118 return true; 00119 } 00120 00121 void TransportSubscriberLink::onConnectionDropped(const ConnectionPtr& conn) 00122 { 00123 ROS_ASSERT(conn == connection_); 00124 00125 PublicationPtr parent = parent_.lock(); 00126 00127 if (parent) 00128 { 00129 ROSCPP_LOG_DEBUG("Connection to subscriber [%s] to topic [%s] dropped", connection_->getRemoteString().c_str(), topic_.c_str()); 00130 00131 parent->removeSubscriberLink(shared_from_this()); 00132 } 00133 } 00134 00135 void TransportSubscriberLink::onHeaderWritten(const ConnectionPtr& conn) 00136 { 00137 header_written_ = true; 00138 startMessageWrite(true); 00139 } 00140 00141 void TransportSubscriberLink::onMessageWritten(const ConnectionPtr& conn) 00142 { 00143 writing_message_ = false; 00144 startMessageWrite(true); 00145 } 00146 00147 void TransportSubscriberLink::startMessageWrite(bool immediate_write) 00148 { 00149 boost::shared_array<uint8_t> dummy; 00150 SerializedMessage m(dummy, (uint32_t)0); 00151 00152 { 00153 boost::mutex::scoped_lock lock(outbox_mutex_); 00154 if (writing_message_ || !header_written_) 00155 { 00156 return; 00157 } 00158 00159 if (!outbox_.empty()) 00160 { 00161 writing_message_ = true; 00162 m = outbox_.front(); 00163 outbox_.pop(); 00164 } 00165 } 00166 00167 if (m.num_bytes > 0) 00168 { 00169 connection_->write(m.buf, m.num_bytes, boost::bind(&TransportSubscriberLink::onMessageWritten, this, _1), immediate_write); 00170 } 00171 } 00172 00173 void TransportSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy) 00174 { 00175 if (!ser) 00176 { 00177 return; 00178 } 00179 00180 { 00181 boost::mutex::scoped_lock lock(outbox_mutex_); 00182 00183 int max_queue = 0; 00184 if (PublicationPtr parent = parent_.lock()) 00185 { 00186 max_queue = parent->getMaxQueue(); 00187 } 00188 00189 ROS_DEBUG_NAMED("superdebug", "TransportSubscriberLink on topic [%s] to caller [%s], queueing message (queue size [%d])", topic_.c_str(), destination_caller_id_.c_str(), (int)outbox_.size()); 00190 00191 if (max_queue > 0 && (int)outbox_.size() >= max_queue) 00192 { 00193 if (!queue_full_) 00194 { 00195 ROS_DEBUG("Outgoing queue full for topic [%s]. " 00196 "Discarding oldest message\n", 00197 topic_.c_str()); 00198 } 00199 00200 outbox_.pop(); // toss out the oldest thing in the queue to make room for us 00201 queue_full_ = true; 00202 } 00203 else 00204 { 00205 queue_full_ = false; 00206 } 00207 00208 outbox_.push(m); 00209 } 00210 00211 startMessageWrite(false); 00212 00213 stats_.messages_sent_++; 00214 stats_.bytes_sent_ += m.num_bytes; 00215 stats_.message_data_sent_ += m.num_bytes; 00216 } 00217 00218 std::string TransportSubscriberLink::getTransportType() 00219 { 00220 return connection_->getTransport()->getType(); 00221 } 00222 00223 void TransportSubscriberLink::drop() 00224 { 00225 // Only drop the connection if it's not already sending a header error 00226 // If it is, it will automatically drop itself 00227 if (connection_->isSendingHeaderError()) 00228 { 00229 connection_->removeDropListener(dropped_conn_); 00230 } 00231 else 00232 { 00233 connection_->drop(Connection::Destructing); 00234 } 00235 } 00236 00237 } // namespace ros