1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18   
 19   
 20   
 21   
 22   
 23   
 24   
 25   
 26   
 27   
 28   
 29   
 30   
 31   
 32   
 33  """Internal use: Service-specific extensions for TCPROS support""" 
 34   
 35  import io 
 36  import socket 
 37  import struct 
 38  import sys 
 39  import logging 
 40   
 41  import time 
 42  import traceback 
 43   
 44  import genpy 
 45   
 46  import rosgraph 
 47  import rosgraph.names 
 48  import rosgraph.network 
 49   
 50  from rospy.exceptions import TransportInitError, TransportTerminated, ROSException, ROSInterruptException 
 51  from rospy.service import _Service, ServiceException 
 52   
 53  from rospy.impl.registration import get_service_manager 
 54  from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \ 
 55      get_tcpros_server_address, start_tcpros_server, recv_buff, \ 
 56      DEFAULT_BUFF_SIZE 
 57   
 58  from rospy.core import logwarn, loginfo, logerr, logdebug 
 59  import rospy.core 
 60  import rospy.msg 
 61  import rospy.names 
 62   
 63  import rospy.impl.validators 
 64   
 65  import threading 
 66   
 67  if sys.hexversion > 0x03000000:  
 69          return isinstance(s, str)  
  70  else: 
 72          return isinstance(s, basestring)  
  73   
 74  logger = logging.getLogger('rospy.service') 
 75   
 76   
 77   
 78   
 80      """ 
 81      Blocks until service is available. Use this in 
 82      initialization code if your program depends on a 
 83      service already running. 
 84      @param service: name of service 
 85      @type  service: str 
 86      @param timeout: timeout time in seconds, None for no 
 87      timeout. NOTE: timeout=0 is invalid as wait_for_service actually 
 88      contacts the service, so non-blocking behavior is not 
 89      possible. For timeout=0 uses cases, just call the service without 
 90      waiting. 
 91      @type  timeout: double 
 92      @raise ROSException: if specified timeout is exceeded 
 93      @raise ROSInterruptException: if shutdown interrupts wait 
 94      """ 
 95      master = rosgraph.Master(rospy.names.get_caller_id()) 
 96      def contact_service(resolved_name, timeout=10.0): 
 97          try: 
 98              uri = master.lookupService(resolved_name) 
 99          except rosgraph.MasterException: 
100              return False 
101   
102          addr = rospy.core.parse_rosrpc_uri(uri) 
103          if rosgraph.network.use_ipv6(): 
104              s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 
105          else: 
106              s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
107          try: 
108               
109               
110              s.settimeout(timeout) 
111              logdebug('connecting to ' + str(addr)) 
112              s.connect(addr) 
113              h = { 'probe' : '1', 'md5sum' : '*', 
114                    'callerid' : rospy.core.get_caller_id(), 
115                    'service': resolved_name } 
116              rosgraph.network.write_ros_handshake_header(s, h) 
117              return uri 
118          finally: 
119              if s is not None: 
120                  s.close() 
 121      if timeout == 0.: 
