# advertise_service.py
# (Go to the documentation of this file.)
1 import fnmatch
2 from threading import Lock
3 import time
4 
5 from rosbridge_library.internal.ros_loader import get_service_class
6 from rosbridge_library.internal import message_conversion
7 from rosbridge_library.capability import Capability
8 from rosbridge_library.util import string_types
9 
10 import rospy
11 
13 
# Numeric suffix used for the next generated request ID; next_id() reads
# this value and stores the incremented one back through ``self``.
id_counter = 1

# Responses keyed by request ID. handle_request() polls this mapping and
# removes the entry it consumes.
responses = dict()
16 
def __init__(self, service_name, service_type, protocol):
    """Store the service details and register the service with rospy.

    Args:
        service_name: name the service is registered under; also embedded
            in the request IDs built by handle_request().
        service_type: type string resolved to a service class through
            ``get_service_class``.
        protocol: object kept as ``self.protocol``; handle_request() calls
            its ``send`` and ``log`` methods.
    """
    self.service_name, self.service_type = service_name, service_type
    self.protocol = protocol

    # Bookkeeping shared between handle_request() and graceful_shutdown().
    self.lock = Lock()
    self.shutdown_requested = False
    self.active_requests = 0

    # Register last, so every attribute handle_request() reads is already
    # in place when the callback is handed to rospy.
    service_class = get_service_class(service_type)
    self.service_handle = rospy.Service(service_name, service_class, self.handle_request)
26 
def next_id(self):
    """Return the current request counter value and advance it by one.

    The method takes no lock itself.
    """
    current = self.id_counter
    self.id_counter = current + 1
    return current
31 
def handle_request(self, req):
    """Forward a service request to the external client and wait for the reply.

    A ``call_service`` message carrying a freshly generated request ID is
    sent through ``self.protocol``. The method then polls ``self.responses``
    until an entry with that ID appears or a shutdown has been requested.

    Args:
        req: the incoming request object; its values are extracted with
            ``message_conversion.extract_values``.

    Returns:
        The entry stored in ``self.responses`` under the request ID, or
        ``None`` when the handler was asked to shut down.
    """
    with self.lock:
        self.active_requests += 1
        # Generate the unique ID while holding the lock: next_id() does an
        # unsynchronised read-modify-write of the counter.
        request_id = "service_request:" + self.service_name + ":" + str(self.next_id())

    try:
        # build a request to send to the external client
        request_message = {
            "op": "call_service",
            "id": request_id,
            "service": self.service_name,
            "args": message_conversion.extract_values(req),
        }
        self.protocol.send(request_message)

        # wait for a response
        while request_id not in self.responses:
            with self.lock:
                if self.shutdown_requested:
                    break
            time.sleep(0)
    finally:
        # Always undo the increment above, even if building or sending the
        # request raised, so the counter cannot drift upwards.
        with self.lock:
            self.active_requests -= 1

    if self.shutdown_requested:
        # Drop a response that may have arrived for this aborted call so it
        # does not linger in the shared mapping.
        self.responses.pop(request_id, None)
        self.protocol.log(
            "warning",
            "Service %s was unadvertised with a service call in progress, "
            "aborting service call with request ID %s" % (self.service_name, request_id))
        return None

    # Fetch and remove the response in a single step.
    return self.responses.pop(request_id)
67 
def graceful_shutdown(self, timeout):
    """
    Signal the AdvertisedServiceHandler to shutdown

    Using this, rather than just rospy.Service.shutdown(), allows us
    time to stop any active service requests, ending their busy wait
    loops.

    Args:
        timeout: maximum number of seconds to wait for in-flight requests.

    Returns once ``self.active_requests`` has dropped to zero or the
    timeout has elapsed, whichever happens first.
    """
    with self.lock:
        self.shutdown_requested = True
    start_time = time.time()
    while time.time() - start_time < timeout:
        # Stop waiting as soon as every handle_request() call has left its
        # wait loop; there is nothing to gain from spinning any longer.
        with self.lock:
            if self.active_requests <= 0:
                return
        time.sleep(0)
81 
# Glob patterns that advertise_service() matches service names against.
# None (or an empty collection) disables the check.
services_glob = None

# Field specification for an incoming "advertise_service" message.
# NOTE(review): each tuple looks like (mandatory, field name, accepted
# types) -- confirm against Capability.basic_type_check.
advertise_service_msg_fields = [
    (True, "service", string_types),
    (True, "type", string_types),
]
86 
def __init__(self, protocol):
    """Initialise the capability and register its ``advertise_service`` operation.

    Args:
        protocol: object passed on to ``Capability.__init__`` and used to
            register the operation handler.
    """
    # Superclass set-up comes first.
    Capability.__init__(self, protocol)

    # Expose the single operation this capability provides.
    operation_handler = self.advertise_service
    protocol.register_operation("advertise_service", operation_handler)
93 
def advertise_service(self, message):
    """Handle an ``advertise_service`` operation.

    The message is type-checked, its service name is matched against
    ``AdvertiseService.services_glob`` when globs are configured, any
    existing entry for the same name is shut down and removed, and a new
    ``AdvertisedServiceHandler`` is stored in
    ``self.protocol.external_service_list``.

    Args:
        message: mapping that provides the ``"service"`` and ``"type"`` keys.

    Returns:
        None. When globs are configured and none matches, the method
        returns early without advertising anything.

    Raises:
        Whatever ``basic_type_check`` raises for a malformed message
        (NOTE(review): its failure behaviour is not visible here -- confirm
        in capability.py).
    """
    # Typecheck the args
    self.basic_type_check(message, self.advertise_service_msg_fields)

    # parse the incoming message
    service_name = message["service"]

    # An unset (None) or empty glob list both mean "no restriction".
    if AdvertiseService.services_glob:
        self.protocol.log("debug", "Service security glob enabled, checking service: " + service_name)
        match = False
        for glob in AdvertiseService.services_glob:
            if fnmatch.fnmatch(service_name, glob):
                self.protocol.log("debug", "Found match with glob " + glob + ", continuing service advertisement...")
                match = True
                break
        if not match:
            self.protocol.log("warn", "No match found for service, cancelling service advertisement for: " + service_name)
            return
    else:
        self.protocol.log("debug", "No service security glob, not checking service advertisement.")

    # check for an existing entry
    if service_name in self.protocol.external_service_list:
        self.protocol.log("warn", "Duplicate service advertised. Overwriting %s." % service_name)
        self.protocol.external_service_list[service_name].service_handle.shutdown("Duplicate advertiser.")
        del self.protocol.external_service_list[service_name]

    # setup and store the service information
    service_type = message["type"]
    service_handler = AdvertisedServiceHandler(service_name, service_type, self.protocol)
    self.protocol.external_service_list[service_name] = service_handler
    self.protocol.log("info", "Advertised service %s." % service_name)
# Referenced definition: basic_type_check(self, msg, types_info) -- capability.py:76


# rosbridge_library
# Author(s): Jonathan Mace
# Autogenerated on Fri Oct 21 2022 02:45:18