$search
00001 /* 00002 * Copyright (C) 2009, Willow Garage, Inc. 00003 * 00004 * Redistribution and use in source and binary forms, with or without 00005 * modification, are permitted provided that the following conditions are met: 00006 * * Redistributions of source code must retain the above copyright notice, 00007 * this list of conditions and the following disclaimer. 00008 * * Redistributions in binary form must reproduce the above copyright 00009 * notice, this list of conditions and the following disclaimer in the 00010 * documentation and/or other materials provided with the distribution. 00011 * * Neither the names of Willow Garage, Inc. nor the names of its 00012 * contributors may be used to endorse or promote products derived from 00013 * this software without specific prior written permission. 00014 * 00015 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00016 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00017 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00018 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00019 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00020 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00021 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00022 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00023 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00024 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00025 * POSSIBILITY OF SUCH DAMAGE. 00026 */ 00027 00028 #include <cstdio> 00029 #include "ros/service_manager.h" 00030 #include "ros/xmlrpc_manager.h" 00031 #include "ros/connection_manager.h" 00032 #include "ros/poll_manager.h" 00033 #include "ros/service_publication.h" 00034 #include "ros/service_client_link.h" 00035 #include "ros/service_server_link.h" 00036 #include "ros/this_node.h" 00037 #include "ros/network.h" 00038 #include "ros/master.h" 00039 #include "ros/transport/transport_tcp.h" 00040 #include "ros/transport/transport_udp.h" 00041 #include "ros/init.h" 00042 #include "ros/connection.h" 00043 #include "ros/file_log.h" 00044 00045 #include "XmlRpc.h" 00046 00047 #include <ros/console.h> 00048 00049 using namespace XmlRpc; // A battle to be fought later 00050 using namespace std; // sigh 00051 00052 namespace ros 00053 { 00054 00055 ServiceManagerPtr g_service_manager; 00056 boost::mutex g_service_manager_mutex; 00057 const ServiceManagerPtr& ServiceManager::instance() 00058 { 00059 if (!g_service_manager) 00060 { 00061 boost::mutex::scoped_lock lock(g_service_manager_mutex); 00062 if (!g_service_manager) 00063 { 00064 g_service_manager.reset(new ServiceManager); 00065 } 00066 } 00067 00068 return g_service_manager; 00069 } 00070 00071 ServiceManager::ServiceManager() 00072 : shutting_down_(false) 00073 { 00074 } 00075 00076 ServiceManager::~ServiceManager() 00077 { 00078 shutdown(); 00079 } 00080 00081 void ServiceManager::start() 00082 { 00083 shutting_down_ = false; 00084 00085 poll_manager_ = PollManager::instance(); 00086 connection_manager_ = ConnectionManager::instance(); 00087 xmlrpc_manager_ = XMLRPCManager::instance(); 00088 00089 } 00090 00091 void ServiceManager::shutdown() 00092 { 00093 boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_); 00094 if (shutting_down_) 00095 { 00096 return; 00097 } 00098 00099 shutting_down_ = true; 00100 00101 ROSCPP_LOG_DEBUG("ServiceManager::shutdown(): unregistering our advertised services"); 00102 { 00103 boost::mutex::scoped_lock ss_lock(service_publications_mutex_); 00104 00105 for (L_ServicePublication::iterator i = service_publications_.begin(); 00106 i != service_publications_.end(); ++i) 00107 { 00108 unregisterService((*i)->getName()); 00109 //ROSCPP_LOG_DEBUG( "shutting down service %s", (*i)->getName().c_str()); 00110 (*i)->drop(); 00111 } 00112 service_publications_.clear(); 00113 } 00114 00115 L_ServiceServerLink local_service_clients; 00116 { 00117 boost::mutex::scoped_lock lock(service_server_links_mutex_); 00118 local_service_clients.swap(service_server_links_); 00119 } 00120 00121 { 00122 L_ServiceServerLink::iterator it = local_service_clients.begin(); 00123 L_ServiceServerLink::iterator end = local_service_clients.end(); 00124 for (; it != end; ++it) 00125 { 00126 (*it)->getConnection()->drop(Connection::Destructing); 00127 } 00128 00129 local_service_clients.clear(); 00130 } 00131 00132 } 00133 00134 bool ServiceManager::advertiseService(const AdvertiseServiceOptions& ops) 00135 { 00136 boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_); 00137 if (shutting_down_) 00138 { 00139 return false; 00140 } 00141 00142 { 00143 boost::mutex::scoped_lock lock(service_publications_mutex_); 00144 00145 if (isServiceAdvertised(ops.service)) 00146 { 00147 ROS_ERROR("Tried to advertise a service that is already advertised in this node [%s]", ops.service.c_str()); 00148 return false; 00149 } 00150 00151 ServicePublicationPtr pub(new ServicePublication(ops.service, ops.md5sum, ops.datatype, ops.req_datatype, ops.res_datatype, ops.helper, ops.callback_queue, ops.tracked_object)); 00152 service_publications_.push_back(pub); 00153 } 00154 00155 XmlRpcValue args, result, payload; 00156 args[0] = this_node::getName(); 00157 args[1] = ops.service; 00158 char uri_buf[1024]; 00159 snprintf(uri_buf, sizeof(uri_buf), "rosrpc://%s:%d", 00160 network::getHost().c_str(), connection_manager_->getTCPPort()); 00161 args[2] = string(uri_buf); 00162 args[3] = xmlrpc_manager_->getServerURI(); 00163 master::execute("registerService", args, result, payload, true); 00164 00165 return true; 00166 } 00167 00168 bool ServiceManager::unadvertiseService(const string &serv_name) 00169 { 00170 boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_); 00171 if (shutting_down_) 00172 { 00173 return false; 00174 } 00175 00176 ServicePublicationPtr pub; 00177 { 00178 boost::mutex::scoped_lock lock(service_publications_mutex_); 00179 00180 for (L_ServicePublication::iterator i = service_publications_.begin(); 00181 i != service_publications_.end(); ++i) 00182 { 00183 if((*i)->getName() == serv_name && !(*i)->isDropped()) 00184 { 00185 pub = *i; 00186 service_publications_.erase(i); 00187 break; 00188 } 00189 } 00190 } 00191 00192 if (pub) 00193 { 00194 unregisterService(pub->getName()); 00195 ROSCPP_LOG_DEBUG( "shutting down service [%s]", pub->getName().c_str()); 00196 pub->drop(); 00197 return true; 00198 } 00199 00200 return false; 00201 } 00202 00203 bool ServiceManager::unregisterService(const std::string& service) 00204 { 00205 XmlRpcValue args, result, payload; 00206 args[0] = this_node::getName(); 00207 args[1] = service; 00208 char uri_buf[1024]; 00209 snprintf(uri_buf, sizeof(uri_buf), "rosrpc://%s:%d", 00210 network::getHost().c_str(), connection_manager_->getTCPPort()); 00211 args[2] = string(uri_buf); 00212 00213 master::execute("unregisterService", args, result, payload, false); 00214 00215 return true; 00216 } 00217 00218 bool ServiceManager::isServiceAdvertised(const string& serv_name) 00219 { 00220 for (L_ServicePublication::iterator s = service_publications_.begin(); s != service_publications_.end(); ++s) 00221 { 00222 if (((*s)->getName() == serv_name) && !(*s)->isDropped()) 00223 { 00224 return true; 00225 } 00226 } 00227 00228 return false; 00229 } 00230 00231 ServicePublicationPtr ServiceManager::lookupServicePublication(const std::string& service) 00232 { 00233 boost::mutex::scoped_lock lock(service_publications_mutex_); 00234 00235 for (L_ServicePublication::iterator t = service_publications_.begin(); 00236 t != service_publications_.end(); ++t) 00237 { 00238 if ((*t)->getName() == service) 00239 { 00240 return *t; 00241 } 00242 } 00243 00244 return ServicePublicationPtr(); 00245 } 00246 00247 ServiceServerLinkPtr ServiceManager::createServiceServerLink(const std::string& service, bool persistent, 00248 const std::string& request_md5sum, const std::string& response_md5sum, 00249 const M_string& header_values) 00250 { 00251 00252 boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_); 00253 if (shutting_down_) 00254 { 00255 return ServiceServerLinkPtr(); 00256 } 00257 00258 uint32_t serv_port; 00259 std::string serv_host; 00260 if (!lookupService(service, serv_host, serv_port)) 00261 { 00262 return ServiceServerLinkPtr(); 00263 } 00264 00265 TransportTCPPtr transport(new TransportTCP(&poll_manager_->getPollSet())); 00266 if (transport->connect(serv_host, serv_port)) 00267 { 00268 ConnectionPtr connection(new Connection()); 00269 connection_manager_->addConnection(connection); 00270 00271 ServiceServerLinkPtr client(new ServiceServerLink(service, persistent, request_md5sum, response_md5sum, header_values)); 00272 00273 { 00274 boost::mutex::scoped_lock lock(service_server_links_mutex_); 00275 service_server_links_.push_back(client); 00276 } 00277 00278 connection->initialize(transport, false, HeaderReceivedFunc()); 00279 client->initialize(connection); 00280 00281 return client; 00282 } 00283 00284 ROSCPP_LOG_DEBUG("Failed to connect to service [%s] (mapped=[%s]) at [%s:%d]", service.c_str(), service.c_str(), serv_host.c_str(), serv_port); 00285 00286 return ServiceServerLinkPtr(); 00287 } 00288 00289 void ServiceManager::removeServiceServerLink(const ServiceServerLinkPtr& client) 00290 { 00291 // Guard against this getting called as a result of shutdown() dropping all connections (where shutting_down_mutex_ is already locked) 00292 if (shutting_down_) 00293 { 00294 return; 00295 } 00296 00297 boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_); 00298 // Now check again, since the state may have changed between pre-lock/now 00299 if (shutting_down_) 00300 { 00301 return; 00302 } 00303 00304 boost::mutex::scoped_lock lock(service_server_links_mutex_); 00305 00306 L_ServiceServerLink::iterator it = std::find(service_server_links_.begin(), service_server_links_.end(), client); 00307 if (it != service_server_links_.end()) 00308 { 00309 service_server_links_.erase(it); 00310 } 00311 } 00312 00313 bool ServiceManager::lookupService(const string &name, string &serv_host, uint32_t &serv_port) 00314 { 00315 XmlRpcValue args, result, payload; 00316 args[0] = this_node::getName(); 00317 args[1] = name; 00318 if (!master::execute("lookupService", args, result, payload, false)) 00319 return false; 00320 00321 string serv_uri(payload); 00322 if (!serv_uri.length()) // shouldn't happen. but let's be sure. 00323 { 00324 ROS_ERROR("lookupService: Empty server URI returned from master"); 00325 00326 return false; 00327 } 00328 00329 if (!network::splitURI(serv_uri, serv_host, serv_port)) 00330 { 00331 ROS_ERROR("lookupService: Bad service uri [%s]", serv_uri.c_str()); 00332 00333 return false; 00334 } 00335 00336 return true; 00337 } 00338 00339 } // namespace ros 00340