transport_subscriber_link.cpp
Go to the documentation of this file.
1 
2 /*
3  * Copyright (C) 2008, Morgan Quigley and Willow Garage, Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  * * Redistributions of source code must retain the above copyright notice,
8  * this list of conditions and the following disclaimer.
9  * * Redistributions in binary form must reproduce the above copyright
10  * notice, this list of conditions and the following disclaimer in the
11  * documentation and/or other materials provided with the distribution.
12  * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
13  * contributors may be used to endorse or promote products derived from
14  * this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
30 #include "ros/publication.h"
31 #include "ros/header.h"
32 #include "ros/connection.h"
34 #include "ros/this_node.h"
35 #include "ros/connection_manager.h"
36 #include "ros/topic_manager.h"
37 #include "ros/file_log.h"
38 
39 #include <boost/bind.hpp>
40 
41 namespace ros
42 {
43 
45 : writing_message_(false)
46 , header_written_(false)
47 , queue_full_(false)
48 {
49 
50 }
51 
53 {
54  drop();
55 }
56 
58 {
59  connection_ = connection;
60  dropped_conn_ = connection_->addDropListener(boost::bind(&TransportSubscriberLink::onConnectionDropped, this, _1));
61 
62  return true;
63 }
64 
66 {
67  std::string topic;
68  if (!header.getValue("topic", topic))
69  {
70  std::string msg("Header from subscriber did not have the required element: topic");
71 
72  ROS_ERROR("%s", msg.c_str());
73  connection_->sendHeaderError(msg);
74 
75  return false;
76  }
77 
78  // This will get validated by validateHeader below
79  std::string client_callerid;
80  header.getValue("callerid", client_callerid);
81  PublicationPtr pt = TopicManager::instance()->lookupPublication(topic);
82  if (!pt)
83  {
84  std::string msg = std::string("received a connection for a nonexistent topic [") +
85  topic + std::string("] from [" + connection_->getTransport()->getTransportInfo() + "] [" + client_callerid +"].");
86 
87  ROSCPP_LOG_DEBUG("%s", msg.c_str());
88  connection_->sendHeaderError(msg);
89 
90  return false;
91  }
92 
93  std::string error_msg;
94  if (!pt->validateHeader(header, error_msg))
95  {
96  ROSCPP_LOG_DEBUG("%s", error_msg.c_str());
97  connection_->sendHeaderError(error_msg);
98 
99  return false;
100  }
101 
102  destination_caller_id_ = client_callerid;
103  connection_id_ = ConnectionManager::instance()->getNewConnectionID();
104  topic_ = pt->getName();
105  parent_ = PublicationWPtr(pt);
106 
107  // Send back a success, with info
108  M_string m;
109  m["type"] = pt->getDataType();
110  m["md5sum"] = pt->getMD5Sum();
111  m["message_definition"] = pt->getMessageDefinition();
112  m["callerid"] = this_node::getName();
113  m["latching"] = pt->isLatching() ? "1" : "0";
114  m["topic"] = topic_;
115  connection_->writeHeader(m, boost::bind(&TransportSubscriberLink::onHeaderWritten, this, _1));
116 
117  pt->addSubscriberLink(shared_from_this());
118 
119  return true;
120 }
121 
123 {
124  (void)conn;
125  ROS_ASSERT(conn == connection_);
126 
127  PublicationPtr parent = parent_.lock();
128 
129  if (parent)
130  {
131  ROSCPP_CONN_LOG_DEBUG("Connection to subscriber [%s] to topic [%s] dropped", connection_->getRemoteString().c_str(), topic_.c_str());
132 
133  parent->removeSubscriberLink(shared_from_this());
134  }
135 }
136 
138 {
139  (void)conn;
140  header_written_ = true;
141  startMessageWrite(true);
142 }
143 
145 {
146  (void)conn;
147  writing_message_ = false;
148  startMessageWrite(true);
149 }
150 
152 {
154  SerializedMessage m(dummy, (uint32_t)0);
155 
156  {
157  boost::mutex::scoped_lock lock(outbox_mutex_);
159  {
160  return;
161  }
162 
163  if (!outbox_.empty())
164  {
165  writing_message_ = true;
166  m = outbox_.front();
167  outbox_.pop();
168  }
169  }
170 
171  if (m.num_bytes > 0)
172  {
173  connection_->write(m.buf, m.num_bytes, boost::bind(&TransportSubscriberLink::onMessageWritten, this, _1), immediate_write);
174  }
175 }
176 
177 void TransportSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy)
178 {
179  (void)nocopy;
180  if (!ser)
181  {
182  return;
183  }
184 
185  {
186  boost::mutex::scoped_lock lock(outbox_mutex_);
187 
188  int max_queue = 0;
189  if (PublicationPtr parent = parent_.lock())
190  {
191  max_queue = parent->getMaxQueue();
192  }
193 
194  ROS_DEBUG_NAMED("superdebug", "TransportSubscriberLink on topic [%s] to caller [%s], queueing message (queue size [%d])", topic_.c_str(), destination_caller_id_.c_str(), (int)outbox_.size());
195 
196  if (max_queue > 0 && (int)outbox_.size() >= max_queue)
197  {
198  if (!queue_full_)
199  {
200  ROS_DEBUG("Outgoing queue full for topic [%s]. "
201  "Discarding oldest message",
202  topic_.c_str());
203  }
204 
205  outbox_.pop(); // toss out the oldest thing in the queue to make room for us
206  queue_full_ = true;
207  }
208  else
209  {
210  queue_full_ = false;
211  }
212 
213  outbox_.push(m);
214  }
215 
216  startMessageWrite(false);
217 
221 }
222 
224 {
225  return connection_->getTransport()->getType();
226 }
227 
229 {
230  return connection_->getTransport()->getTransportInfo();
231 }
232 
234 {
235  // Only drop the connection if it's not already sending a header error
236  // If it is, it will automatically drop itself
237  if (connection_->isSendingHeaderError())
238  {
239  connection_->removeDropListener(dropped_conn_);
240  }
241  else
242  {
244  }
245 }
246 
247 } // namespace ros
ROSCPP_DECL const std::string & getName()
Returns the name of the current node.
Definition: this_node.cpp:74
std::map< std::string, std::string > M_string
#define ROS_DEBUG_NAMED(name,...)
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
static const ConnectionManagerPtr & instance()
boost::shared_array< uint8_t > buf
#define ROSCPP_CONN_LOG_DEBUG(...)
Definition: file_log.h:36
bool getValue(const std::string &key, std::string &value) const
boost::weak_ptr< Publication > PublicationWPtr
#define ROS_ASSERT(cond)
#define ROS_ERROR(...)
static const TopicManagerPtr & instance()
#define ROS_DEBUG(...)


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:33:27