connection_manager.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2009, Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "ros/connection_manager.h"
29 #include "ros/poll_manager.h"
30 #include "ros/connection.h"
35 #include "ros/file_log.h"
36 #include "ros/network.h"
37 
38 #include <ros/assert.h>
39 
40 namespace ros
41 {
42 
46 {
47  if (!g_connection_manager)
48  {
49  boost::mutex::scoped_lock lock(g_connection_manager_mutex);
50  if (!g_connection_manager)
51  {
52  g_connection_manager = boost::make_shared<ConnectionManager>();
53  }
54  }
55 
56  return g_connection_manager;
57 }
58 
61 {
62 }
63 
65 {
66  shutdown();
67 }
68 
70 {
72  poll_conn_ = poll_manager_->addPollThreadListener(boost::bind(&ConnectionManager::removeDroppedConnections,
73  this));
74 
75  // Bring up the TCP listener socket
76  tcpserver_transport_ = boost::make_shared<TransportTCP>(&poll_manager_->getPollSet());
79  boost::bind(&ConnectionManager::tcprosAcceptConnection, this, _1)))
80  {
81  ROS_FATAL("Listen on port [%d] failed", network::getTCPROSPort());
82  ROS_BREAK();
83  }
84 
85  // Bring up the UDP listener socket
86  udpserver_transport_ = boost::make_shared<TransportUDP>(&poll_manager_->getPollSet());
87  if (!udpserver_transport_->createIncoming(0, true))
88  {
89  ROS_FATAL("Listen failed");
90  ROS_BREAK();
91  }
92 }
93 
95 {
97  {
98  udpserver_transport_->close();
99  udpserver_transport_.reset();
100  }
101 
103  {
104  tcpserver_transport_->close();
105  tcpserver_transport_.reset();
106  }
107 
108  poll_manager_->removePollThreadListener(poll_conn_);
109 
111 }
112 
114 {
115  S_Connection local_connections;
116  {
117  boost::mutex::scoped_lock conn_lock(connections_mutex_);
118  local_connections.swap(connections_);
119  }
120 
121  for(S_Connection::iterator itr = local_connections.begin();
122  itr != local_connections.end();
123  itr++)
124  {
125  const ConnectionPtr& conn = *itr;
126  conn->drop(reason);
127  }
128 
129  boost::mutex::scoped_lock dropped_lock(dropped_connections_mutex_);
130  dropped_connections_.clear();
131 }
132 
134 {
135  return tcpserver_transport_->getServerPort();
136 }
137 
139 {
140  return udpserver_transport_->getServerPort();
141 }
142 
144 {
145  boost::mutex::scoped_lock lock(connection_id_counter_mutex_);
146  uint32_t ret = connection_id_counter_++;
147  return ret;
148 }
149 
151 {
152  boost::mutex::scoped_lock lock(connections_mutex_);
153 
154  connections_.insert(conn);
155  conn->addDropListener(boost::bind(&ConnectionManager::onConnectionDropped, this, _1));
156 }
157 
159 {
160  boost::mutex::scoped_lock lock(dropped_connections_mutex_);
161  dropped_connections_.push_back(conn);
162 }
163 
165 {
166  V_Connection local_dropped;
167  {
168  boost::mutex::scoped_lock dropped_lock(dropped_connections_mutex_);
169  dropped_connections_.swap(local_dropped);
170  }
171 
172  boost::mutex::scoped_lock conn_lock(connections_mutex_);
173 
174  V_Connection::iterator conn_it = local_dropped.begin();
175  V_Connection::iterator conn_end = local_dropped.end();
176  for (;conn_it != conn_end; ++conn_it)
177  {
178  const ConnectionPtr& conn = *conn_it;
179  connections_.erase(conn);
180  }
181 }
182 
184 {
185  std::string client_uri = ""; // TODO: transport->getClientURI();
186  ROSCPP_LOG_DEBUG("UDPROS received a connection from [%s]", client_uri.c_str());
187 
188  ConnectionPtr conn(boost::make_shared<Connection>());
189  addConnection(conn);
190 
191  conn->initialize(transport, true, NULL);
192  onConnectionHeaderReceived(conn, header);
193 }
194 
196 {
197  std::string client_uri = transport->getClientURI();
198  ROSCPP_LOG_DEBUG("TCPROS received a connection from [%s]", client_uri.c_str());
199 
200  ConnectionPtr conn(boost::make_shared<Connection>());
201  addConnection(conn);
202 
203  conn->initialize(transport, true, boost::bind(&ConnectionManager::onConnectionHeaderReceived, this, _1, _2));
204 }
205 
207 {
208  bool ret = false;
209  std::string val;
210  if (header.getValue("topic", val))
211  {
212  ROSCPP_CONN_LOG_DEBUG("Connection: Creating TransportSubscriberLink for topic [%s] connected to [%s]",
213  val.c_str(), conn->getRemoteString().c_str());
214 
215  TransportSubscriberLinkPtr sub_link(boost::make_shared<TransportSubscriberLink>());
216  sub_link->initialize(conn);
217  ret = sub_link->handleHeader(header);
218  }
219  else if (header.getValue("service", val))
220  {
221  ROSCPP_LOG_DEBUG("Connection: Creating ServiceClientLink for service [%s] connected to [%s]",
222  val.c_str(), conn->getRemoteString().c_str());
223 
224  ServiceClientLinkPtr link(boost::make_shared<ServiceClientLink>());
225  link->initialize(conn);
226  ret = link->handleHeader(header);
227  }
228  else
229  {
230  ROSCPP_LOG_DEBUG("Got a connection for a type other than 'topic' or 'service' from [%s]. Fail.",
231  conn->getRemoteString().c_str());
232  return false;
233  }
234 
235  return ret;
236 }
237 
238 }
static const int MAX_TCPROS_CONN_QUEUE
void tcprosAcceptConnection(const TransportTCPPtr &transport)
void udprosIncomingConnection(const TransportUDPPtr &transport, Header &header)
#define ROS_FATAL(...)
void addConnection(const ConnectionPtr &connection)
Add a connection to be tracked by the node. Will automatically remove them if they&#39;ve been dropped...
TransportTCPPtr tcpserver_transport_
void onConnectionDropped(const ConnectionPtr &conn)
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
boost::mutex connection_id_counter_mutex_
static const ConnectionManagerPtr & instance()
uint32_t getNewConnectionID()
Get a new connection ID.
boost::mutex g_connection_manager_mutex
std::set< ConnectionPtr > S_Connection
Definition: forwards.h:64
void clear(Connection::DropReason reason)
bool getValue(const std::string &key, std::string &value) const
std::vector< ConnectionPtr > V_Connection
Definition: forwards.h:65
bool onConnectionHeaderReceived(const ConnectionPtr &conn, const Header &header)
boost::signals2::connection poll_conn_
ConnectionManagerPtr g_connection_manager
static const PollManagerPtr & instance()
TransportUDPPtr udpserver_transport_
#define ROSCPP_CONN_LOG_DEBUG(...)
Definition: file_log.h:36
#define ROS_BREAK()
boost::mutex dropped_connections_mutex_
ROSCPP_DECL uint16_t getTCPROSPort()
Definition: network.cpp:75
V_Connection dropped_connections_
PollManagerPtr poll_manager_


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Fri Jun 8 2018 02:54:34