122          raise ValueError("timeout must be non-zero") 
123      resolved_name = rospy.names.resolve_name(service) 
124      contact_failed = False 
125      if timeout: 
126          timeout_t = time.time() + timeout 
127          while not rospy.core.is_shutdown() and time.time() < timeout_t: 
128              try: 
129                  uri = contact_service(resolved_name, timeout_t - time.time()) 
130                  if uri: 
131                      if contact_failed: 
132                          rospy.core.loginfo("wait_for_service(%s): finally were able to contact [%s]"%(resolved_name, uri)) 
133                      return 
134              except KeyboardInterrupt: 
135                   
136                  rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising") 
137                  raise  
138              except:  
139                  contact_failed = True 
140                  rospy.core.logwarn_throttle(10, "wait_for_service(%s): failed to contact, will keep trying"%resolved_name) 
141              time.sleep(0.3) 
142          if rospy.core.is_shutdown(): 
143              raise ROSInterruptException("rospy shutdown") 
144          else: 
145              raise ROSException("timeout exceeded while waiting for service %s"%resolved_name) 
146      else: 
147          while not rospy.core.is_shutdown(): 
148              try: 
149                  uri = contact_service(resolved_name) 
150                  if uri: 
151                      if contact_failed: 
152                          rospy.core.loginfo("wait_for_service(%s): finally were able to contact [%s]"%(resolved_name, uri)) 
153                      return 
154              except KeyboardInterrupt: 
155                   
156                  rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising") 
157                  raise  
158              except:  
159                  contact_failed = True 
160                  rospy.core.logwarn_throttle(10, "wait_for_service(%s): failed to contact, will keep trying"%resolved_name) 
161              time.sleep(0.3) 
162          if rospy.core.is_shutdown(): 
163              raise ROSInterruptException("rospy shutdown") 
164       
165   
167      """ 
168      Convert return value of function to response instance. The 
169      rules/precedence for this are: 
170   
171      1. If the return type is the same as the response type, no conversion 
172      is done. 
173   
174      2. If the return type is a dictionary, it is used as a keyword-style 
175      initialization for a new response instance. 
176   
177      3. If the return type is *not* a list type, it is passed in as a single arg 
178      to a new response instance. 
179   
180      4. If the return type is a list/tuple type, it is used as a args-style 
181      initialization for a new response instance. 
182      """ 
183   
184       
185       
186       
187      if isinstance(response, genpy.Message) and response._type == response_class._type: 
188       
189          return response 
190      elif type(response) == dict: 
191           
192          try: 
193              return response_class(**response) 
194          except AttributeError as e: 
195              raise ServiceException("handler returned invalid value: %s"%str(e)) 
196      elif response == None: 
197          raise ServiceException("service handler returned None") 
198      elif type(response) not in [list, tuple]: 
199           
200          try: 
201              return response_class(response) 
202          except TypeError as e: 
203              raise ServiceException("handler returned invalid value: %s"%str(e)) 
204      else: 
205           
206           
207          try: 
208              return response_class(*response) 
209          except TypeError as e: 
210              raise ServiceException("handler returned wrong number of values: %s"%str(e)) 
 211   
213      """ 
214      Process incoming service connection. For use with 
215      TCPROSServer. Reads in service name from handshake and creates the 
216      appropriate service handler for the connection. 
217      @param sock: socket connection 
218      @type  sock: socket 
219      @param client_addr: client address 
220      @type  client_addr: (str, int) 
221      @param header: key/value pairs from handshake header 
222      @type  header: dict 
223      @return: error string or None  
224      @rtype: str 
225      """ 
226      for required in ['service', 'md5sum', 'callerid']: 
227          if not required in header: 
228              return "Missing required '%s' field"%required 
229      else: 
230          logger.debug("connection from %s:%s", client_addr[0], client_addr[1]) 
231          service_name = header['service'] 
232           
233           
234           
235           
236           
237           
238          sm = get_service_manager() 
239          md5sum = header['md5sum'] 
240          service = sm.get_service(service_name) 
241          if not service: 
242              return "[%s] is not a provider of  [%s]"%(rospy.names.get_caller_id(), service_name) 
243          elif md5sum != rospy.names.SERVICE_ANYTYPE and md5sum != service.service_class._md5sum: 
244              return "request from [%s]: md5sums do not match: [%s] vs. [%s]"%(header['callerid'], md5sum, service.service_class._md5sum) 
245          else: 
246              transport = TCPROSTransport(service.protocol, service_name, header=header) 
247              transport.set_socket(sock, header['callerid']) 
248              transport.write_header() 
249               
250               
251              t = threading.Thread(target=service.handle, args=(transport, header)) 
252              t.daemon = True 
253              t.start() 
 254                   
