1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18   
 19   
 20   
 21   
 22   
 23   
 24   
 25   
 26   
 27   
 28   
 29   
 30   
 31   
 32   
 33  """Internal use: Service-specific extensions for TCPROS support""" 
 34   
 35  import io 
 36  import socket 
 37  import struct 
 38  import sys 
 39  import logging 
 40   
 41  import time 
 42  import traceback 
 43   
 44  import genpy 
 45   
 46  import rosgraph 
 47  import rosgraph.names 
 48  import rosgraph.network 
 49   
 50  from rospy.exceptions import TransportInitError, TransportTerminated, ROSException, ROSInterruptException 
 51  from rospy.service import _Service, ServiceException 
 52   
 53  from rospy.impl.registration import get_service_manager 
 54  from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \ 
 55      get_tcpros_server_address, start_tcpros_server, recv_buff, \ 
 56      DEFAULT_BUFF_SIZE 
 57   
 58  from rospy.core import logwarn, loginfo, logerr, logdebug 
 59  import rospy.core 
 60  import rospy.msg 
 61  import rospy.names 
 62   
 63  import rospy.impl.validators 
 64   
 65  try: 
 66          import _thread  
 67  except ImportError: 
 68      import thread as _thread 
 69   
 70  if sys.hexversion > 0x03000000:  
 72          return isinstance(s, str)  
  73  else: 
 75          return isinstance(s, basestring)  
  76   
 77  logger = logging.getLogger('rospy.service') 
 78   
 79   
 80   
 81   
 83      """ 
 84      Blocks until service is available. Use this in 
 85      initialization code if your program depends on a 
 86      service already running. 
 87      @param service: name of service 
 88      @type  service: str 
 89      @param timeout: timeout time in seconds, None for no 
 90      timeout. NOTE: timeout=0 is invalid as wait_for_service actually 
 91      contacts the service, so non-blocking behavior is not 
 92      possible. For timeout=0 uses cases, just call the service without 
 93      waiting. 
 94      @type  timeout: double 
 95      @raise ROSException: if specified timeout is exceeded 
 96      @raise ROSInterruptException: if shutdown interrupts wait 
 97      """ 
 98      master = rosgraph.Master(rospy.names.get_caller_id()) 
 99      def contact_service(resolved_name, timeout=10.0): 
100          try: 
101              uri = master.lookupService(resolved_name) 
102          except rosgraph.MasterException: 
103              return False 
104   
105          addr = rospy.core.parse_rosrpc_uri(uri) 
106          s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)             
107          try: 
108               
109               
110              s.settimeout(timeout) 
111              s.connect(addr) 
112              h = { 'probe' : '1', 'md5sum' : '*', 
113                    'callerid' : rospy.core.get_caller_id(), 
114                    'service': resolved_name } 
115              rosgraph.network.write_ros_handshake_header(s, h) 
116              return True 
117          finally: 
118              if s is not None: 
119                  s.close() 
 120      if timeout == 0.: 
121          raise ValueError("timeout must be non-zero") 
122      resolved_name = rospy.names.resolve_name(service) 
123      first = False 
124      if timeout: 
125          timeout_t = time.time() + timeout 
126          while not rospy.core.is_shutdown() and time.time() < timeout_t: 
127              try: 
128                  if contact_service(resolved_name, timeout_t-time.time()): 
129                      return 
130                  time.sleep(0.3) 
131              except KeyboardInterrupt: 
132                   
133                  rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising") 
134                  raise  
135              except:  
136                  if first: 
137                      first = False 
138                      rospy.core.logerr("wait_for_service(%s): failed to contact [%s], will keep trying"%(resolved_name, uri)) 
139          if rospy.core.is_shutdown(): 
140              raise ROSInterruptException("rospy shutdown") 
141          else: 
142              raise ROSException("timeout exceeded while waiting for service %s"%resolved_name) 
143      else: 
144          while not rospy.core.is_shutdown(): 
145              try: 
146                  if contact_service(resolved_name): 
147                      return 
148                  time.sleep(0.3) 
149              except KeyboardInterrupt: 
150                   
151                  rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising") 
152                  raise  
153              except:  
154                  if first: 
155                      first = False 
156                      rospy.core.logerr("wait_for_service(%s): failed to contact [%s], will keep trying"%(resolved_name, uri)) 
157          if rospy.core.is_shutdown(): 
158              raise ROSInterruptException("rospy shutdown") 
159       
160   
162      """ 
163      Convert return value of function to response instance. The 
164      rules/precedence for this are: 
165   
166      1. If the return type is the same as the response type, no conversion 
167      is done. 
168   
169      2. If the return type is a dictionary, it is used as a keyword-style 
170      initialization for a new response instance. 
171   
172      3. If the return type is *not* a list type, it is passed in as a single arg 
173      to a new response instance. 
174   
175      4. If the return type is a list/tuple type, it is used as a args-style 
176      initialization for a new response instance. 
177      """ 
178   
179       
180       
181       
182      if isinstance(response, genpy.Message) and response._type == response_class._type: 
183       
184          return response 
185      elif type(response) == dict: 
186           
187          try: 
188              return response_class(**response) 
189          except AttributeError as e: 
190              raise ServiceException("handler returned invalid value: %s"%str(e)) 
191      elif response == None: 
192          raise ServiceException("service handler returned None") 
193      elif type(response) not in [list, tuple]: 
194           
195          try: 
196              return response_class(response) 
197          except TypeError as e: 
198              raise ServiceException("handler returned invalid value: %s"%str(e)) 
199      else: 
200           
201           
202          try: 
203              return response_class(*response) 
204          except TypeError as e: 
205              raise ServiceException("handler returned wrong number of values: %s"%str(e)) 
 206   
