transport_subscriber_link.cpp
Go to the documentation of this file.
1 
2 /*
3  * Copyright (C) 2008, Morgan Quigley and Willow Garage, Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  * * Redistributions of source code must retain the above copyright notice,
8  * this list of conditions and the following disclaimer.
9  * * Redistributions in binary form must reproduce the above copyright
10  * notice, this list of conditions and the following disclaimer in the
11  * documentation and/or other materials provided with the distribution.
12  * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
13  * contributors may be used to endorse or promote products derived from
14  * this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
30 #include "ros/publication.h"
31 #include "ros/header.h"
32 #include "ros/connection.h"
34 #include "ros/this_node.h"
35 #include "ros/connection_manager.h"
36 #include "ros/topic_manager.h"
37 #include "ros/file_log.h"
38 
39 #include <boost/bind.hpp>
40 
41 namespace ros
42 {
43 
45 : writing_message_(false)
46 , header_written_(false)
47 , queue_full_(false)
48 {
49 
50 }
51 
53 {
54  drop();
55  connection_->removeDropListener(dropped_conn_);
56 }
57 
59 {
60  connection_ = connection;
61  dropped_conn_ = connection_->addDropListener(boost::bind(&TransportSubscriberLink::onConnectionDropped, this, boost::placeholders::_1));
62 
63  return true;
64 }
65 
67 {
68  std::string topic;
69  if (!header.getValue("topic", topic))
70  {
71  std::string msg("Header from subscriber did not have the required element: topic");
72 
73  ROS_ERROR("%s", msg.c_str());
74  connection_->sendHeaderError(msg);
75 
76  return false;
77  }
78 
79  // This will get validated by validateHeader below
80  std::string client_callerid;
81  header.getValue("callerid", client_callerid);
82  PublicationPtr pt = TopicManager::instance()->lookupPublication(topic);
83  if (!pt)
84  {
85  std::string msg = std::string("received a connection for a nonexistent topic [") +
86  topic + std::string("] from [" + connection_->getTransport()->getTransportInfo() + "] [" + client_callerid +"].");
87 
88  ROSCPP_LOG_DEBUG("%s", msg.c_str());
89  connection_->sendHeaderError(msg);
90 
91  return false;
92  }
93 
94  std::string error_msg;
95  if (!pt->validateHeader(header, error_msg))
96  {
97  ROSCPP_LOG_DEBUG("%s", error_msg.c_str());
98  connection_->sendHeaderError(error_msg);
99 
100  return false;
101  }
102 
103  destination_caller_id_ = client_callerid;
104  connection_id_ = ConnectionManager::instance()->getNewConnectionID();
105  topic_ = pt->getName();
106  parent_ = PublicationWPtr(pt);
107 
108  // Send back a success, with info
109  M_string m;
110  m["type"] = pt->getDataType();
111  m["md5sum"] = pt->getMD5Sum();
112  m["message_definition"] = pt->getMessageDefinition();
113  m["callerid"] = this_node::getName();
114  m["latching"] = pt->isLatching() ? "1" : "0";
115  m["topic"] = topic_;
116  connection_->writeHeader(m, boost::bind(&TransportSubscriberLink::onHeaderWritten, this, boost::placeholders::_1));
117 
118  pt->addSubscriberLink(shared_from_this());
119 
120  return true;
121 }
122 
124 {
125  (void)conn;
126  ROS_ASSERT(conn == connection_);
127 
128  PublicationPtr parent = parent_.lock();
129 
130  if (parent)
131  {
132  ROSCPP_CONN_LOG_DEBUG("Connection to subscriber [%s] to topic [%s] dropped", connection_->getRemoteString().c_str(), topic_.c_str());
133 
134  parent->removeSubscriberLink(shared_from_this());
135  }
136 }
137 
139 {
140  (void)conn;
141  header_written_ = true;
142  startMessageWrite(true);
143 }
144 
146 {
147  (void)conn;
148  writing_message_ = false;
149  startMessageWrite(true);
150 }
151 
153 {
155  SerializedMessage m(dummy, (uint32_t)0);
156 
157  {
158  boost::mutex::scoped_lock lock(outbox_mutex_);
160  {
161  return;
162  }
163 
164  if (!outbox_.empty())
165  {
166  writing_message_ = true;
167  m = outbox_.front();
168  outbox_.pop();
169  }
170  }
171 
172  if (m.num_bytes > 0)
173  {
174  connection_->write(m.buf, m.num_bytes, boost::bind(&TransportSubscriberLink::onMessageWritten, this, boost::placeholders::_1), immediate_write);
175  }
176 }
177 
178 void TransportSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy)
179 {
180  (void)nocopy;
181  if (!ser)
182  {
183  return;
184  }
185 
186  {
187  boost::mutex::scoped_lock lock(outbox_mutex_);
188 
189  int max_queue = 0;
190  if (PublicationPtr parent = parent_.lock())
191  {
192  max_queue = parent->getMaxQueue();
193  }
194 
195  ROS_DEBUG_NAMED("superdebug", "TransportSubscriberLink on topic [%s] to caller [%s], queueing message (queue size [%d])", topic_.c_str(), destination_caller_id_.c_str(), (int)outbox_.size());
196 
197  if (max_queue > 0 && (int)outbox_.size() >= max_queue)
198  {
199  if (!queue_full_)
200  {
201  ROS_DEBUG("Outgoing queue full for topic [%s]. "
202  "Discarding oldest message",
203  topic_.c_str());
204  }
205 
206  outbox_.pop(); // toss out the oldest thing in the queue to make room for us
207  queue_full_ = true;
208  }
209  else
210  {
211  queue_full_ = false;
212  }
213 
214  outbox_.push(m);
215  }
216 
217  startMessageWrite(false);
218 
222 }
223 
225 {
226  return connection_->getTransport()->getType();
227 }
228 
230 {
231  return connection_->getTransport()->getTransportInfo();
232 }
233 
235 {
236  // Only drop the connection if it's not already sending a header error
237  // If it is, it will automatically drop itself
238  if (connection_->isSendingHeaderError())
239  {
240  connection_->removeDropListener(dropped_conn_);
241  }
242  else
243  {
245  }
246 }
247 
248 } // namespace ros
connection_manager.h
this_node.h
ros::SerializedMessage
ROSCPP_CONN_LOG_DEBUG
#define ROSCPP_CONN_LOG_DEBUG(...)
Definition: file_log.h:36
boost::shared_ptr< Connection >
ros
ros::Header
topic_manager.h
ros::PublicationWPtr
boost::weak_ptr< Publication > PublicationWPtr
Definition: rosout_appender.h:57
ROS_DEBUG
#define ROS_DEBUG(...)
ROS_DEBUG_NAMED
#define ROS_DEBUG_NAMED(name,...)
boost::shared_array< uint8_t >
ros::SerializedMessage::num_bytes
size_t num_bytes
ros::Connection::Destructing
@ Destructing
Definition: connection.h:77
ros::this_node::getName
const ROSCPP_DECL std::string & getName()
Returns the name of the current node.
Definition: this_node.cpp:74
connection.h
ros::TopicManager::instance
static const TopicManagerPtr & instance()
Definition: topic_manager.cpp:56
publication.h
ROS_ERROR
#define ROS_ERROR(...)
ros::ConnectionManager::instance
static const ConnectionManagerPtr & instance()
Definition: connection_manager.cpp:43
header.h
transport.h
header
const std::string header
ROS_ASSERT
#define ROS_ASSERT(cond)
ROSCPP_LOG_DEBUG
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
ros::SerializedMessage::buf
boost::shared_array< uint8_t > buf
file_log.h
ros::M_string
std::map< std::string, std::string > M_string


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas , Jacob Perron
autogenerated on Thu Nov 23 2023 04:01:44