255           
257      """ 
258      Protocol implementation for Services over TCPROS 
259      """ 
260   
262          """ 
263          ctor. 
264          @param resolved_name: name of service 
265          @type  resolved_name: str 
266          @param service_class: Service data type class 
267          @type  service_class: Service 
268          @param buff_size int: size of buffer (bytes) to use for reading incoming requests. 
269          @type  buff_size: int 
270          """ 
271          super(TCPService, self).__init__(resolved_name, service_class._request_class, buff_size=buff_size) 
272          self.service_class = service_class 
 273   
275          """ 
276          Protocol API 
277          @return: header fields 
278          @rtype: dict 
279          """ 
280          return {'service': self.resolved_name, 'type': self.service_class._type, 
281                  'md5sum': self.service_class._md5sum, 'callerid': rospy.names.get_caller_id() } 
  282   
284      """Protocol Implementation for Service clients over TCPROS""" 
285       
287          """ 
288          ctor. 
289          @param resolved_name: resolved service name  
290          @type  resolved_name: str 
291          @param service_class: Service data type class 
292          @type  service_class: Service 
293          @param headers: identifier for Service session 
294          @type  headers: dict 
295          @param buff_size: size of buffer (bytes) for reading responses from Service.  
296          @type  buff_size: int 
297          """ 
298          super(TCPROSServiceClient, self).__init__(resolved_name, service_class._response_class) 
299          self.service_class = service_class 
300          self.headers = headers or {} 
301          self.buff_size = buff_size 
 302           
304          """ 
305          TCPROSTransportProtocol API         
306          """ 
307          headers = {'service': self.resolved_name, 'md5sum': self.service_class._md5sum, 
308                     'callerid': rospy.names.get_caller_id()} 
309           
310           
311           
312          for k, v in self.headers.items(): 
313              headers[k] = v 
314          return headers 
 315       
317          """ 
318          Utility for reading the OK-byte/error-message header preceding each message. 
319          @param sock: socket connection. Will be read from if OK byte is 
320          false and error message needs to be read 
321          @type  sock: socket.socket 
322          @param b: buffer to read from 
323          @type  b: StringIO 
324          """ 
325          if b.tell() == 0: 
326              return 
327          pos = b.tell() 
328          b.seek(0) 
329          ok = struct.unpack('<B', b.read(1))[0]  
330          b.seek(pos) 
331          if not ok: 
332              str = self._read_service_error(sock, b) 
333               
334               
335               
336               
337               
338              b.seek(0) 
339              b.truncate(0) 
340              raise ServiceException("service [%s] responded with an error: %s"%(self.resolved_name, str)) 
341          else: 
342               
343              b.seek(pos) 
 344           
346          """ 
347          In service implementation, reads in OK byte that preceeds each 
348          response. The OK byte allows for the passing of error messages 
349          instead of a response message 
350          @param b: buffer 
351          @type  b: StringIO 
352          @param msg_queue: Message queue to append to 
353          @type  msg_queue: [Message] 
354          @param sock: socket to read from 
355          @type  sock: socket.socket 
356          """ 
357          self._read_ok_byte(b, sock) 
358          rospy.msg.deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size, max_msgs=1, start=1)  
359           
360           
361           
362          if b.tell() == 1: 
363              b.seek(0) 
 364           
366          """ 
367          Read service error from sock  
368          @param sock: socket to read from 
369          @type  sock: socket 
370          @param b: currently read data from sock 
371          @type  b: StringIO 
372          """ 
373          buff_size = 256  
374          while b.tell() < 5: 
375              recv_buff(sock, b, buff_size) 
376          bval = b.getvalue() 
377          (length,) = struct.unpack('<I', bval[1:5])  
378          while b.tell() < (5 + length): 
379              recv_buff(sock, b, buff_size) 
380          bval = b.getvalue() 
381          return struct.unpack('<%ss'%length, bval[5:5+length])[0]  
  382   
