service_manager.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2009, Willow Garage, Inc.
00003  *
00004  * Redistribution and use in source and binary forms, with or without
00005  * modification, are permitted provided that the following conditions are met:
00006  *   * Redistributions of source code must retain the above copyright notice,
00007  *     this list of conditions and the following disclaimer.
00008  *   * Redistributions in binary form must reproduce the above copyright
00009  *     notice, this list of conditions and the following disclaimer in the
00010  *     documentation and/or other materials provided with the distribution.
00011  *   * Neither the names of Willow Garage, Inc. nor the names of its
00012  *     contributors may be used to endorse or promote products derived from
00013  *     this software without specific prior written permission.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00016  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00017  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00019  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00020  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00021  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00023  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00024  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00025  * POSSIBILITY OF SUCH DAMAGE.
00026  */
00027 
00028 #include <cstdio>
00029 #include "ros/service_manager.h"
00030 #include "ros/xmlrpc_manager.h"
00031 #include "ros/connection_manager.h"
00032 #include "ros/poll_manager.h"
00033 #include "ros/service_publication.h"
00034 #include "ros/service_client_link.h"
00035 #include "ros/service_server_link.h"
00036 #include "ros/this_node.h"
00037 #include "ros/network.h"
00038 #include "ros/master.h"
00039 #include "ros/transport/transport_tcp.h"
00040 #include "ros/transport/transport_udp.h"
00041 #include "ros/init.h"
00042 #include "ros/connection.h"
00043 #include "ros/file_log.h"
00044 
00045 #include "XmlRpc.h"
00046 
00047 #include <ros/console.h>
00048 
00049 using namespace XmlRpc; // A battle to be fought later
00050 using namespace std; // sigh
00051 
00052 namespace ros
00053 {
00054 
00055 ServiceManagerPtr g_service_manager;
00056 boost::mutex g_service_manager_mutex;
00057 const ServiceManagerPtr& ServiceManager::instance()
00058 {
00059   if (!g_service_manager)
00060   {
00061     boost::mutex::scoped_lock lock(g_service_manager_mutex);
00062     if (!g_service_manager)
00063     {
00064       g_service_manager.reset(new ServiceManager);
00065     }
00066   }
00067 
00068   return g_service_manager;
00069 }
00070 
00071 ServiceManager::ServiceManager()
00072 : shutting_down_(false)
00073 {
00074 }
00075 
00076 ServiceManager::~ServiceManager()
00077 {
00078   shutdown();
00079 }
00080 
00081 void ServiceManager::start()
00082 {
00083   shutting_down_ = false;
00084 
00085   poll_manager_ = PollManager::instance();
00086   connection_manager_ = ConnectionManager::instance();
00087   xmlrpc_manager_ = XMLRPCManager::instance();
00088 
00089 }
00090 
00091 void ServiceManager::shutdown()
00092 {
00093   boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00094   if (shutting_down_)
00095   {
00096     return;
00097   }
00098 
00099   shutting_down_ = true;
00100 
00101   ROSCPP_LOG_DEBUG("ServiceManager::shutdown(): unregistering our advertised services");
00102   {
00103     boost::mutex::scoped_lock ss_lock(service_publications_mutex_);
00104 
00105     for (L_ServicePublication::iterator i = service_publications_.begin();
00106          i != service_publications_.end(); ++i)
00107     {
00108       unregisterService((*i)->getName());
00109       //ROSCPP_LOG_DEBUG( "shutting down service %s", (*i)->getName().c_str());
00110       (*i)->drop();
00111     }
00112     service_publications_.clear();
00113   }
00114 
00115   L_ServiceServerLink local_service_clients;
00116   {
00117     boost::mutex::scoped_lock lock(service_server_links_mutex_);
00118     local_service_clients.swap(service_server_links_);
00119   }
00120 
00121   {
00122     L_ServiceServerLink::iterator it = local_service_clients.begin();
00123     L_ServiceServerLink::iterator end = local_service_clients.end();
00124     for (; it != end; ++it)
00125     {
00126       (*it)->getConnection()->drop(Connection::Destructing);
00127     }
00128 
00129     local_service_clients.clear();
00130   }
00131 
00132 }
00133 
00134 bool ServiceManager::advertiseService(const AdvertiseServiceOptions& ops)
00135 {
00136   boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00137   if (shutting_down_)
00138   {
00139     return false;
00140   }
00141 
00142   {
00143     boost::mutex::scoped_lock lock(service_publications_mutex_);
00144 
00145     if (isServiceAdvertised(ops.service))
00146     {
00147       ROS_ERROR("Tried to advertise a service that is already advertised in this node [%s]", ops.service.c_str());
00148       return false;
00149     }
00150 
00151     ServicePublicationPtr pub(new ServicePublication(ops.service, ops.md5sum, ops.datatype, ops.req_datatype, ops.res_datatype, ops.helper, ops.callback_queue, ops.tracked_object));
00152     service_publications_.push_back(pub);
00153   }
00154 
00155   XmlRpcValue args, result, payload;
00156   args[0] = this_node::getName();
00157   args[1] = ops.service;
00158   char uri_buf[1024];
00159   snprintf(uri_buf, sizeof(uri_buf), "rosrpc://%s:%d",
00160            network::getHost().c_str(), connection_manager_->getTCPPort());
00161   args[2] = string(uri_buf);
00162   args[3] = xmlrpc_manager_->getServerURI();
00163   master::execute("registerService", args, result, payload, true);
00164 
00165   return true;
00166 }
00167 
00168 bool ServiceManager::unadvertiseService(const string &serv_name)
00169 {
00170   boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00171   if (shutting_down_)
00172   {
00173     return false;
00174   }
00175 
00176   ServicePublicationPtr pub;
00177   {
00178     boost::mutex::scoped_lock lock(service_publications_mutex_);
00179 
00180     for (L_ServicePublication::iterator i = service_publications_.begin();
00181          i != service_publications_.end(); ++i)
00182     {
00183       if((*i)->getName() == serv_name && !(*i)->isDropped())
00184       {
00185         pub = *i;
00186         service_publications_.erase(i);
00187         break;
00188       }
00189     }
00190   }
00191 
00192   if (pub)
00193   {
00194     unregisterService(pub->getName());
00195     ROSCPP_LOG_DEBUG( "shutting down service [%s]", pub->getName().c_str());
00196     pub->drop();
00197     return true;
00198   }
00199 
00200   return false;
00201 }
00202 
00203 bool ServiceManager::unregisterService(const std::string& service)
00204 {
00205   XmlRpcValue args, result, payload;
00206   args[0] = this_node::getName();
00207   args[1] = service;
00208   char uri_buf[1024];
00209   snprintf(uri_buf, sizeof(uri_buf), "rosrpc://%s:%d",
00210            network::getHost().c_str(), connection_manager_->getTCPPort());
00211   args[2] = string(uri_buf);
00212 
00213   master::execute("unregisterService", args, result, payload, false);
00214 
00215   return true;
00216 }
00217 
00218 bool ServiceManager::isServiceAdvertised(const string& serv_name)
00219 {
00220   for (L_ServicePublication::iterator s = service_publications_.begin(); s != service_publications_.end(); ++s)
00221   {
00222     if (((*s)->getName() == serv_name) && !(*s)->isDropped())
00223     {
00224       return true;
00225     }
00226   }
00227 
00228   return false;
00229 }
00230 
00231 ServicePublicationPtr ServiceManager::lookupServicePublication(const std::string& service)
00232 {
00233   boost::mutex::scoped_lock lock(service_publications_mutex_);
00234 
00235   for (L_ServicePublication::iterator t = service_publications_.begin();
00236        t != service_publications_.end(); ++t)
00237   {
00238     if ((*t)->getName() == service)
00239     {
00240       return *t;
00241     }
00242   }
00243 
00244   return ServicePublicationPtr();
00245 }
00246 
00247 ServiceServerLinkPtr ServiceManager::createServiceServerLink(const std::string& service, bool persistent,
00248                                              const std::string& request_md5sum, const std::string& response_md5sum,
00249                                              const M_string& header_values)
00250 {
00251 
00252   boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00253   if (shutting_down_)
00254   {
00255     return ServiceServerLinkPtr();
00256   }
00257 
00258   uint32_t serv_port;
00259   std::string serv_host;
00260   if (!lookupService(service, serv_host, serv_port))
00261   {
00262     return ServiceServerLinkPtr();
00263   }
00264 
00265   TransportTCPPtr transport(new TransportTCP(&poll_manager_->getPollSet()));
00266   if (transport->connect(serv_host, serv_port))
00267   {
00268     ConnectionPtr connection(new Connection());
00269     connection_manager_->addConnection(connection);
00270 
00271     ServiceServerLinkPtr client(new ServiceServerLink(service, persistent, request_md5sum, response_md5sum, header_values));
00272 
00273     {
00274       boost::mutex::scoped_lock lock(service_server_links_mutex_);
00275       service_server_links_.push_back(client);
00276     }
00277 
00278     connection->initialize(transport, false, HeaderReceivedFunc());
00279     client->initialize(connection);
00280 
00281     return client;
00282   }
00283 
00284   ROSCPP_LOG_DEBUG("Failed to connect to service [%s] (mapped=[%s]) at [%s:%d]", service.c_str(), service.c_str(), serv_host.c_str(), serv_port);
00285 
00286   return ServiceServerLinkPtr();
00287 }
00288 
00289 void ServiceManager::removeServiceServerLink(const ServiceServerLinkPtr& client)
00290 {
00291   // Guard against this getting called as a result of shutdown() dropping all connections (where shutting_down_mutex_ is already locked)
00292   if (shutting_down_)
00293   {
00294     return;
00295   }
00296 
00297   boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00298   // Now check again, since the state may have changed between pre-lock/now
00299   if (shutting_down_)
00300   {
00301     return;
00302   }
00303 
00304   boost::mutex::scoped_lock lock(service_server_links_mutex_);
00305 
00306   L_ServiceServerLink::iterator it = std::find(service_server_links_.begin(), service_server_links_.end(), client);
00307   if (it != service_server_links_.end())
00308   {
00309     service_server_links_.erase(it);
00310   }
00311 }
00312 
00313 bool ServiceManager::lookupService(const string &name, string &serv_host, uint32_t &serv_port)
00314 {
00315   XmlRpcValue args, result, payload;
00316   args[0] = this_node::getName();
00317   args[1] = name;
00318   if (!master::execute("lookupService", args, result, payload, false))
00319     return false;
00320 
00321   string serv_uri(payload);
00322   if (!serv_uri.length()) // shouldn't happen. but let's be sure.
00323   {
00324     ROS_ERROR("lookupService: Empty server URI returned from master");
00325 
00326     return false;
00327   }
00328 
00329   if (!network::splitURI(serv_uri, serv_host, serv_port))
00330   {
00331     ROS_ERROR("lookupService: Bad service uri [%s]", serv_uri.c_str());
00332 
00333     return false;
00334   }
00335 
00336   return true;
00337 }
00338 
00339 } // namespace ros
00340 


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Mon Oct 6 2014 11:46:44