00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "ros/connection_manager.h"
00029 #include "ros/poll_manager.h"
00030 #include "ros/connection.h"
00031 #include "ros/transport_subscriber_link.h"
00032 #include "ros/service_client_link.h"
00033 #include "ros/transport/transport_tcp.h"
00034 #include "ros/transport/transport_udp.h"
00035 #include "ros/file_log.h"
00036 #include "ros/network.h"
00037
00038 #include <ros/assert.h>
00039
00040 namespace ros
00041 {
00042
00043 ConnectionManagerPtr g_connection_manager;
00044 boost::mutex g_connection_manager_mutex;
00045 const ConnectionManagerPtr& ConnectionManager::instance()
00046 {
00047 if (!g_connection_manager)
00048 {
00049 boost::mutex::scoped_lock lock(g_connection_manager_mutex);
00050 if (!g_connection_manager)
00051 {
00052 g_connection_manager.reset(new ConnectionManager);
00053 }
00054 }
00055
00056 return g_connection_manager;
00057 }
00058
00059 ConnectionManager::ConnectionManager()
00060 : connection_id_counter_(0)
00061 {
00062 }
00063
00064 ConnectionManager::~ConnectionManager()
00065 {
00066 shutdown();
00067 }
00068
00069 void ConnectionManager::start()
00070 {
00071 poll_manager_ = PollManager::instance();
00072 poll_conn_ = poll_manager_->addPollThreadListener(boost::bind(&ConnectionManager::removeDroppedConnections,
00073 this));
00074
00075
00076 tcpserver_transport_ = TransportTCPPtr(new TransportTCP(&poll_manager_->getPollSet()));
00077 if (!tcpserver_transport_->listen(network::getTCPROSPort(),
00078 MAX_TCPROS_CONN_QUEUE,
00079 boost::bind(&ConnectionManager::tcprosAcceptConnection, this, _1)))
00080 {
00081 ROS_FATAL("Listen on port [%d] failed", network::getTCPROSPort());
00082 ROS_BREAK();
00083 }
00084
00085
00086 udpserver_transport_ = TransportUDPPtr(new TransportUDP(&poll_manager_->getPollSet()));
00087 if (!udpserver_transport_->createIncoming(0, true))
00088 {
00089 ROS_FATAL("Listen failed");
00090 ROS_BREAK();
00091 }
00092 }
00093
00094 void ConnectionManager::shutdown()
00095 {
00096 if (udpserver_transport_)
00097 {
00098 udpserver_transport_->close();
00099 udpserver_transport_.reset();
00100 }
00101
00102 if (tcpserver_transport_)
00103 {
00104 tcpserver_transport_->close();
00105 tcpserver_transport_.reset();
00106 }
00107
00108 poll_manager_->removePollThreadListener(poll_conn_);
00109
00110 clear(Connection::Destructing);
00111 }
00112
00113 void ConnectionManager::clear(Connection::DropReason reason)
00114 {
00115 S_Connection local_connections;
00116 {
00117 boost::mutex::scoped_lock conn_lock(connections_mutex_);
00118 local_connections.swap(connections_);
00119 }
00120
00121 for(S_Connection::iterator itr = local_connections.begin();
00122 itr != local_connections.end();
00123 itr++)
00124 {
00125 const ConnectionPtr& conn = *itr;
00126 conn->drop(reason);
00127 }
00128
00129 boost::mutex::scoped_lock dropped_lock(dropped_connections_mutex_);
00130 dropped_connections_.clear();
00131 }
00132
00133 uint32_t ConnectionManager::getTCPPort()
00134 {
00135 return tcpserver_transport_->getServerPort();
00136 }
00137
00138 uint32_t ConnectionManager::getUDPPort()
00139 {
00140 return udpserver_transport_->getServerPort();
00141 }
00142
00143 uint32_t ConnectionManager::getNewConnectionID()
00144 {
00145 boost::mutex::scoped_lock lock(connection_id_counter_mutex_);
00146 uint32_t ret = connection_id_counter_++;
00147 return ret;
00148 }
00149
00150 void ConnectionManager::addConnection(const ConnectionPtr& conn)
00151 {
00152 boost::mutex::scoped_lock lock(connections_mutex_);
00153
00154 connections_.insert(conn);
00155 conn->addDropListener(boost::bind(&ConnectionManager::onConnectionDropped, this, _1));
00156 }
00157
00158 void ConnectionManager::onConnectionDropped(const ConnectionPtr& conn)
00159 {
00160 boost::mutex::scoped_lock lock(dropped_connections_mutex_);
00161 dropped_connections_.push_back(conn);
00162 }
00163
00164 void ConnectionManager::removeDroppedConnections()
00165 {
00166 V_Connection local_dropped;
00167 {
00168 boost::mutex::scoped_lock dropped_lock(dropped_connections_mutex_);
00169 dropped_connections_.swap(local_dropped);
00170 }
00171
00172 boost::mutex::scoped_lock conn_lock(connections_mutex_);
00173
00174 V_Connection::iterator conn_it = local_dropped.begin();
00175 V_Connection::iterator conn_end = local_dropped.end();
00176 for (;conn_it != conn_end; ++conn_it)
00177 {
00178 const ConnectionPtr& conn = *conn_it;
00179 connections_.erase(conn);
00180 }
00181 }
00182
00183 void ConnectionManager::udprosIncomingConnection(const TransportUDPPtr& transport, Header& header)
00184 {
00185 std::string client_uri = "";
00186 ROSCPP_LOG_DEBUG("UDPROS received a connection from [%s]", client_uri.c_str());
00187
00188 ConnectionPtr conn(new Connection());
00189 addConnection(conn);
00190
00191 conn->initialize(transport, true, NULL);
00192 onConnectionHeaderReceived(conn, header);
00193 }
00194
00195 void ConnectionManager::tcprosAcceptConnection(const TransportTCPPtr& transport)
00196 {
00197 std::string client_uri = transport->getClientURI();
00198 ROSCPP_LOG_DEBUG("TCPROS received a connection from [%s]", client_uri.c_str());
00199
00200 ConnectionPtr conn(new Connection());
00201 addConnection(conn);
00202
00203 conn->initialize(transport, true, boost::bind(&ConnectionManager::onConnectionHeaderReceived, this, _1, _2));
00204 }
00205
00206 bool ConnectionManager::onConnectionHeaderReceived(const ConnectionPtr& conn, const Header& header)
00207 {
00208 bool ret = false;
00209 std::string val;
00210 if (header.getValue("topic", val))
00211 {
00212 ROSCPP_LOG_DEBUG("Connection: Creating TransportSubscriberLink for topic [%s] connected to [%s]",
00213 val.c_str(), conn->getRemoteString().c_str());
00214
00215 TransportSubscriberLinkPtr sub_link(new TransportSubscriberLink());
00216 sub_link->initialize(conn);
00217 ret = sub_link->handleHeader(header);
00218 }
00219 else if (header.getValue("service", val))
00220 {
00221 ROSCPP_LOG_DEBUG("Connection: Creating ServiceClientLink for service [%s] connected to [%s]",
00222 val.c_str(), conn->getRemoteString().c_str());
00223
00224 ServiceClientLinkPtr link(new ServiceClientLink());
00225 link->initialize(conn);
00226 ret = link->handleHeader(header);
00227 }
00228 else
00229 {
00230 ROSCPP_LOG_DEBUG("Got a connection for a type other than 'topic' or 'service' from [%s]. Fail.",
00231 conn->getRemoteString().c_str());
00232 return false;
00233 }
00234
00235 return ret;
00236 }
00237
00238 }