$search
00001 /* 00002 * Copyright (C) 2009, Willow Garage, Inc. 00003 * 00004 * Redistribution and use in source and binary forms, with or without 00005 * modification, are permitted provided that the following conditions are met: 00006 * * Redistributions of source code must retain the above copyright notice, 00007 * this list of conditions and the following disclaimer. 00008 * * Redistributions in binary form must reproduce the above copyright 00009 * notice, this list of conditions and the following disclaimer in the 00010 * documentation and/or other materials provided with the distribution. 00011 * * Neither the names of Willow Garage, Inc. nor the names of its 00012 * contributors may be used to endorse or promote products derived from 00013 * this software without specific prior written permission. 00014 * 00015 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00016 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00017 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00018 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00019 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00020 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00021 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00022 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00023 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00024 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00025 * POSSIBILITY OF SUCH DAMAGE. 00026 */ 00027 00028 #include "ros/connection_manager.h" 00029 #include "ros/poll_manager.h" 00030 #include "ros/connection.h" 00031 #include "ros/transport_subscriber_link.h" 00032 #include "ros/service_client_link.h" 00033 #include "ros/transport/transport_tcp.h" 00034 #include "ros/transport/transport_udp.h" 00035 #include "ros/file_log.h" 00036 #include "ros/network.h" 00037 00038 #include <ros/assert.h> 00039 00040 namespace ros 00041 { 00042 00043 ConnectionManagerPtr g_connection_manager; 00044 boost::mutex g_connection_manager_mutex; 00045 const ConnectionManagerPtr& ConnectionManager::instance() 00046 { 00047 if (!g_connection_manager) 00048 { 00049 boost::mutex::scoped_lock lock(g_connection_manager_mutex); 00050 if (!g_connection_manager) 00051 { 00052 g_connection_manager.reset(new ConnectionManager); 00053 } 00054 } 00055 00056 return g_connection_manager; 00057 } 00058 00059 ConnectionManager::ConnectionManager() 00060 : connection_id_counter_(0) 00061 { 00062 } 00063 00064 ConnectionManager::~ConnectionManager() 00065 { 00066 shutdown(); 00067 } 00068 00069 void ConnectionManager::start() 00070 { 00071 poll_manager_ = PollManager::instance(); 00072 poll_conn_ = poll_manager_->addPollThreadListener(boost::bind(&ConnectionManager::removeDroppedConnections, 00073 this)); 00074 00075 // Bring up the TCP listener socket 00076 tcpserver_transport_ = TransportTCPPtr(new TransportTCP(&poll_manager_->getPollSet())); 00077 if (!tcpserver_transport_->listen(network::getTCPROSPort(), 00078 MAX_TCPROS_CONN_QUEUE, 00079 boost::bind(&ConnectionManager::tcprosAcceptConnection, this, _1))) 00080 { 00081 ROS_FATAL("Listen on port [%d] failed", network::getTCPROSPort()); 00082 ROS_BREAK(); 00083 } 00084 00085 // Bring up the UDP listener socket 00086 udpserver_transport_ = TransportUDPPtr(new TransportUDP(&poll_manager_->getPollSet())); 00087 if (!udpserver_transport_->createIncoming(0, true)) 00088 { 00089 ROS_FATAL("Listen failed"); 00090 ROS_BREAK(); 00091 } 00092 } 00093 00094 void ConnectionManager::shutdown() 00095 { 00096 if (udpserver_transport_) 00097 { 00098 udpserver_transport_->close(); 00099 udpserver_transport_.reset(); 00100 } 00101 00102 if (tcpserver_transport_) 00103 { 00104 tcpserver_transport_->close(); 00105 tcpserver_transport_.reset(); 00106 } 00107 00108 poll_manager_->removePollThreadListener(poll_conn_); 00109 00110 clear(Connection::Destructing); 00111 } 00112 00113 void ConnectionManager::clear(Connection::DropReason reason) 00114 { 00115 S_Connection local_connections; 00116 { 00117 boost::mutex::scoped_lock conn_lock(connections_mutex_); 00118 local_connections.swap(connections_); 00119 } 00120 00121 for(S_Connection::iterator itr = local_connections.begin(); 00122 itr != local_connections.end(); 00123 itr++) 00124 { 00125 const ConnectionPtr& conn = *itr; 00126 conn->drop(reason); 00127 } 00128 00129 boost::mutex::scoped_lock dropped_lock(dropped_connections_mutex_); 00130 dropped_connections_.clear(); 00131 } 00132 00133 uint32_t ConnectionManager::getTCPPort() 00134 { 00135 return tcpserver_transport_->getServerPort(); 00136 } 00137 00138 uint32_t ConnectionManager::getUDPPort() 00139 { 00140 return udpserver_transport_->getServerPort(); 00141 } 00142 00143 uint32_t ConnectionManager::getNewConnectionID() 00144 { 00145 boost::mutex::scoped_lock lock(connection_id_counter_mutex_); 00146 uint32_t ret = connection_id_counter_++; 00147 return ret; 00148 } 00149 00150 void ConnectionManager::addConnection(const ConnectionPtr& conn) 00151 { 00152 boost::mutex::scoped_lock lock(connections_mutex_); 00153 00154 connections_.insert(conn); 00155 conn->addDropListener(boost::bind(&ConnectionManager::onConnectionDropped, this, _1)); 00156 } 00157 00158 void ConnectionManager::onConnectionDropped(const ConnectionPtr& conn) 00159 { 00160 boost::mutex::scoped_lock lock(dropped_connections_mutex_); 00161 dropped_connections_.push_back(conn); 00162 } 00163 00164 void ConnectionManager::removeDroppedConnections() 00165 { 00166 V_Connection local_dropped; 00167 { 00168 boost::mutex::scoped_lock dropped_lock(dropped_connections_mutex_); 00169 dropped_connections_.swap(local_dropped); 00170 } 00171 00172 boost::mutex::scoped_lock conn_lock(connections_mutex_); 00173 00174 V_Connection::iterator conn_it = local_dropped.begin(); 00175 V_Connection::iterator conn_end = local_dropped.end(); 00176 for (;conn_it != conn_end; ++conn_it) 00177 { 00178 const ConnectionPtr& conn = *conn_it; 00179 connections_.erase(conn); 00180 } 00181 } 00182 00183 void ConnectionManager::udprosIncomingConnection(const TransportUDPPtr& transport, Header& header) 00184 { 00185 std::string client_uri = ""; // TODO: transport->getClientURI(); 00186 ROSCPP_LOG_DEBUG("UDPROS received a connection from [%s]", client_uri.c_str()); 00187 00188 ConnectionPtr conn(new Connection()); 00189 addConnection(conn); 00190 00191 conn->initialize(transport, true, NULL); 00192 onConnectionHeaderReceived(conn, header); 00193 } 00194 00195 void ConnectionManager::tcprosAcceptConnection(const TransportTCPPtr& transport) 00196 { 00197 std::string client_uri = transport->getClientURI(); 00198 ROSCPP_LOG_DEBUG("TCPROS received a connection from [%s]", client_uri.c_str()); 00199 00200 ConnectionPtr conn(new Connection()); 00201 addConnection(conn); 00202 00203 conn->initialize(transport, true, boost::bind(&ConnectionManager::onConnectionHeaderReceived, this, _1, _2)); 00204 } 00205 00206 bool ConnectionManager::onConnectionHeaderReceived(const ConnectionPtr& conn, const Header& header) 00207 { 00208 bool ret = false; 00209 std::string val; 00210 if (header.getValue("topic", val)) 00211 { 00212 ROSCPP_LOG_DEBUG("Connection: Creating TransportSubscriberLink for topic [%s] connected to [%s]", 00213 val.c_str(), conn->getRemoteString().c_str()); 00214 00215 TransportSubscriberLinkPtr sub_link(new TransportSubscriberLink()); 00216 sub_link->initialize(conn); 00217 ret = sub_link->handleHeader(header); 00218 } 00219 else if (header.getValue("service", val)) 00220 { 00221 ROSCPP_LOG_DEBUG("Connection: Creating ServiceClientLink for service [%s] connected to [%s]", 00222 val.c_str(), conn->getRemoteString().c_str()); 00223 00224 ServiceClientLinkPtr link(new ServiceClientLink()); 00225 link->initialize(conn); 00226 ret = link->handleHeader(header); 00227 } 00228 else 00229 { 00230 ROSCPP_LOG_DEBUG("Got a connection for a type other than 'topic' or 'service' from [%s]. Fail.", 00231 conn->getRemoteString().c_str()); 00232 return false; 00233 } 00234 00235 return ret; 00236 } 00237 00238 }