connection_manager.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2009, Willow Garage, Inc.
00003  *
00004  * Redistribution and use in source and binary forms, with or without
00005  * modification, are permitted provided that the following conditions are met:
00006  *   * Redistributions of source code must retain the above copyright notice,
00007  *     this list of conditions and the following disclaimer.
00008  *   * Redistributions in binary form must reproduce the above copyright
00009  *     notice, this list of conditions and the following disclaimer in the
00010  *     documentation and/or other materials provided with the distribution.
00011  *   * Neither the names of Willow Garage, Inc. nor the names of its
00012  *     contributors may be used to endorse or promote products derived from
00013  *     this software without specific prior written permission.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00016  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00017  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00019  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00020  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00021  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00023  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00024  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00025  * POSSIBILITY OF SUCH DAMAGE.
00026  */
00027 
00028 #include "ros/connection_manager.h"
00029 #include "ros/poll_manager.h"
00030 #include "ros/connection.h"
00031 #include "ros/transport_subscriber_link.h"
00032 #include "ros/service_client_link.h"
00033 #include "ros/transport/transport_tcp.h"
00034 #include "ros/transport/transport_udp.h"
00035 #include "ros/file_log.h"
00036 #include "ros/network.h"
00037 
00038 #include <ros/assert.h>
00039 
00040 namespace ros
00041 {
00042 
00043 ConnectionManagerPtr g_connection_manager;
00044 boost::mutex g_connection_manager_mutex;
00045 const ConnectionManagerPtr& ConnectionManager::instance()
00046 {
00047   if (!g_connection_manager)
00048   {
00049     boost::mutex::scoped_lock lock(g_connection_manager_mutex);
00050     if (!g_connection_manager)
00051     {
00052       g_connection_manager = boost::make_shared<ConnectionManager>();
00053     }
00054   }
00055 
00056   return g_connection_manager;
00057 }
00058 
00059 ConnectionManager::ConnectionManager()
00060 : connection_id_counter_(0)
00061 {
00062 }
00063 
00064 ConnectionManager::~ConnectionManager()
00065 {
00066   shutdown();
00067 }
00068 
00069 void ConnectionManager::start()
00070 {
00071   poll_manager_ = PollManager::instance();
00072   poll_conn_ = poll_manager_->addPollThreadListener(boost::bind(&ConnectionManager::removeDroppedConnections, 
00073                                                                 this));
00074 
00075   // Bring up the TCP listener socket
00076   tcpserver_transport_ = boost::make_shared<TransportTCP>(&poll_manager_->getPollSet());
00077   if (!tcpserver_transport_->listen(network::getTCPROSPort(), 
00078                                     MAX_TCPROS_CONN_QUEUE, 
00079                                     boost::bind(&ConnectionManager::tcprosAcceptConnection, this, _1)))
00080   {
00081     ROS_FATAL("Listen on port [%d] failed", network::getTCPROSPort());
00082     ROS_BREAK();
00083   }
00084 
00085   // Bring up the UDP listener socket
00086   udpserver_transport_ = boost::make_shared<TransportUDP>(&poll_manager_->getPollSet());
00087   if (!udpserver_transport_->createIncoming(0, true))
00088   {
00089     ROS_FATAL("Listen failed");
00090     ROS_BREAK();
00091   }
00092 }
00093 
00094 void ConnectionManager::shutdown()
00095 {
00096   if (udpserver_transport_)
00097   {
00098     udpserver_transport_->close();
00099     udpserver_transport_.reset();
00100   }
00101 
00102   if (tcpserver_transport_)
00103   {
00104     tcpserver_transport_->close();
00105     tcpserver_transport_.reset();
00106   }
00107 
00108   poll_manager_->removePollThreadListener(poll_conn_);
00109 
00110   clear(Connection::Destructing);
00111 }
00112 
00113 void ConnectionManager::clear(Connection::DropReason reason)
00114 {
00115   S_Connection local_connections;
00116   {
00117     boost::mutex::scoped_lock conn_lock(connections_mutex_);
00118     local_connections.swap(connections_);
00119   }
00120 
00121   for(S_Connection::iterator itr = local_connections.begin();
00122       itr != local_connections.end();
00123       itr++)
00124   {
00125     const ConnectionPtr& conn = *itr;
00126     conn->drop(reason);
00127   }
00128 
00129   boost::mutex::scoped_lock dropped_lock(dropped_connections_mutex_);
00130   dropped_connections_.clear();
00131 }
00132 
00133 uint32_t ConnectionManager::getTCPPort()
00134 {
00135   return tcpserver_transport_->getServerPort();
00136 }
00137 
00138 uint32_t ConnectionManager::getUDPPort()
00139 {
00140   return udpserver_transport_->getServerPort();
00141 }
00142 
00143 uint32_t ConnectionManager::getNewConnectionID()
00144 {
00145   boost::mutex::scoped_lock lock(connection_id_counter_mutex_);
00146   uint32_t ret = connection_id_counter_++;
00147   return ret;
00148 }
00149 
00150 void ConnectionManager::addConnection(const ConnectionPtr& conn)
00151 {
00152   boost::mutex::scoped_lock lock(connections_mutex_);
00153 
00154   connections_.insert(conn);
00155   conn->addDropListener(boost::bind(&ConnectionManager::onConnectionDropped, this, _1));
00156 }
00157 
00158 void ConnectionManager::onConnectionDropped(const ConnectionPtr& conn)
00159 {
00160   boost::mutex::scoped_lock lock(dropped_connections_mutex_);
00161   dropped_connections_.push_back(conn);
00162 }
00163 
00164 void ConnectionManager::removeDroppedConnections()
00165 {
00166   V_Connection local_dropped;
00167   {
00168     boost::mutex::scoped_lock dropped_lock(dropped_connections_mutex_);
00169     dropped_connections_.swap(local_dropped);
00170   }
00171 
00172   boost::mutex::scoped_lock conn_lock(connections_mutex_);
00173 
00174   V_Connection::iterator conn_it = local_dropped.begin();
00175   V_Connection::iterator conn_end = local_dropped.end();
00176   for (;conn_it != conn_end; ++conn_it)
00177   {
00178     const ConnectionPtr& conn = *conn_it;
00179     connections_.erase(conn);
00180   }
00181 }
00182 
00183 void ConnectionManager::udprosIncomingConnection(const TransportUDPPtr& transport, Header& header)
00184 {
00185   std::string client_uri = ""; // TODO: transport->getClientURI();
00186   ROSCPP_LOG_DEBUG("UDPROS received a connection from [%s]", client_uri.c_str());
00187 
00188   ConnectionPtr conn(boost::make_shared<Connection>());
00189   addConnection(conn);
00190 
00191   conn->initialize(transport, true, NULL);
00192   onConnectionHeaderReceived(conn, header);
00193 }
00194 
00195 void ConnectionManager::tcprosAcceptConnection(const TransportTCPPtr& transport)
00196 {
00197   std::string client_uri = transport->getClientURI();
00198   ROSCPP_LOG_DEBUG("TCPROS received a connection from [%s]", client_uri.c_str());
00199 
00200   ConnectionPtr conn(boost::make_shared<Connection>());
00201   addConnection(conn);
00202 
00203   conn->initialize(transport, true, boost::bind(&ConnectionManager::onConnectionHeaderReceived, this, _1, _2));
00204 }
00205 
00206 bool ConnectionManager::onConnectionHeaderReceived(const ConnectionPtr& conn, const Header& header)
00207 {
00208   bool ret = false;
00209   std::string val;
00210   if (header.getValue("topic", val))
00211   {
00212     ROSCPP_LOG_DEBUG("Connection: Creating TransportSubscriberLink for topic [%s] connected to [%s]", 
00213                      val.c_str(), conn->getRemoteString().c_str());
00214 
00215     TransportSubscriberLinkPtr sub_link(boost::make_shared<TransportSubscriberLink>());
00216     sub_link->initialize(conn);
00217     ret = sub_link->handleHeader(header);
00218   }
00219   else if (header.getValue("service", val))
00220   {
00221     ROSCPP_LOG_DEBUG("Connection: Creating ServiceClientLink for service [%s] connected to [%s]", 
00222                      val.c_str(), conn->getRemoteString().c_str());
00223 
00224     ServiceClientLinkPtr link(boost::make_shared<ServiceClientLink>());
00225     link->initialize(conn);
00226     ret = link->handleHeader(header);
00227   }
00228   else
00229   {
00230         ROSCPP_LOG_DEBUG("Got a connection for a type other than 'topic' or 'service' from [%s].  Fail.", 
00231                          conn->getRemoteString().c_str());
00232     return false;
00233   }
00234 
00235   return ret;
00236 }
00237 
00238 }


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Tue Mar 7 2017 04:01:03