unadvertise_service.py
Go to the documentation of this file.
1 import fnmatch
2 from rosbridge_library.capability import Capability
3 
4 
6 
7  # unadvertise_service_msg_fields = [(True, "service", (str, unicode))]
8 
9  services_glob = None
10 
11  def __init__(self, protocol):
12  # Call superclass constructor
13  Capability.__init__(self, protocol)
14 
15  # Register the operations that this capability provides
16  protocol.register_operation("unadvertise_service", self.unadvertise_service)
17 
18  def unadvertise_service(self, message):
19  # parse the message
20  service_name = message["service"]
21 
22  if UnadvertiseService.services_glob is not None and UnadvertiseService.services_glob:
23  self.protocol.log("debug", "Service security glob enabled, checking service: " + service_name)
24  match = False
25  for glob in UnadvertiseService.services_glob:
26  if (fnmatch.fnmatch(service_name, glob)):
27  self.protocol.log("debug", "Found match with glob " + glob + ", continuing service unadvertisement...")
28  match = True
29  break
30  if not match:
31  self.protocol.log("warn", "No match found for service, cancelling service unadvertisement for: " + service_name)
32  return
33  else:
34  self.protocol.log("debug", "No service security glob, not checking service unadvertisement...")
35 
36  # unregister service in ROS
37  if service_name in self.protocol.external_service_list.keys():
38  self.protocol.external_service_list[service_name].graceful_shutdown(timeout=1.0)
39  self.protocol.external_service_list[service_name].service_handle.shutdown("Unadvertise request.")
40  del self.protocol.external_service_list[service_name]
41  self.protocol.log("info", "Unadvertised service %s." % service_name)
42  else:
43  self.protocol.log("error", "Service %s has not been advertised via rosbridge, can't unadvertise." % service_name)


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri Oct 21 2022 02:45:18