208      """ 
209      Process incoming service connection. For use with 
210      TCPROSServer. Reads in service name from handshake and creates the 
211      appropriate service handler for the connection. 
212      @param sock: socket connection 
213      @type  sock: socket 
214      @param client_addr: client address 
215      @type  client_addr: (str, int) 
216      @param header: key/value pairs from handshake header 
217      @type  header: dict 
218      @return: error string or None  
219      @rtype: str 
220      """ 
221      for required in ['service', 'md5sum', 'callerid']: 
222          if not required in header: 
223              return "Missing required '%s' field"%required 
224      else: 
225          logger.debug("connection from %s:%s", client_addr[0], client_addr[1]) 
226          service_name = header['service'] 
227           
228           
229           
230           
231           
232           
233          sm = get_service_manager() 
234          md5sum = header['md5sum'] 
235          service = sm.get_service(service_name) 
236          if not service: 
237              return "[%s] is not a provider of  [%s]"%(rospy.names.get_caller_id(), service_name) 
238          elif md5sum != rospy.names.SERVICE_ANYTYPE and md5sum != service.service_class._md5sum: 
239              return "request from [%s]: md5sums do not match: [%s] vs. [%s]"%(header['callerid'], md5sum, service.service_class._md5sum) 
240          else: 
241              transport = TCPROSTransport(service.protocol, service_name, header=header) 
242              transport.set_socket(sock, header['callerid']) 
243              transport.write_header() 
244               
245               
246              _thread.start_new_thread(service.handle, (transport, header)) 
 247                   
248           
250      """ 
251      Protocol implementation for Services over TCPROS 
252      """ 
253   
255          """ 
256          ctor. 
257          @param resolved_name: name of service 
258          @type  resolved_name: str 
259          @param service_class: Service data type class 
260          @type  service_class: Service 
261          @param buff_size int: size of buffer (bytes) to use for reading incoming requests. 
262          @type  buff_size: int 
263          """ 
264          super(TCPService, self).__init__(resolved_name, service_class._request_class, buff_size=buff_size) 
265          self.service_class = service_class 
 266   
268          """ 
269          Protocol API 
270          @return: header fields 
271          @rtype: dict 
272          """ 
273          return {'service': self.resolved_name, 'type': self.service_class._type, 
274                  'md5sum': self.service_class._md5sum, 'callerid': rospy.names.get_caller_id() } 
  275   
277      """Protocol Implementation for Service clients over TCPROS""" 
278       
280          """ 
281          ctor. 
282          @param resolved_name: resolved service name  
283          @type  resolved_name: str 
284          @param service_class: Service data type class 
285          @type  service_class: Service 
286          @param headers: identifier for Service session 
287          @type  headers: dict 
288          @param buff_size: size of buffer (bytes) for reading responses from Service.  
289          @type  buff_size: int 
290          """ 
291          super(TCPROSServiceClient, self).__init__(resolved_name, service_class._response_class) 
292          self.service_class = service_class 
293          self.headers = headers or {} 
294          self.buff_size = buff_size 
 295           
