00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include <cstdio>
00029 #include "ros/service_manager.h"
00030 #include "ros/xmlrpc_manager.h"
00031 #include "ros/connection_manager.h"
00032 #include "ros/poll_manager.h"
00033 #include "ros/service_publication.h"
00034 #include "ros/service_client_link.h"
00035 #include "ros/service_server_link.h"
00036 #include "ros/this_node.h"
00037 #include "ros/network.h"
00038 #include "ros/master.h"
00039 #include "ros/transport/transport_tcp.h"
00040 #include "ros/transport/transport_udp.h"
00041 #include "ros/init.h"
00042 #include "ros/connection.h"
00043 #include "ros/file_log.h"
00044
00045 #include "XmlRpc.h"
00046
00047 #include <ros/console.h>
00048
00049 using namespace XmlRpc;
00050 using namespace std;
00051
00052 namespace ros
00053 {
00054
00055 ServiceManagerPtr g_service_manager;
00056 boost::mutex g_service_manager_mutex;
00057 const ServiceManagerPtr& ServiceManager::instance()
00058 {
00059 if (!g_service_manager)
00060 {
00061 boost::mutex::scoped_lock lock(g_service_manager_mutex);
00062 if (!g_service_manager)
00063 {
00064 g_service_manager = boost::make_shared<ServiceManager>();
00065 }
00066 }
00067
00068 return g_service_manager;
00069 }
00070
00071 ServiceManager::ServiceManager()
00072 : shutting_down_(false)
00073 {
00074 }
00075
00076 ServiceManager::~ServiceManager()
00077 {
00078 shutdown();
00079 }
00080
00081 void ServiceManager::start()
00082 {
00083 shutting_down_ = false;
00084
00085 poll_manager_ = PollManager::instance();
00086 connection_manager_ = ConnectionManager::instance();
00087 xmlrpc_manager_ = XMLRPCManager::instance();
00088
00089 }
00090
00091 void ServiceManager::shutdown()
00092 {
00093 boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00094 if (shutting_down_)
00095 {
00096 return;
00097 }
00098
00099 shutting_down_ = true;
00100
00101 ROSCPP_LOG_DEBUG("ServiceManager::shutdown(): unregistering our advertised services");
00102 {
00103 boost::mutex::scoped_lock ss_lock(service_publications_mutex_);
00104
00105 for (L_ServicePublication::iterator i = service_publications_.begin();
00106 i != service_publications_.end(); ++i)
00107 {
00108 unregisterService((*i)->getName());
00109
00110 (*i)->drop();
00111 }
00112 service_publications_.clear();
00113 }
00114
00115 L_ServiceServerLink local_service_clients;
00116 {
00117 boost::mutex::scoped_lock lock(service_server_links_mutex_);
00118 local_service_clients.swap(service_server_links_);
00119 }
00120
00121 {
00122 L_ServiceServerLink::iterator it = local_service_clients.begin();
00123 L_ServiceServerLink::iterator end = local_service_clients.end();
00124 for (; it != end; ++it)
00125 {
00126 (*it)->getConnection()->drop(Connection::Destructing);
00127 }
00128
00129 local_service_clients.clear();
00130 }
00131
00132 }
00133
00134 bool ServiceManager::advertiseService(const AdvertiseServiceOptions& ops)
00135 {
00136 boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00137 if (shutting_down_)
00138 {
00139 return false;
00140 }
00141
00142 {
00143 boost::mutex::scoped_lock lock(service_publications_mutex_);
00144
00145 if (isServiceAdvertised(ops.service))
00146 {
00147 ROS_ERROR("Tried to advertise a service that is already advertised in this node [%s]", ops.service.c_str());
00148 return false;
00149 }
00150
00151 ServicePublicationPtr pub(boost::make_shared<ServicePublication>(ops.service, ops.md5sum, ops.datatype, ops.req_datatype, ops.res_datatype, ops.helper, ops.callback_queue, ops.tracked_object));
00152 service_publications_.push_back(pub);
00153 }
00154
00155 XmlRpcValue args, result, payload;
00156 args[0] = this_node::getName();
00157 args[1] = ops.service;
00158 char uri_buf[1024];
00159 snprintf(uri_buf, sizeof(uri_buf), "rosrpc://%s:%d",
00160 network::getHost().c_str(), connection_manager_->getTCPPort());
00161 args[2] = string(uri_buf);
00162 args[3] = xmlrpc_manager_->getServerURI();
00163 master::execute("registerService", args, result, payload, true);
00164
00165 return true;
00166 }
00167
00168 bool ServiceManager::unadvertiseService(const string &serv_name)
00169 {
00170 boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00171 if (shutting_down_)
00172 {
00173 return false;
00174 }
00175
00176 ServicePublicationPtr pub;
00177 {
00178 boost::mutex::scoped_lock lock(service_publications_mutex_);
00179
00180 for (L_ServicePublication::iterator i = service_publications_.begin();
00181 i != service_publications_.end(); ++i)
00182 {
00183 if((*i)->getName() == serv_name && !(*i)->isDropped())
00184 {
00185 pub = *i;
00186 service_publications_.erase(i);
00187 break;
00188 }
00189 }
00190 }
00191
00192 if (pub)
00193 {
00194 unregisterService(pub->getName());
00195 ROSCPP_LOG_DEBUG( "shutting down service [%s]", pub->getName().c_str());
00196 pub->drop();
00197 return true;
00198 }
00199
00200 return false;
00201 }
00202
00203 bool ServiceManager::unregisterService(const std::string& service)
00204 {
00205 XmlRpcValue args, result, payload;
00206 args[0] = this_node::getName();
00207 args[1] = service;
00208 char uri_buf[1024];
00209 snprintf(uri_buf, sizeof(uri_buf), "rosrpc://%s:%d",
00210 network::getHost().c_str(), connection_manager_->getTCPPort());
00211 args[2] = string(uri_buf);
00212
00213 master::execute("unregisterService", args, result, payload, false);
00214
00215 return true;
00216 }
00217
00218 bool ServiceManager::isServiceAdvertised(const string& serv_name)
00219 {
00220 for (L_ServicePublication::iterator s = service_publications_.begin(); s != service_publications_.end(); ++s)
00221 {
00222 if (((*s)->getName() == serv_name) && !(*s)->isDropped())
00223 {
00224 return true;
00225 }
00226 }
00227
00228 return false;
00229 }
00230
00231 ServicePublicationPtr ServiceManager::lookupServicePublication(const std::string& service)
00232 {
00233 boost::mutex::scoped_lock lock(service_publications_mutex_);
00234
00235 for (L_ServicePublication::iterator t = service_publications_.begin();
00236 t != service_publications_.end(); ++t)
00237 {
00238 if ((*t)->getName() == service)
00239 {
00240 return *t;
00241 }
00242 }
00243
00244 return ServicePublicationPtr();
00245 }
00246
00247 ServiceServerLinkPtr ServiceManager::createServiceServerLink(const std::string& service, bool persistent,
00248 const std::string& request_md5sum, const std::string& response_md5sum,
00249 const M_string& header_values)
00250 {
00251
00252 boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00253 if (shutting_down_)
00254 {
00255 return ServiceServerLinkPtr();
00256 }
00257
00258 uint32_t serv_port;
00259 std::string serv_host;
00260 if (!lookupService(service, serv_host, serv_port))
00261 {
00262 return ServiceServerLinkPtr();
00263 }
00264
00265 TransportTCPPtr transport(boost::make_shared<TransportTCP>(&poll_manager_->getPollSet()));
00266
00267
00268
00269 ConnectionPtr connection(boost::make_shared<Connection>());
00270 connection_manager_->addConnection(connection);
00271 connection->initialize(transport, false, HeaderReceivedFunc());
00272
00273 if (transport->connect(serv_host, serv_port))
00274 {
00275 ServiceServerLinkPtr client(boost::make_shared<ServiceServerLink>(service, persistent, request_md5sum, response_md5sum, header_values));
00276
00277 {
00278 boost::mutex::scoped_lock lock(service_server_links_mutex_);
00279 service_server_links_.push_back(client);
00280 }
00281
00282 client->initialize(connection);
00283
00284 return client;
00285 }
00286
00287 ROSCPP_LOG_DEBUG("Failed to connect to service [%s] (mapped=[%s]) at [%s:%d]", service.c_str(), service.c_str(), serv_host.c_str(), serv_port);
00288
00289 return ServiceServerLinkPtr();
00290 }
00291
00292 void ServiceManager::removeServiceServerLink(const ServiceServerLinkPtr& client)
00293 {
00294
00295 if (shutting_down_)
00296 {
00297 return;
00298 }
00299
00300 boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00301
00302 if (shutting_down_)
00303 {
00304 return;
00305 }
00306
00307 boost::mutex::scoped_lock lock(service_server_links_mutex_);
00308
00309 L_ServiceServerLink::iterator it = std::find(service_server_links_.begin(), service_server_links_.end(), client);
00310 if (it != service_server_links_.end())
00311 {
00312 service_server_links_.erase(it);
00313 }
00314 }
00315
00316 bool ServiceManager::lookupService(const string &name, string &serv_host, uint32_t &serv_port)
00317 {
00318 XmlRpcValue args, result, payload;
00319 args[0] = this_node::getName();
00320 args[1] = name;
00321 if (!master::execute("lookupService", args, result, payload, false))
00322 return false;
00323
00324 string serv_uri(payload);
00325 if (!serv_uri.length())
00326 {
00327 ROS_ERROR("lookupService: Empty server URI returned from master");
00328
00329 return false;
00330 }
00331
00332 if (!network::splitURI(serv_uri, serv_host, serv_port))
00333 {
00334 ROS_ERROR("lookupService: Bad service uri [%s]", serv_uri.c_str());
00335
00336 return false;
00337 }
00338
00339 return true;
00340 }
00341
00342 }
00343