383       
385      """ 
386      Create a handle to a ROS service for invoking calls. 
387   
388      Usage:: 
389        add_two_ints = ServiceProxy('add_two_ints', AddTwoInts) 
390        resp = add_two_ints(1, 2) 
391      """ 
392       
393 -    def __init__(self, name, service_class, persistent=False, headers=None): 
 394          """ 
395          ctor. 
396          @param name: name of service to call 
397          @type  name: str 
398          @param service_class: auto-generated service class 
399          @type  service_class: Service class 
400          @param persistent: (optional) if True, proxy maintains a persistent 
401          connection to service. While this results in better call 
402          performance, persistent connections are discouraged as they are 
403          less resistent to network issues and service restarts. 
404          @type  persistent: bool 
405          @param headers: (optional) arbitrary headers  
406          @type  headers: dict 
407          """ 
408          super(ServiceProxy, self).__init__(name, service_class) 
409          self.uri = None 
410          self.seq = 0 
411          self.buff_size = DEFAULT_BUFF_SIZE 
412          self.persistent = persistent 
413          if persistent: 
414              if not headers: 
415                  headers = {} 
416              headers['persistent'] = '1' 
417          self.protocol = TCPROSServiceClient(self.resolved_name, 
418                                              self.service_class, headers=headers) 
419          self.transport = None  
 420   
423   
424       
426          """ 
427          Callable-style version of the service API. This accepts either a request message instance, 
428          or you can call directly with arguments to create a new request instance. e.g.:: 
429           
430            add_two_ints(AddTwoIntsRequest(1, 2)) 
431            add_two_ints(1, 2) 
432            add_two_ints(a=1, b=2)           
433           
434          @param args: arguments to remote service 
435          @param kwds: message keyword arguments 
436          @raise ROSSerializationException: If unable to serialize 
437          message. This is usually a type error with one of the fields. 
438          """ 
439          return self.call(*args, **kwds) 
 440       
442          """ 
443          private routine for getting URI of service to call 
444          @param request: request message 
445          @type  request: L{rospy.Message} 
446          """ 
447          if not isinstance(request, genpy.Message): 
448              raise TypeError("request object is not a valid request message instance") 
449           
450           
451           
452           
453          if not self.request_class._type == request._type: 
454              raise TypeError("request object type [%s] does not match service type [%s]"%(request.__class__, self.request_class)) 
455   
456           
457           
458          if 1:  
459              try: 
460                  try: 
461                      master = rosgraph.Master(rospy.names.get_caller_id()) 
462                      self.uri = master.lookupService(self.resolved_name) 
463                  except socket.error: 
464                      raise ServiceException("unable to contact master") 
465                  except rosgraph.MasterError as e: 
466                      logger.error("[%s]: lookup service failed with message [%s]", self.resolved_name, str(e)) 
467                      raise ServiceException("service [%s] unavailable"%self.resolved_name) 
468                   
469                   
470                  try: 
471                      rospy.core.parse_rosrpc_uri(self.uri) 
472                  except rospy.impl.validators.ParameterInvalid: 
473                      raise ServiceException("master returned invalid ROSRPC URI: %s"%self.uri) 
474              except socket.error as e: 
475                  logger.error("[%s]: socket error contacting service, master is probably unavailable",self.resolved_name) 
476          return self.uri 
 477   