297          """ 
298          TCPROSTransportProtocol API         
299          """ 
300          headers = {'service': self.resolved_name, 'md5sum': self.service_class._md5sum, 
301                     'callerid': rospy.names.get_caller_id()} 
302           
303           
304           
305          for k, v in self.headers.items(): 
306              headers[k] = v 
307          return headers 
 308       
310          """ 
311          Utility for reading the OK-byte/error-message header preceding each message. 
312          @param sock: socket connection. Will be read from if OK byte is 
313          false and error message needs to be read 
314          @type  sock: socket.socket 
315          @param b: buffer to read from 
316          @type  b: StringIO 
317          """ 
318          if b.tell() == 0: 
319              return 
320          pos = b.tell() 
321          b.seek(0) 
322          ok = struct.unpack('<B', b.read(1))[0]  
323          b.seek(pos) 
324          if not ok: 
325              str = self._read_service_error(sock, b) 
326               
327               
328               
329               
330               
331              b.seek(0) 
332              b.truncate(0) 
333              raise ServiceException("service [%s] responded with an error: %s"%(self.resolved_name, str)) 
334          else: 
335               
336              b.seek(pos) 
 337           
339          """ 
340          In service implementation, reads in OK byte that preceeds each 
341          response. The OK byte allows for the passing of error messages 
342          instead of a response message 
343          @param b: buffer 
344          @type  b: StringIO 
345          @param msg_queue: Message queue to append to 
346          @type  msg_queue: [Message] 
347          @param sock: socket to read from 
348          @type  sock: socket.socket 
349          """ 
350          self._read_ok_byte(b, sock) 
351          rospy.msg.deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size, max_msgs=1, start=1)  
352           
353           
354           
355          if b.tell() == 1: 
356              b.seek(0) 
 357           
359          """ 
360          Read service error from sock  
361          @param sock: socket to read from 
362          @type  sock: socket 
363          @param b: currently read data from sock 
364          @type  b: StringIO 
365          """ 
366          buff_size = 256  
367          while b.tell() < 5: 
368              recv_buff(sock, b, buff_size) 
369          bval = b.getvalue() 
370          (length,) = struct.unpack('<I', bval[1:5])  
371          while b.tell() < (5 + length): 
372              recv_buff(sock, b, buff_size) 
373          bval = b.getvalue() 
374          return struct.unpack('<%ss'%length, bval[5:5+length])[0]  
  375   
376       
378      """ 
379      Create a handle to a ROS service for invoking calls. 
380   
381      Usage:: 
382        add_two_ints = ServiceProxy('add_two_ints', AddTwoInts) 
383        resp = add_two_ints(1, 2) 
384      """ 
385       
386 -    def __init__(self, name, service_class, persistent=False, headers=None): 
 387          """ 
388          ctor. 
389          @param name: name of service to call 
390          @type  name: str 
391          @param service_class: auto-generated service class 
392          @type  service_class: Service class 
393          @param persistent: (optional) if True, proxy maintains a persistent 
394          connection to service. While this results in better call 
395          performance, persistent connections are discouraged as they are 
396          less resistent to network issues and service restarts. 
397          @type  persistent: bool 
398          @param headers: (optional) arbitrary headers  
399          @type  headers: dict 
400          """ 
401          super(ServiceProxy, self).__init__(name, service_class) 
402          self.uri = None 
403          self.seq = 0 
404          self.buff_size = DEFAULT_BUFF_SIZE 
405          self.persistent = persistent 
406          if persistent: 
407              if not headers: 
408                  headers = {} 
409              headers['persistent'] = '1' 
410          self.protocol = TCPROSServiceClient(self.resolved_name, 
411                                              self.service_class, headers=headers) 
412          self.transport = None  
 413   
416   
417       
419          """ 
420          Callable-style version of the service API. This accepts either a request message instance, 
421          or you can call directly with arguments to create a new request instance. e.g.:: 
422           
423            add_two_ints(AddTwoIntsRequest(1, 2)) 
424            add_two_ints(1, 2) 
425            add_two_ints(a=1, b=2)           
426           
427          @param args: arguments to remote service 
428          @param kwds: message keyword arguments 
429          @raise ROSSerializationException: If unable to serialize 
430          message. This is usually a type error with one of the fields. 
431          """ 
432          return self.call(*args, **kwds) 
 433       
