connection_manager.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2009, Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "ros/connection_manager.h"
29 #include "ros/poll_manager.h"
30 #include "ros/connection.h"
35 #include "ros/file_log.h"
36 #include "ros/network.h"
37 
38 #include <ros/assert.h>
39 
40 namespace ros
41 {
42 
44 {
45  static ConnectionManagerPtr connection_manager = boost::make_shared<ConnectionManager>();
46  return connection_manager;
47 }
48 
51 {
52 }
53 
55 {
56  shutdown();
57 }
58 
60 {
62  poll_conn_ = poll_manager_->addPollThreadListener(boost::bind(&ConnectionManager::removeDroppedConnections,
63  this));
64 
65  // Bring up the TCP listener socket
66  tcpserver_transport_ = boost::make_shared<TransportTCP>(&poll_manager_->getPollSet());
69  boost::bind(&ConnectionManager::tcprosAcceptConnection, this, _1)))
70  {
71  ROS_FATAL("Listen on port [%d] failed", network::getTCPROSPort());
72  ROS_BREAK();
73  }
74 
75  // Bring up the UDP listener socket
76  udpserver_transport_ = boost::make_shared<TransportUDP>(&poll_manager_->getPollSet());
77  if (!udpserver_transport_->createIncoming(0, true))
78  {
79  ROS_FATAL("Listen failed");
80  ROS_BREAK();
81  }
82 }
83 
85 {
87  {
88  udpserver_transport_->close();
89  udpserver_transport_.reset();
90  }
91 
93  {
94  tcpserver_transport_->close();
95  tcpserver_transport_.reset();
96  }
97 
98  poll_manager_->removePollThreadListener(poll_conn_);
99 
101 }
102 
104 {
105  S_Connection local_connections;
106  {
107  boost::mutex::scoped_lock conn_lock(connections_mutex_);
108  local_connections.swap(connections_);
109  }
110 
111  for(S_Connection::iterator itr = local_connections.begin();
112  itr != local_connections.end();
113  itr++)
114  {
115  const ConnectionPtr& conn = *itr;
116  conn->drop(reason);
117  }
118 
119  boost::mutex::scoped_lock dropped_lock(dropped_connections_mutex_);
120  dropped_connections_.clear();
121 }
122 
124 {
125  return tcpserver_transport_->getServerPort();
126 }
127 
129 {
130  return udpserver_transport_->getServerPort();
131 }
132 
134 {
135  boost::mutex::scoped_lock lock(connection_id_counter_mutex_);
136  uint32_t ret = connection_id_counter_++;
137  return ret;
138 }
139 
141 {
142  boost::mutex::scoped_lock lock(connections_mutex_);
143 
144  connections_.insert(conn);
145  conn->addDropListener(boost::bind(&ConnectionManager::onConnectionDropped, this, _1));
146 }
147 
149 {
150  boost::mutex::scoped_lock lock(dropped_connections_mutex_);
151  dropped_connections_.push_back(conn);
152 }
153 
155 {
156  V_Connection local_dropped;
157  {
158  boost::mutex::scoped_lock dropped_lock(dropped_connections_mutex_);
159  dropped_connections_.swap(local_dropped);
160  }
161 
162  boost::mutex::scoped_lock conn_lock(connections_mutex_);
163 
164  V_Connection::iterator conn_it = local_dropped.begin();
165  V_Connection::iterator conn_end = local_dropped.end();
166  for (;conn_it != conn_end; ++conn_it)
167  {
168  const ConnectionPtr& conn = *conn_it;
169  connections_.erase(conn);
170  }
171 }
172 
174 {
175  std::string client_uri = ""; // TODO: transport->getClientURI();
176  ROSCPP_LOG_DEBUG("UDPROS received a connection from [%s]", client_uri.c_str());
177 
178  ConnectionPtr conn(boost::make_shared<Connection>());
179  addConnection(conn);
180 
181  conn->initialize(transport, true, NULL);
182  onConnectionHeaderReceived(conn, header);
183 }
184 
186 {
187  std::string client_uri = transport->getClientURI();
188  ROSCPP_LOG_DEBUG("TCPROS received a connection from [%s]", client_uri.c_str());
189 
190  ConnectionPtr conn(boost::make_shared<Connection>());
191  addConnection(conn);
192 
193  conn->initialize(transport, true, boost::bind(&ConnectionManager::onConnectionHeaderReceived, this, _1, _2));
194 }
195 
197 {
198  bool ret = false;
199  std::string val;
200  if (header.getValue("topic", val))
201  {
202  ROSCPP_CONN_LOG_DEBUG("Connection: Creating TransportSubscriberLink for topic [%s] connected to [%s]",
203  val.c_str(), conn->getRemoteString().c_str());
204 
205  TransportSubscriberLinkPtr sub_link(boost::make_shared<TransportSubscriberLink>());
206  sub_link->initialize(conn);
207  ret = sub_link->handleHeader(header);
208  }
209  else if (header.getValue("service", val))
210  {
211  ROSCPP_LOG_DEBUG("Connection: Creating ServiceClientLink for service [%s] connected to [%s]",
212  val.c_str(), conn->getRemoteString().c_str());
213 
214  ServiceClientLinkPtr link(boost::make_shared<ServiceClientLink>());
215  link->initialize(conn);
216  ret = link->handleHeader(header);
217  }
218  else
219  {
220  ROSCPP_LOG_DEBUG("Got a connection for a type other than 'topic' or 'service' from [%s]. Fail.",
221  conn->getRemoteString().c_str());
222  return false;
223  }
224 
225  return ret;
226 }
227 
228 }
static const int MAX_TCPROS_CONN_QUEUE
void tcprosAcceptConnection(const TransportTCPPtr &transport)
void udprosIncomingConnection(const TransportUDPPtr &transport, Header &header)
#define ROS_FATAL(...)
void addConnection(const ConnectionPtr &connection)
Add a connection to be tracked by the node. Will automatically remove them if they&#39;ve been dropped...
TransportTCPPtr tcpserver_transport_
void onConnectionDropped(const ConnectionPtr &conn)
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
boost::mutex connection_id_counter_mutex_
static const ConnectionManagerPtr & instance()
uint32_t getNewConnectionID()
Get a new connection ID.
std::set< ConnectionPtr > S_Connection
Definition: forwards.h:64
void clear(Connection::DropReason reason)
std::vector< ConnectionPtr > V_Connection
Definition: forwards.h:65
bool onConnectionHeaderReceived(const ConnectionPtr &conn, const Header &header)
boost::signals2::connection poll_conn_
static const PollManagerPtr & instance()
TransportUDPPtr udpserver_transport_
#define ROSCPP_CONN_LOG_DEBUG(...)
Definition: file_log.h:36
bool getValue(const std::string &key, std::string &value) const
#define ROS_BREAK()
boost::mutex dropped_connections_mutex_
ROSCPP_DECL uint16_t getTCPROSPort()
Definition: network.cpp:76
V_Connection dropped_connections_
PollManagerPtr poll_manager_


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:33:27