478 -    def call(self, *args, **kwds): 
 479          """ 
480          Call the service. This accepts either a request message instance, 
481          or you can call directly with arguments to create a new request instance. e.g.:: 
482           
483            add_two_ints(AddTwoIntsRequest(1, 2)) 
484            add_two_ints(1, 2) 
485            add_two_ints(a=1, b=2)           
486           
487          @raise TypeError: if request is not of the valid type (Message) 
488          @raise ServiceException: if communication with remote service fails 
489          @raise ROSInterruptException: if node shutdown (e.g. ctrl-C) interrupts service call 
490          @raise ROSSerializationException: If unable to serialize 
491          message. This is usually a type error with one of the fields. 
492          """ 
493   
494           
495          request = rospy.msg.args_kwds_to_message(self.request_class, args, kwds)  
496               
497           
498          if self.transport is None: 
499              service_uri = self._get_service_uri(request) 
500              dest_addr, dest_port = rospy.core.parse_rosrpc_uri(service_uri) 
501   
502               
503              transport = TCPROSTransport(self.protocol, self.resolved_name) 
504              transport.buff_size = self.buff_size 
505              try: 
506                  transport.connect(dest_addr, dest_port, service_uri) 
507              except TransportInitError as e: 
508                   
509                  raise ServiceException("unable to connect to service: %s"%e) 
510              self.transport = transport 
511          else: 
512              transport = self.transport 
513   
514           
515          self.seq += 1 
516          transport.send_message(request, self.seq) 
517   
518          try: 
519              responses = transport.receive_once() 
520              if len(responses) == 0: 
521                  raise ServiceException("service [%s] returned no response"%self.resolved_name) 
522              elif len(responses) > 1: 
523                  raise ServiceException("service [%s] returned multiple responses: %s"%(self.resolved_name, len(responses))) 
524          except rospy.exceptions.TransportException as e: 
525               
526              if rospy.core.is_shutdown(): 
527                  raise rospy.exceptions.ROSInterruptException("node shutdown interrupted service call") 
528              else: 
529                  raise ServiceException("transport error completing service call: %s"%(str(e))) 
530          finally: 
531              if not self.persistent: 
532                  transport.close() 
533                  self.transport = None 
534          return responses[0] 
 535   
536       
538          """Close this ServiceProxy. This only has an effect on persistent ServiceProxy instances.""" 
539          if self.transport is not None: 
540              self.transport.close() 
  541   
543      """ 
544      Implementation of ROS Service. This intermediary class allows for more configuration of behavior than the Service class. 
545      """ 
546       
548          super(ServiceImpl, self).__init__(name, service_class) 
549   
550          if not name or not isstring(name): 
551              raise ValueError("service name is not a non-empty string") 
552           
553          if not rosgraph.names.is_legal_name(name): 
554              import warnings 
555              warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2) 
556   
557           
558          self.handler = handler 
559          if error_handler is not None: 
560              self.error_handler = error_handler 
561          self.registered = False 
562          self.seq = 0 
563          self.done = False 
564          self.buff_size=buff_size 
565   
566          start_tcpros_server()  
567          host, port = get_tcpros_server_address() 
568          self.uri = '%s%s:%s'%(rospy.core.ROSRPC, host, port) 
569          logdebug("... service URL is %s"%self.uri) 
570   
571          self.protocol = TCPService(self.resolved_name, service_class, self.buff_size) 
572   
573          logdebug("[%s]: new Service instance"%self.resolved_name) 
 574   
575       
576   
578          logerr("Error processing request: %s\n%s" % (e, traceback.format_exception(exc_type, exc_value, tb))) 
 579   
581          """ 
582          Stop this service 
583          @param reason: human-readable shutdown reason 
584          @type  reason: str 
585          """ 
586          self.done = True 
587          logdebug('[%s].shutdown: reason [%s]'%(self.resolved_name, reason)) 
588          try: 
589               
590              get_service_manager().unregister(self.resolved_name, self) 
591          except Exception as e: 
592              logerr("Unable to unregister with master: "+traceback.format_exc()) 
593              raise ServiceException("Unable to connect to master: %s"%e) 
 594   
596          """ 
597          Let service run and take over thread until service or node 
598          shutdown. Use this method to keep your scripts from exiting 
599          execution. 
600          """ 
601          try: 
602              while not rospy.core.is_shutdown() and not self.done: 
603                  time.sleep(0.5) 
604          except KeyboardInterrupt: 
605              logdebug("keyboard interrupt, shutting down") 
 606   
