Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "ros/connection_manager.h"
00029 #include "ros/poll_manager.h"
00030 #include "ros/connection.h"
00031 #include "ros/transport_subscriber_link.h"
00032 #include "ros/service_client_link.h"
00033 #include "ros/transport/transport_tcp.h"
00034 #include "ros/transport/transport_udp.h"
00035 #include "ros/file_log.h"
00036 #include "ros/network.h"
00037
00038 #include <ros/assert.h>
00039
00040 namespace ros
00041 {
00042
00043 ConnectionManagerPtr g_connection_manager;
00044 boost::mutex g_connection_manager_mutex;
00045 const ConnectionManagerPtr& ConnectionManager::instance()
00046 {
00047 if (!g_connection_manager)
00048 {
00049 boost::mutex::scoped_lock lock(g_connection_manager_mutex);
00050 if (!g_connection_manager)
00051 {
00052 g_connection_manager = boost::make_shared<ConnectionManager>();
00053 }
00054 }
00055
00056 return g_connection_manager;
00057 }
00058
00059 ConnectionManager::ConnectionManager()
00060 : connection_id_counter_(0)
00061 {
00062 }
00063
00064 ConnectionManager::~ConnectionManager()
00065 {
00066 shutdown();
00067 }
00068
00069 void ConnectionManager::start()
00070 {
00071 poll_manager_ = PollManager::instance();
00072 poll_conn_ = poll_manager_->addPollThreadListener(boost::bind(&ConnectionManager::removeDroppedConnections,
00073 this));
00074
00075
00076 tcpserver_transport_ = boost::make_shared<TransportTCP>(&poll_manager_->getPollSet());
00077 if (!tcpserver_transport_->listen(network::getTCPROSPort(),
00078 MAX_TCPROS_CONN_QUEUE,
00079 boost::bind(&ConnectionManager::tcprosAcceptConnection, this, _1)))
00080 {
00081 ROS_FATAL("Listen on port [%d] failed", network::getTCPROSPort());
00082 ROS_BREAK();
00083 }
00084
00085
00086 udpserver_transport_ = boost::make_shared<TransportUDP>(&poll_manager_->getPollSet());
00087 if (!udpserver_transport_->createIncoming(0, true))
00088 {
00089 ROS_FATAL("Listen failed");
00090 ROS_BREAK();
00091 }
00092 }
00093
00094 void ConnectionManager::shutdown()
00095 {
00096 if (udpserver_transport_)
00097 {
00098 udpserver_transport_->close();
00099 udpserver_transport_.reset();
00100 }
00101
00102 if (tcpserver_transport_)
00103 {
00104 tcpserver_transport_->close();
00105 tcpserver_transport_.reset();
00106 }
00107
00108 poll_manager_->removePollThreadListener(poll_conn_);
00109
00110 clear(Connection::Destructing);
00111 }
00112
00113 void ConnectionManager::clear(Connection::DropReason reason)
00114 {
00115 S_Connection local_connections;
00116 {
00117 boost::mutex::scoped_lock conn_lock(connections_mutex_);
00118 local_connections.swap(connections_);
00119 }
00120
00121 for(S_Connection::iterator itr = local_connections.begin();
00122 itr != local_connections.end();
00123 itr++)
00124 {
00125 const ConnectionPtr& conn = *itr;
00126 conn->drop(reason);
00127 }
00128
00129 boost::mutex::scoped_lock dropped_lock(dropped_connections_mutex_);
00130 dropped_connections_.clear();
00131 }
00132
00133 uint32_t ConnectionManager::getTCPPort()
00134 {
00135 return tcpserver_transport_->getServerPort();
00136 }
00137
00138 uint32_t ConnectionManager::getUDPPort()
00139 {
00140 return udpserver_transport_->getServerPort();
00141 }
00142
00143 uint32_t ConnectionManager::getNewConnectionID()
00144 {
00145 boost::mutex::scoped_lock lock(connection_id_counter_mutex_);
00146 uint32_t ret = connection_id_counter_++;
00147 return ret;
00148 }
00149
00150 void ConnectionManager::addConnection(const ConnectionPtr& conn)
00151 {
00152 boost::mutex::scoped_lock lock(connections_mutex_);
00153
00154 connections_.insert(conn);
00155 conn->addDropListener(boost::bind(&ConnectionManager::onConnectionDropped, this, _1));
00156 }
00157
00158 void ConnectionManager::onConnectionDropped(const ConnectionPtr& conn)
00159 {
00160 boost::mutex::scoped_lock lock(dropped_connections_mutex_);
00161 dropped_connections_.push_back(conn);
00162 }
00163
00164 void ConnectionManager::removeDroppedConnections()
00165 {
00166 V_Connection local_dropped;
00167 {
00168 boost::mutex::scoped_lock dropped_lock(dropped_connections_mutex_);
00169 dropped_connections_.swap(local_dropped);
00170 }
00171
00172 boost::mutex::scoped_lock conn_lock(connections_mutex_);
00173
00174 V_Connection::iterator conn_it = local_dropped.begin();
00175 V_Connection::iterator conn_end = local_dropped.end();
00176 for (;conn_it != conn_end; ++conn_it)
00177 {
00178 const ConnectionPtr& conn = *conn_it;
00179 connections_.erase(conn);
00180 }
00181 }
00182
00183 void ConnectionManager::udprosIncomingConnection(const TransportUDPPtr& transport, Header& header)
00184 {
00185 std::string client_uri = "";
00186 ROSCPP_LOG_DEBUG("UDPROS received a connection from [%s]", client_uri.c_str());
00187
00188 ConnectionPtr conn(boost::make_shared<Connection>());
00189 addConnection(conn);
00190
00191 conn->initialize(transport, true, NULL);
00192 onConnectionHeaderReceived(conn, header);
00193 }
00194
00195 void ConnectionManager::tcprosAcceptConnection(const TransportTCPPtr& transport)
00196 {
00197 std::string client_uri = transport->getClientURI();
00198 ROSCPP_LOG_DEBUG("TCPROS received a connection from [%s]", client_uri.c_str());
00199
00200 ConnectionPtr conn(boost::make_shared<Connection>());
00201 addConnection(conn);
00202
00203 conn->initialize(transport, true, boost::bind(&ConnectionManager::onConnectionHeaderReceived, this, _1, _2));
00204 }
00205
00206 bool ConnectionManager::onConnectionHeaderReceived(const ConnectionPtr& conn, const Header& header)
00207 {
00208 bool ret = false;
00209 std::string val;
00210 if (header.getValue("topic", val))
00211 {
00212 ROSCPP_LOG_DEBUG("Connection: Creating TransportSubscriberLink for topic [%s] connected to [%s]",
00213 val.c_str(), conn->getRemoteString().c_str());
00214
00215 TransportSubscriberLinkPtr sub_link(boost::make_shared<TransportSubscriberLink>());
00216 sub_link->initialize(conn);
00217 ret = sub_link->handleHeader(header);
00218 }
00219 else if (header.getValue("service", val))
00220 {
00221 ROSCPP_LOG_DEBUG("Connection: Creating ServiceClientLink for service [%s] connected to [%s]",
00222 val.c_str(), conn->getRemoteString().c_str());
00223
00224 ServiceClientLinkPtr link(boost::make_shared<ServiceClientLink>());
00225 link->initialize(conn);
00226 ret = link->handleHeader(header);
00227 }
00228 else
00229 {
00230 ROSCPP_LOG_DEBUG("Got a connection for a type other than 'topic' or 'service' from [%s]. Fail.",
00231 conn->getRemoteString().c_str());
00232 return false;
00233 }
00234
00235 return ret;
00236 }
00237
00238 }
roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Thu Jun 6 2019 21:10:05