00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "ros/connection_manager.h"
00029 #include "ros/poll_manager.h"
00030 #include "ros/connection.h"
00031 #include "ros/transport_subscriber_link.h"
00032 #include "ros/service_client_link.h"
00033 #include "ros/transport/transport_tcp.h"
00034 #include "ros/transport/transport_udp.h"
00035 #include "ros/file_log.h"
00036 #include "ros/network.h"
00037
00038 #include <ros/assert.h>
00039
00040 namespace ros
00041 {
00042
00043 ConnectionManagerPtr g_connection_manager;
00044 boost::mutex g_connection_manager_mutex;
00045 const ConnectionManagerPtr& ConnectionManager::instance()
00046 {
00047 if (!g_connection_manager)
00048 {
00049 boost::mutex::scoped_lock lock(g_connection_manager_mutex);
00050 if (!g_connection_manager)
00051 {
00052 g_connection_manager.reset(new ConnectionManager);
00053 }
00054 }
00055
00056 return g_connection_manager;
00057 }
00058
00059 ConnectionManager::ConnectionManager()
00060 : connection_id_counter_(0)
00061 {
00062 }
00063
00064 ConnectionManager::~ConnectionManager()
00065 {
00066 shutdown();
00067 }
00068
00069 void ConnectionManager::start()
00070 {
00071 poll_manager_ = PollManager::instance();
00072 poll_conn_ = poll_manager_->addPollThreadListener(boost::bind(&ConnectionManager::removeDroppedConnections, this));
00073
00074
00075 tcpserver_transport_ = TransportTCPPtr(new TransportTCP(&poll_manager_->getPollSet()));
00076 if (!tcpserver_transport_->listen(network::getTCPROSPort(), MAX_TCPROS_CONN_QUEUE, boost::bind(&ConnectionManager::tcprosAcceptConnection, this, _1)))
00077 {
00078 ROS_FATAL("Listen on port [%d] failed", network::getTCPROSPort());
00079 ROS_BREAK();
00080 }
00081
00082
00083 udpserver_transport_ = TransportUDPPtr(new TransportUDP(&poll_manager_->getPollSet()));
00084 if (!udpserver_transport_->createIncoming(0, true))
00085 {
00086 ROS_FATAL("Listen failed");
00087 ROS_BREAK();
00088 }
00089 }
00090
00091 void ConnectionManager::shutdown()
00092 {
00093 if (udpserver_transport_)
00094 {
00095 udpserver_transport_->close();
00096 udpserver_transport_.reset();
00097 }
00098
00099 if (tcpserver_transport_)
00100 {
00101 tcpserver_transport_->close();
00102 tcpserver_transport_.reset();
00103 }
00104
00105 poll_manager_->removePollThreadListener(poll_conn_);
00106
00107 clear(Connection::Destructing);
00108 }
00109
00110 void ConnectionManager::clear(Connection::DropReason reason)
00111 {
00112 S_Connection local_connections;
00113 {
00114 boost::mutex::scoped_lock conn_lock(connections_mutex_);
00115 local_connections.swap(connections_);
00116 }
00117
00118 for(S_Connection::iterator itr = local_connections.begin();
00119 itr != local_connections.end();
00120 itr++)
00121 {
00122 const ConnectionPtr& conn = *itr;
00123 conn->drop(reason);
00124 }
00125
00126 boost::mutex::scoped_lock dropped_lock(dropped_connections_mutex_);
00127 dropped_connections_.clear();
00128 }
00129
00130 uint32_t ConnectionManager::getTCPPort()
00131 {
00132 return tcpserver_transport_->getServerPort();
00133 }
00134
00135 uint32_t ConnectionManager::getUDPPort()
00136 {
00137 return udpserver_transport_->getServerPort();
00138 }
00139
00140 uint32_t ConnectionManager::getNewConnectionID()
00141 {
00142 boost::mutex::scoped_lock lock(connection_id_counter_mutex_);
00143 uint32_t ret = connection_id_counter_++;
00144 return ret;
00145 }
00146
00147 void ConnectionManager::addConnection(const ConnectionPtr& conn)
00148 {
00149 boost::mutex::scoped_lock lock(connections_mutex_);
00150
00151 connections_.insert(conn);
00152 conn->addDropListener(boost::bind(&ConnectionManager::onConnectionDropped, this, _1));
00153 }
00154
00155 void ConnectionManager::onConnectionDropped(const ConnectionPtr& conn)
00156 {
00157 boost::mutex::scoped_lock lock(dropped_connections_mutex_);
00158 dropped_connections_.push_back(conn);
00159 }
00160
00161 void ConnectionManager::removeDroppedConnections()
00162 {
00163 V_Connection local_dropped;
00164 {
00165 boost::mutex::scoped_lock dropped_lock(dropped_connections_mutex_);
00166 dropped_connections_.swap(local_dropped);
00167 }
00168
00169 boost::mutex::scoped_lock conn_lock(connections_mutex_);
00170
00171 V_Connection::iterator conn_it = local_dropped.begin();
00172 V_Connection::iterator conn_end = local_dropped.end();
00173 for (;conn_it != conn_end; ++conn_it)
00174 {
00175 const ConnectionPtr& conn = *conn_it;
00176 connections_.erase(conn);
00177 }
00178 }
00179
00180 void ConnectionManager::udprosIncomingConnection(const TransportUDPPtr& transport, Header& header)
00181 {
00182 std::string client_uri = "";
00183 ROSCPP_LOG_DEBUG("UDPROS received a connection from [%s]", client_uri.c_str());
00184
00185 ConnectionPtr conn(new Connection());
00186 addConnection(conn);
00187
00188 conn->initialize(transport, true, NULL);
00189 onConnectionHeaderReceived(conn, header);
00190 }
00191
00192 void ConnectionManager::tcprosAcceptConnection(const TransportTCPPtr& transport)
00193 {
00194 std::string client_uri = transport->getClientURI();
00195 ROSCPP_LOG_DEBUG("TCPROS received a connection from [%s]", client_uri.c_str());
00196
00197 ConnectionPtr conn(new Connection());
00198 addConnection(conn);
00199
00200 conn->initialize(transport, true, boost::bind(&ConnectionManager::onConnectionHeaderReceived, this, _1, _2));
00201 }
00202
00203 bool ConnectionManager::onConnectionHeaderReceived(const ConnectionPtr& conn, const Header& header)
00204 {
00205 bool ret = false;
00206 std::string val;
00207 if (header.getValue("topic", val))
00208 {
00209 ROSCPP_LOG_DEBUG("Connection: Creating TransportSubscriberLink for topic [%s] connected to [%s]", val.c_str(), conn->getRemoteString().c_str());
00210
00211 TransportSubscriberLinkPtr sub_link(new TransportSubscriberLink());
00212 sub_link->initialize(conn);
00213 ret = sub_link->handleHeader(header);
00214 }
00215 else if (header.getValue("service", val))
00216 {
00217 ROSCPP_LOG_DEBUG("Connection: Creating ServiceClientLink for service [%s] connected to [%s]", val.c_str(), conn->getRemoteString().c_str());
00218
00219 ServiceClientLinkPtr link(new ServiceClientLink());
00220 link->initialize(conn);
00221 ret = link->handleHeader(header);
00222 }
00223 else
00224 {
00225 ROSCPP_LOG_DEBUG("Got a connection for a type other than 'topic' or 'service' from [%s]. Fail.", conn->getRemoteString().c_str());
00226 return false;
00227 }
00228
00229 return ret;
00230 }
00231
00232 }