435          """ 
436          private routine for getting URI of service to call 
437          @param request: request message 
438          @type  request: L{rospy.Message} 
439          """ 
440          if not isinstance(request, genpy.Message): 
441              raise TypeError("request object is not a valid request message instance") 
442           
443           
444           
445           
446          if not self.request_class._type == request._type: 
447              raise TypeError("request object type [%s] does not match service type [%s]"%(request.__class__, self.request_class)) 
448   
449           
450           
451          if 1:  
452              try: 
453                  try: 
454                      master = rosgraph.Master(rospy.names.get_caller_id()) 
455                      self.uri = master.lookupService(self.resolved_name) 
456                  except socket.error: 
457                      raise ServiceException("unable to contact master") 
458                  except rosgraph.MasterError as e: 
459                      logger.error("[%s]: lookup service failed with message [%s]", self.resolved_name, str(e)) 
460                      raise ServiceException("service [%s] unavailable"%self.resolved_name) 
461                   
462                   
463                  try: 
464                      rospy.core.parse_rosrpc_uri(self.uri) 
465                  except rospy.impl.validators.ParameterInvalid: 
466                      raise ServiceException("master returned invalid ROSRPC URI: %s"%self.uri) 
467              except socket.error as e: 
468                  logger.error("[%s]: socket error contacting service, master is probably unavailable",self.resolved_name) 
469          return self.uri 
 470   
471 -    def call(self, *args, **kwds): 
 472          """ 
473          Call the service. This accepts either a request message instance, 
474          or you can call directly with arguments to create a new request instance. e.g.:: 
475           
476            add_two_ints(AddTwoIntsRequest(1, 2)) 
477            add_two_ints(1, 2) 
478            add_two_ints(a=1, b=2)           
479           
480          @raise TypeError: if request is not of the valid type (Message) 
481          @raise ServiceException: if communication with remote service fails 
482          @raise ROSInterruptException: if node shutdown (e.g. ctrl-C) interrupts service call 
483          @raise ROSSerializationException: If unable to serialize 
484          message. This is usually a type error with one of the fields. 
485          """ 
486   
487           
488          request = rospy.msg.args_kwds_to_message(self.request_class, args, kwds)  
489               
490           
491          if self.transport is None: 
492              service_uri = self._get_service_uri(request) 
493              dest_addr, dest_port = rospy.core.parse_rosrpc_uri(service_uri) 
494   
495               
496              transport = TCPROSTransport(self.protocol, self.resolved_name) 
497              transport.buff_size = self.buff_size 
498              try: 
499                  transport.connect(dest_addr, dest_port, service_uri) 
500              except TransportInitError as e: 
501                   
502                  raise ServiceException("unable to connect to service: %s"%e) 
503              self.transport = transport 
504          else: 
505              transport = self.transport 
506   
507           
508          self.seq += 1 
509          transport.send_message(request, self.seq) 
510   
511          try: 
512              responses = transport.receive_once() 
513              if len(responses) == 0: 
514                  raise ServiceException("service [%s] returned no response"%self.resolved_name) 
515              elif len(responses) > 1: 
516                  raise ServiceException("service [%s] returned multiple responses: %s"%(self.resolved_name, len(responses))) 
517          except rospy.exceptions.TransportException as e: 
518               
519              if rospy.core.is_shutdown(): 
520                  raise rospy.exceptions.ROSInterruptException("node shutdown interrupted service call") 
521              else: 
522                  raise ServiceException("transport error completing service call: %s"%(str(e))) 
523          finally: 
524              if not self.persistent: 
525                  transport.close() 
526                  self.transport = None 
527          return responses[0] 
 528   
529       
531          """Close this ServiceProxy. This only has an effect on persistent ServiceProxy instances.""" 
532          if self.transport is not None: 
533              self.transport.close() 
  534   
536      """ 
537      Implementation of ROS Service. This intermediary class allows for more configuration of behavior than the Service class. 
538      """ 
539       
541          super(ServiceImpl, self).__init__(name, service_class) 
542   
543          if not name or not isstring(name): 
544              raise ValueError("service name is not a non-empty string") 
545           
546          if not rosgraph.names.is_legal_name(name): 
547              import warnings 
548              warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2) 
549   
550           
551          self.handler = handler 
552          self.registered = False 
553          self.seq = 0 
554          self.done = False 
555          self.buff_size=buff_size 
556   
557          start_tcpros_server()  
558          host, port = get_tcpros_server_address() 
559          self.uri = '%s%s:%s'%(rospy.core.ROSRPC, host, port) 
560          logdebug("... service URL is %s"%self.uri) 
561   
562          self.protocol = TCPService(self.resolved_name, service_class, self.buff_size) 
563   
564          logdebug("[%s]: new Service instance"%self.resolved_name) 
 565   