608          """ 
609          Send error message to client 
610          @param transport: transport connection to client  
611          @type  transport: Transport 
612          @param err_msg: error message to send to client 
613          @type  err_msg: str 
614          """ 
615          if sys.hexversion > 0x03000000:  
616              err_msg = bytes(err_msg, 'utf-8') 
617          transport.write_data(struct.pack('<BI%ss'%len(err_msg), 0, len(err_msg), err_msg)) 
 618   
620          """ 
621          Process a single incoming request. 
622          @param transport: transport instance 
623          @type  transport: L{TCPROSTransport} 
624          @param request: Message 
625          @type  request: genpy.Message 
626          """ 
627          try: 
628               
629              response = convert_return_to_response(self.handler(request), self.response_class) 
630              self.seq += 1 
631               
632              transport.write_buff.write(struct.pack('<B', 1)) 
633              transport.send_message(response, self.seq) 
634          except ServiceException as e: 
635              rospy.core.rospydebug("handler raised ServiceException: %s"%(e)) 
636              self._write_service_error(transport, "service cannot process request: %s"%e) 
637          except Exception as e: 
638              (exc_type, exc_value, tb) = sys.exc_info() 
639              self.error_handler(e, exc_type, exc_value, tb) 
640              self._write_service_error(transport, "error processing request: %s"%e) 
 641   
642 -    def handle(self, transport, header): 
 643          """ 
644          Process incoming request. This method should be run in its 
645          own thread. If header['persistent'] is set to 1, method will 
646          block until connection is broken. 
647          @param transport: transport instance 
648          @type  transport: L{TCPROSTransport} 
649          @param header: headers from client 
650          @type  header: dict 
651          """ 
652          if 'persistent' in header and \ 
653                 header['persistent'].lower() in ['1', 'true']: 
654              persistent = True 
655          else: 
656              persistent = False 
657          if header.get('probe', None) == '1': 
658               
659              transport.close() 
660              return 
661          handle_done = False 
662          while not handle_done: 
663              try: 
664                  requests = transport.receive_once() 
665                  for request in requests: 
666                      self._handle_request(transport, request) 
667                  if not persistent: 
668                      handle_done = True 
669              except rospy.exceptions.TransportTerminated as e: 
670                  if not persistent: 
671                      logerr("incoming connection failed: %s"%e) 
672                  logdebug("service[%s]: transport terminated"%self.resolved_name) 
673                  handle_done = True 
674          transport.close() 
  675   
676   
678      """ 
679      Declare a ROS service. Service requests are passed to the 
680      specified handler.  
681   
682      Service Usage:: 
683        s = Service('getmapservice', GetMap, get_map_handler) 
684      """ 
685       
688          """ 
689          ctor. 
690   
691          @param name: service name, ``str`` 
692          @param service_class: Service definition class 
693           
694          @param handler: callback function for processing service 
695          request. Function takes in a ServiceRequest and returns a 
696          ServiceResponse of the appropriate type. Function may also 
697          return a list, tuple, or dictionary with arguments to initialize 
698          a ServiceResponse instance of the correct type. 
699   
700          If handler cannot process request, it may either return None, 
701          to indicate failure, or it may raise a rospy.ServiceException 
702          to send a specific error message to the client. Returning None 
703          is always considered a failure. 
704           
705          @type  handler: fn(req)->resp 
706   
707          @param buff_size: size of buffer for reading incoming requests. Should be at least size of request message 
708          @type  buff_size: int 
709   
710          @param error_handler: callback function for handling errors 
711          raised in the service code. 
712          @type  error_handler: fn(exception, exception_type, exception_value, traceback)->None 
713          """ 
714          super(Service, self).__init__(name, service_class, handler, buff_size, 
715                                        error_handler) 
716   
717           
718          get_service_manager().register(self.resolved_name, self) 
  719