connection_manager.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2009, Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "ros/connection_manager.h"
29 #include "ros/poll_manager.h"
30 #include "ros/connection.h"
35 #include "ros/file_log.h"
36 #include "ros/network.h"
37 
38 #include <boost/bind/bind.hpp>
39 #include <ros/assert.h>
40 
41 namespace ros
42 {
43 
45 {
46  static ConnectionManagerPtr connection_manager = boost::make_shared<ConnectionManager>();
47  return connection_manager;
48 }
49 
51 : connection_id_counter_(0)
52 {
53 }
54 
56 {
57  shutdown();
58 }
59 
61 {
63  poll_conn_ = poll_manager_->addPollThreadListener(boost::bind(&ConnectionManager::removeDroppedConnections,
64  this));
65 
66  // Bring up the TCP listener socket
67  tcpserver_transport_ = boost::make_shared<TransportTCP>(&poll_manager_->getPollSet());
70  boost::bind(&ConnectionManager::tcprosAcceptConnection, this, boost::placeholders::_1)))
71  {
72  ROS_FATAL("Listen on port [%d] failed", network::getTCPROSPort());
73  ROS_BREAK();
74  }
75 
76  // Bring up the UDP listener socket
77  udpserver_transport_ = boost::make_shared<TransportUDP>(&poll_manager_->getPollSet());
78  if (!udpserver_transport_->createIncoming(0, true))
79  {
80  ROS_FATAL("Listen failed");
81  ROS_BREAK();
82  }
83 }
84 
86 {
88  {
89  udpserver_transport_->close();
90  udpserver_transport_.reset();
91  }
92 
94  {
95  tcpserver_transport_->close();
96  tcpserver_transport_.reset();
97  }
98 
99  poll_manager_->removePollThreadListener(poll_conn_);
100 
102 }
103 
105 {
106  S_Connection local_connections;
107  {
108  boost::mutex::scoped_lock conn_lock(connections_mutex_);
109  local_connections.swap(connections_);
110  }
111 
112  for(S_Connection::iterator itr = local_connections.begin();
113  itr != local_connections.end();
114  itr++)
115  {
116  const ConnectionPtr& conn = *itr;
117  conn->drop(reason);
118  }
119 
120  boost::mutex::scoped_lock dropped_lock(dropped_connections_mutex_);
121  dropped_connections_.clear();
122 }
123 
125 {
126  return tcpserver_transport_->getServerPort();
127 }
128 
130 {
131  return udpserver_transport_->getServerPort();
132 }
133 
135 {
136  boost::mutex::scoped_lock lock(connection_id_counter_mutex_);
137  uint32_t ret = connection_id_counter_++;
138  return ret;
139 }
140 
142 {
143  boost::mutex::scoped_lock lock(connections_mutex_);
144 
145  connections_.insert(conn);
146  conn->addDropListener(boost::bind(&ConnectionManager::onConnectionDropped, this, boost::placeholders::_1));
147 }
148 
150 {
151  boost::mutex::scoped_lock lock(dropped_connections_mutex_);
152  dropped_connections_.push_back(conn);
153 }
154 
156 {
157  V_Connection local_dropped;
158  {
159  boost::mutex::scoped_lock dropped_lock(dropped_connections_mutex_);
160  dropped_connections_.swap(local_dropped);
161  }
162 
163  boost::mutex::scoped_lock conn_lock(connections_mutex_);
164 
165  V_Connection::iterator conn_it = local_dropped.begin();
166  V_Connection::iterator conn_end = local_dropped.end();
167  for (;conn_it != conn_end; ++conn_it)
168  {
169  const ConnectionPtr& conn = *conn_it;
170  connections_.erase(conn);
171  }
172 }
173 
175 {
176  std::string client_uri = ""; // TODO: transport->getClientURI();
177  ROSCPP_LOG_DEBUG("UDPROS received a connection from [%s]", client_uri.c_str());
178 
179  ConnectionPtr conn(boost::make_shared<Connection>());
180  addConnection(conn);
181 
182  conn->initialize(transport, true, NULL);
184 }
185 
187 {
188  std::string client_uri = transport->getClientURI();
189  ROSCPP_LOG_DEBUG("TCPROS received a connection from [%s]", client_uri.c_str());
190 
191  ConnectionPtr conn(boost::make_shared<Connection>());
192  addConnection(conn);
193 
194  conn->initialize(transport, true, boost::bind(&ConnectionManager::onConnectionHeaderReceived, this, boost::placeholders::_1, boost::placeholders::_2));
195 }
196 
198 {
199  bool ret = false;
200  std::string val;
201  if (header.getValue("topic", val))
202  {
203  ROSCPP_CONN_LOG_DEBUG("Connection: Creating TransportSubscriberLink for topic [%s] connected to [%s]",
204  val.c_str(), conn->getRemoteString().c_str());
205 
206  TransportSubscriberLinkPtr sub_link(boost::make_shared<TransportSubscriberLink>());
207  sub_link->initialize(conn);
208  ret = sub_link->handleHeader(header);
209  }
210  else if (header.getValue("service", val))
211  {
212  ROSCPP_LOG_DEBUG("Connection: Creating ServiceClientLink for service [%s] connected to [%s]",
213  val.c_str(), conn->getRemoteString().c_str());
214 
215  ServiceClientLinkPtr link(boost::make_shared<ServiceClientLink>());
216  link->initialize(conn);
217  ret = link->handleHeader(header);
218  }
219  else
220  {
221  ROSCPP_LOG_DEBUG("Got a connection for a type other than 'topic' or 'service' from [%s]. Fail.",
222  conn->getRemoteString().c_str());
223  return false;
224  }
225 
226  return ret;
227 }
228 
229 }
ros::ConnectionManager::onConnectionHeaderReceived
bool onConnectionHeaderReceived(const ConnectionPtr &conn, const Header &header)
Definition: connection_manager.cpp:197
connection_manager.h
ros::ConnectionManager::onConnectionDropped
void onConnectionDropped(const ConnectionPtr &conn)
Definition: connection_manager.cpp:149
ROS_BREAK
#define ROS_BREAK()
ROSCPP_CONN_LOG_DEBUG
#define ROSCPP_CONN_LOG_DEBUG(...)
Definition: file_log.h:36
boost::shared_ptr< ConnectionManager >
ros::ConnectionManager::connection_id_counter_
uint32_t connection_id_counter_
Definition: connection_manager.h:94
ros
ros::ConnectionManager::shutdown
void shutdown()
Definition: connection_manager.cpp:85
ros::ConnectionManager::removeDroppedConnections
void removeDroppedConnections()
Definition: connection_manager.cpp:155
ros::Header
ros::ConnectionManager::~ConnectionManager
~ConnectionManager()
Definition: connection_manager.cpp:55
ros::ConnectionManager::dropped_connections_mutex_
boost::mutex dropped_connections_mutex_
Definition: connection_manager.h:90
ros::ConnectionManager::clear
void clear(Connection::DropReason reason)
Definition: connection_manager.cpp:104
ros::ConnectionManager::addConnection
void addConnection(const ConnectionPtr &connection)
Add a connection to be tracked by the node. Will automatically remove them if they've been dropped,...
Definition: connection_manager.cpp:141
ros::ConnectionManager::tcpserver_transport_
TransportTCPPtr tcpserver_transport_
Definition: connection_manager.h:99
ros::ConnectionManager::ConnectionManager
ConnectionManager()
Definition: connection_manager.cpp:50
ros::ConnectionManager::getUDPPort
uint32_t getUDPPort()
Definition: connection_manager.cpp:129
ros::network::getTCPROSPort
ROSCPP_DECL uint16_t getTCPROSPort()
Definition: network.cpp:76
ros::S_Connection
std::set< ConnectionPtr > S_Connection
Definition: forwards.h:64
ros::ConnectionManager::udpserver_transport_
TransportUDPPtr udpserver_transport_
Definition: connection_manager.h:100
transport_udp.h
ros::V_Connection
std::vector< ConnectionPtr > V_Connection
Definition: forwards.h:65
transport_tcp.h
ros::ConnectionManager::poll_conn_
boost::signals2::connection poll_conn_
Definition: connection_manager.h:97
ros::PollManager::instance
static const PollManagerPtr & instance()
Definition: poll_manager.cpp:36
ROS_FATAL
#define ROS_FATAL(...)
ros::Connection::DropReason
DropReason
Definition: connection.h:73
ros::ConnectionManager::connection_id_counter_mutex_
boost::mutex connection_id_counter_mutex_
Definition: connection_manager.h:95
ros::Connection::Destructing
@ Destructing
Definition: connection.h:77
ros::ConnectionManager::getNewConnectionID
uint32_t getNewConnectionID()
Get a new connection ID.
Definition: connection_manager.cpp:134
ros::ConnectionManager::dropped_connections_
V_Connection dropped_connections_
Definition: connection_manager.h:88
connection.h
ros::ConnectionManager::poll_manager_
PollManagerPtr poll_manager_
Definition: connection_manager.h:85
ros::ConnectionManager::tcprosAcceptConnection
void tcprosAcceptConnection(const TransportTCPPtr &transport)
Definition: connection_manager.cpp:186
ros::ConnectionManager::udprosIncomingConnection
void udprosIncomingConnection(const TransportUDPPtr &transport, Header &header)
Definition: connection_manager.cpp:174
poll_manager.h
network.h
ros::ConnectionManager::instance
static const ConnectionManagerPtr & instance()
Definition: connection_manager.cpp:44
ros::ConnectionManager::start
void start()
Definition: connection_manager.cpp:60
header
const std::string header
assert.h
ros::ConnectionManager::MAX_TCPROS_CONN_QUEUE
const static int MAX_TCPROS_CONN_QUEUE
Definition: connection_manager.h:102
ros::ConnectionManager::connections_
S_Connection connections_
Definition: connection_manager.h:87
ros::ConnectionManager::getTCPPort
uint32_t getTCPPort()
Definition: connection_manager.cpp:124
ROSCPP_LOG_DEBUG
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
ros::ConnectionManager::connections_mutex_
boost::mutex connections_mutex_
Definition: connection_manager.h:89
file_log.h


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas , Jacob Perron
autogenerated on Tue May 20 2025 03:00:11