566       
567       
569          """ 
570          Stop this service 
571          @param reason: human-readable shutdown reason 
572          @type  reason: str 
573          """ 
574          self.done = True 
575          logdebug('[%s].shutdown: reason [%s]'%(self.resolved_name, reason)) 
576          try: 
577               
578              get_service_manager().unregister(self.resolved_name, self) 
579          except Exception as e: 
580              logerr("Unable to unregister with master: "+traceback.format_exc()) 
581              raise ServiceException("Unable to connect to master: %s"%e) 
 582   
584          """ 
585          Let service run and take over thread until service or node 
586          shutdown. Use this method to keep your scripts from exiting 
587          execution. 
588          """ 
589          try: 
590              while not rospy.core.is_shutdown() and not self.done: 
591                  time.sleep(0.5) 
592          except KeyboardInterrupt: 
593              logdebug("keyboard interrupt, shutting down") 
 594   
596          """ 
597          Send error message to client 
598          @param transport: transport connection to client  
599          @type  transport: Transport 
600          @param err_msg: error message to send to client 
601          @type  err_msg: str 
602          """ 
603          transport.write_data(struct.pack('<BI%ss'%len(err_msg), 0, len(err_msg), err_msg)) 
 604   
606          """ 
607          Process a single incoming request. 
608          @param transport: transport instance 
609          @type  transport: L{TCPROSTransport} 
610          @param request: Message 
611          @type  request: genpy.Message 
612          """ 
613          try: 
614               
615              response = convert_return_to_response(self.handler(request), self.response_class) 
616              self.seq += 1 
617               
618              transport.write_buff.write(struct.pack('<B', 1)) 
619              transport.send_message(response, self.seq) 
620          except ServiceException as e: 
621              rospy.core.rospydebug("handler raised ServiceException: %s"%(e)) 
622              self._write_service_error(transport, "service cannot process request: %s"%e) 
623          except Exception as e: 
624              logerr("Error processing request: %s\n%s"%(e,traceback.print_exc())) 
625              self._write_service_error(transport, "error processing request: %s"%e) 
 626       
627 -    def handle(self, transport, header): 
 628          """ 
629          Process incoming request. This method should be run in its 
630          own thread. If header['persistent'] is set to 1, method will 
631          block until connection is broken. 
632          @param transport: transport instance 
633          @type  transport: L{TCPROSTransport} 
634          @param header: headers from client 
635          @type  header: dict 
636          """ 
637          if 'persistent' in header and \ 
638                 header['persistent'].lower() in ['1', 'true']: 
639              persistent = True 
640          else: 
641              persistent = False 
642          if header.get('probe', None) == '1': 
643               
644              transport.close() 
645              return 
646          handle_done = False 
647          while not handle_done: 
648              try: 
649                  requests = transport.receive_once() 
650                  for request in requests: 
651                      self._handle_request(transport, request) 
652                  if not persistent: 
653                      handle_done = True 
654              except rospy.exceptions.TransportTerminated as e: 
655                  if not persistent: 
656                      logerr("incoming connection failed: %s"%e) 
657                  logdebug("service[%s]: transport terminated"%self.resolved_name) 
658                  handle_done = True 
659          transport.close() 
  660   
661   
663      """ 
664      Declare a ROS service. Service requests are passed to the 
665      specified handler.  
666   
667      Service Usage:: 
668        s = Service('getmapservice', GetMap, get_map_handler) 
669      """ 
670       
672          """ 
673          ctor. 
674   
675          @param name: service name, ``str`` 
676          @param service_class: Service definition class 
677           
678          @param handler: callback function for processing service 
679          request. Function takes in a ServiceRequest and returns a 
680          ServiceResponse of the appropriate type. Function may also 
681          return a list, tuple, or dictionary with arguments to initialize 
682          a ServiceResponse instance of the correct type. 
683   
684          If handler cannot process request, it may either return None, 
685          to indicate failure, or it may raise a rospy.ServiceException 
686          to send a specific error message to the client. Returning None 
687          is always considered a failure. 
688           
689          @type  handler: fn(req)->resp 
690          @param buff_size: size of buffer for reading incoming requests. Should be at least size of request message 
691          @type  buff_size: int 
692          """ 
693          super(Service, self).__init__(name, service_class, handler, buff_size) 
694   
695           
696          get_service_manager().register(self.resolved_name, self) 
  697