1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18   
 19   
 20   
 21   
 22   
 23   
 24   
 25   
 26   
 27   
 28   
 29   
 30   
 31   
 32   
 33  """Internal use: Service-specific extensions for TCPROS support""" 
 34   
 35  import io 
 36  import socket 
 37  import struct 
 38  import sys 
 39  import logging 
 40   
 41  import time 
 42  import traceback 
 43   
 44  import genpy 
 45   
 46  import rosgraph 
 47  import rosgraph.names 
 48  import rosgraph.network 
 49   
 50  from rospy.exceptions import TransportInitError, TransportTerminated, ROSException, ROSInterruptException 
 51  from rospy.service import _Service, ServiceException 
 52   
 53  from rospy.impl.registration import get_service_manager 
 54  from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \ 
 55      get_tcpros_server_address, start_tcpros_server, recv_buff, \ 
 56      DEFAULT_BUFF_SIZE 
 57   
 58  from rospy.core import logwarn, loginfo, logerr, logdebug 
 59  import rospy.core 
 60  import rospy.msg 
 61  import rospy.names 
 62   
 63  import rospy.impl.validators 
 64   
 65  import threading 
 66   
 67  if sys.hexversion > 0x03000000:  
 69          return isinstance(s, str)  
  70  else: 
 72          return isinstance(s, basestring)  
  73   
 74  logger = logging.getLogger('rospy.service') 
 75   
 76   
 77   
 78   
 80      """ 
 81      Blocks until service is available. Use this in 
 82      initialization code if your program depends on a 
 83      service already running. 
 84      @param service: name of service 
 85      @type  service: str 
 86      @param timeout: timeout time in seconds, None for no 
 87      timeout. NOTE: timeout=0 is invalid as wait_for_service actually 
 88      contacts the service, so non-blocking behavior is not 
 89      possible. For timeout=0 uses cases, just call the service without 
 90      waiting. 
 91      @type  timeout: double 
 92      @raise ROSException: if specified timeout is exceeded 
 93      @raise ROSInterruptException: if shutdown interrupts wait 
 94      """ 
 95      master = rosgraph.Master(rospy.names.get_caller_id()) 
 96      def contact_service(resolved_name, timeout=10.0): 
 97          try: 
 98              uri = master.lookupService(resolved_name) 
 99          except rosgraph.MasterException: 
100              return False 
101   
102          addr = rospy.core.parse_rosrpc_uri(uri) 
103          if rosgraph.network.use_ipv6(): 
104              s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 
105          else: 
106              s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
107          try: 
108               
109               
110              s.settimeout(timeout) 
111              logdebug('connecting to ' + str(addr)) 
112              s.connect(addr) 
113              h = { 'probe' : '1', 'md5sum' : '*', 
114                    'callerid' : rospy.core.get_caller_id(), 
115                    'service': resolved_name } 
116              rosgraph.network.write_ros_handshake_header(s, h) 
117              return True 
118          finally: 
119              if s is not None: 
120                  s.close() 
 121      if timeout == 0.: 
122          raise ValueError("timeout must be non-zero") 
123      resolved_name = rospy.names.resolve_name(service) 
124      first = False 
125      if timeout: 
126          timeout_t = time.time() + timeout 
127          while not rospy.core.is_shutdown() and time.time() < timeout_t: 
128              try: 
129                  if contact_service(resolved_name, timeout_t-time.time()): 
130                      return 
131              except KeyboardInterrupt: 
132                   
133                  rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising") 
134                  raise  
135              except:  
136                  if first: 
137                      first = False 
138                      rospy.core.logerr("wait_for_service(%s): failed to contact, will keep trying"%(resolved_name)) 
139              time.sleep(0.3) 
140          if rospy.core.is_shutdown(): 
141              raise ROSInterruptException("rospy shutdown") 
142          else: 
143              raise ROSException("timeout exceeded while waiting for service %s"%resolved_name) 
144      else: 
145          while not rospy.core.is_shutdown(): 
146              try: 
147                  if contact_service(resolved_name): 
148                      return 
149              except KeyboardInterrupt: 
150                   
151                  rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising") 
152                  raise  
153              except:  
154                  if first: 
155                      first = False 
156                      rospy.core.logerr("wait_for_service(%s): failed to contact, will keep trying"%(resolved_name)) 
157              time.sleep(0.3) 
158          if rospy.core.is_shutdown(): 
159              raise ROSInterruptException("rospy shutdown") 
160       
161   
163      """ 
164      Convert return value of function to response instance. The 
165      rules/precedence for this are: 
166   
167      1. If the return type is the same as the response type, no conversion 
168      is done. 
169   
170      2. If the return type is a dictionary, it is used as a keyword-style 
171      initialization for a new response instance. 
172   
173      3. If the return type is *not* a list type, it is passed in as a single arg 
174      to a new response instance. 
175   
176      4. If the return type is a list/tuple type, it is used as a args-style 
177      initialization for a new response instance. 
178      """ 
179   
180       
181       
182       
183      if isinstance(response, genpy.Message) and response._type == response_class._type: 
184       
185          return response 
186      elif type(response) == dict: 
187           
188          try: 
189              return response_class(**response) 
190          except AttributeError as e: 
191              raise ServiceException("handler returned invalid value: %s"%str(e)) 
192      elif response == None: 
193          raise ServiceException("service handler returned None") 
194      elif type(response) not in [list, tuple]: 
195           
196          try: 
197              return response_class(response) 
198          except TypeError as e: 
199              raise ServiceException("handler returned invalid value: %s"%str(e)) 
200      else: 
201           
202           
203          try: 
204              return response_class(*response) 
205          except TypeError as e: 
206              raise ServiceException("handler returned wrong number of values: %s"%str(e)) 
 207   
209      """ 
210      Process incoming service connection. For use with 
211      TCPROSServer. Reads in service name from handshake and creates the 
212      appropriate service handler for the connection. 
213      @param sock: socket connection 
214      @type  sock: socket 
215      @param client_addr: client address 
216      @type  client_addr: (str, int) 
217      @param header: key/value pairs from handshake header 
218      @type  header: dict 
219      @return: error string or None  
220      @rtype: str 
221      """ 
222      for required in ['service', 'md5sum', 'callerid']: 
223          if not required in header: 
224              return "Missing required '%s' field"%required 
225      else: 
226          logger.debug("connection from %s:%s", client_addr[0], client_addr[1]) 
227          service_name = header['service'] 
228           
229           
230           
231           
232           
233           
234          sm = get_service_manager() 
235          md5sum = header['md5sum'] 
236          service = sm.get_service(service_name) 
237          if not service: 
238              return "[%s] is not a provider of  [%s]"%(rospy.names.get_caller_id(), service_name) 
239          elif md5sum != rospy.names.SERVICE_ANYTYPE and md5sum != service.service_class._md5sum: 
240              return "request from [%s]: md5sums do not match: [%s] vs. [%s]"%(header['callerid'], md5sum, service.service_class._md5sum) 
241          else: 
242              transport = TCPROSTransport(service.protocol, service_name, header=header) 
243              transport.set_socket(sock, header['callerid']) 
244              transport.write_header() 
245               
246               
247              t = threading.Thread(target=service.handle, args=(transport, header)) 
248              t.setDaemon(True) 
249              t.start() 
 250                   
251           
253      """ 
254      Protocol implementation for Services over TCPROS 
255      """ 
256   
258          """ 
259          ctor. 
260          @param resolved_name: name of service 
261          @type  resolved_name: str 
262          @param service_class: Service data type class 
263          @type  service_class: Service 
264          @param buff_size int: size of buffer (bytes) to use for reading incoming requests. 
265          @type  buff_size: int 
266          """ 
267          super(TCPService, self).__init__(resolved_name, service_class._request_class, buff_size=buff_size) 
268          self.service_class = service_class 
 269   
271          """ 
272          Protocol API 
273          @return: header fields 
274          @rtype: dict 
275          """ 
276          return {'service': self.resolved_name, 'type': self.service_class._type, 
277                  'md5sum': self.service_class._md5sum, 'callerid': rospy.names.get_caller_id() } 
  278   
280      """Protocol Implementation for Service clients over TCPROS""" 
281       
283          """ 
284          ctor. 
285          @param resolved_name: resolved service name  
286          @type  resolved_name: str 
287          @param service_class: Service data type class 
288          @type  service_class: Service 
289          @param headers: identifier for Service session 
290          @type  headers: dict 
291          @param buff_size: size of buffer (bytes) for reading responses from Service.  
292          @type  buff_size: int 
293          """ 
294          super(TCPROSServiceClient, self).__init__(resolved_name, service_class._response_class) 
295          self.service_class = service_class 
296          self.headers = headers or {} 
297          self.buff_size = buff_size 
 298           
300          """ 
301          TCPROSTransportProtocol API         
302          """ 
303          headers = {'service': self.resolved_name, 'md5sum': self.service_class._md5sum, 
304                     'callerid': rospy.names.get_caller_id()} 
305           
306           
307           
308          for k, v in self.headers.items(): 
309              headers[k] = v 
310          return headers 
 311       
313          """ 
314          Utility for reading the OK-byte/error-message header preceding each message. 
315          @param sock: socket connection. Will be read from if OK byte is 
316          false and error message needs to be read 
317          @type  sock: socket.socket 
318          @param b: buffer to read from 
319          @type  b: StringIO 
320          """ 
321          if b.tell() == 0: 
322              return 
323          pos = b.tell() 
324          b.seek(0) 
325          ok = struct.unpack('<B', b.read(1))[0]  
326          b.seek(pos) 
327          if not ok: 
328              str = self._read_service_error(sock, b) 
329               
330               
331               
332               
333               
334              b.seek(0) 
335              b.truncate(0) 
336              raise ServiceException("service [%s] responded with an error: %s"%(self.resolved_name, str)) 
337          else: 
338               
339              b.seek(pos) 
 340           
342          """ 
343          In service implementation, reads in OK byte that preceeds each 
344          response. The OK byte allows for the passing of error messages 
345          instead of a response message 
346          @param b: buffer 
347          @type  b: StringIO 
348          @param msg_queue: Message queue to append to 
349          @type  msg_queue: [Message] 
350          @param sock: socket to read from 
351          @type  sock: socket.socket 
352          """ 
353          self._read_ok_byte(b, sock) 
354          rospy.msg.deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size, max_msgs=1, start=1)  
355           
356           
357           
358          if b.tell() == 1: 
359              b.seek(0) 
 360           
362          """ 
363          Read service error from sock  
364          @param sock: socket to read from 
365          @type  sock: socket 
366          @param b: currently read data from sock 
367          @type  b: StringIO 
368          """ 
369          buff_size = 256  
370          while b.tell() < 5: 
371              recv_buff(sock, b, buff_size) 
372          bval = b.getvalue() 
373          (length,) = struct.unpack('<I', bval[1:5])  
374          while b.tell() < (5 + length): 
375              recv_buff(sock, b, buff_size) 
376          bval = b.getvalue() 
377          return struct.unpack('<%ss'%length, bval[5:5+length])[0]  
  378   
379       
381      """ 
382      Create a handle to a ROS service for invoking calls. 
383   
384      Usage:: 
385        add_two_ints = ServiceProxy('add_two_ints', AddTwoInts) 
386        resp = add_two_ints(1, 2) 
387      """ 
388       
389 -    def __init__(self, name, service_class, persistent=False, headers=None): 
 390          """ 
391          ctor. 
392          @param name: name of service to call 
393          @type  name: str 
394          @param service_class: auto-generated service class 
395          @type  service_class: Service class 
396          @param persistent: (optional) if True, proxy maintains a persistent 
397          connection to service. While this results in better call 
398          performance, persistent connections are discouraged as they are 
399          less resistent to network issues and service restarts. 
400          @type  persistent: bool 
401          @param headers: (optional) arbitrary headers  
402          @type  headers: dict 
403          """ 
404          super(ServiceProxy, self).__init__(name, service_class) 
405          self.uri = None 
406          self.seq = 0 
407          self.buff_size = DEFAULT_BUFF_SIZE 
408          self.persistent = persistent 
409          if persistent: 
410              if not headers: 
411                  headers = {} 
412              headers['persistent'] = '1' 
413          self.protocol = TCPROSServiceClient(self.resolved_name, 
414                                              self.service_class, headers=headers) 
415          self.transport = None  
 416   
419   
420       
422          """ 
423          Callable-style version of the service API. This accepts either a request message instance, 
424          or you can call directly with arguments to create a new request instance. e.g.:: 
425           
426            add_two_ints(AddTwoIntsRequest(1, 2)) 
427            add_two_ints(1, 2) 
428            add_two_ints(a=1, b=2)           
429           
430          @param args: arguments to remote service 
431          @param kwds: message keyword arguments 
432          @raise ROSSerializationException: If unable to serialize 
433          message. This is usually a type error with one of the fields. 
434          """ 
435          return self.call(*args, **kwds) 
 436       
438          """ 
439          private routine for getting URI of service to call 
440          @param request: request message 
441          @type  request: L{rospy.Message} 
442          """ 
443          if not isinstance(request, genpy.Message): 
444              raise TypeError("request object is not a valid request message instance") 
445           
446           
447           
448           
449          if not self.request_class._type == request._type: 
450              raise TypeError("request object type [%s] does not match service type [%s]"%(request.__class__, self.request_class)) 
451   
452           
453           
454          if 1:  
455              try: 
456                  try: 
457                      master = rosgraph.Master(rospy.names.get_caller_id()) 
458                      self.uri = master.lookupService(self.resolved_name) 
459                  except socket.error: 
460                      raise ServiceException("unable to contact master") 
461                  except rosgraph.MasterError as e: 
462                      logger.error("[%s]: lookup service failed with message [%s]", self.resolved_name, str(e)) 
463                      raise ServiceException("service [%s] unavailable"%self.resolved_name) 
464                   
465                   
466                  try: 
467                      rospy.core.parse_rosrpc_uri(self.uri) 
468                  except rospy.impl.validators.ParameterInvalid: 
469                      raise ServiceException("master returned invalid ROSRPC URI: %s"%self.uri) 
470              except socket.error as e: 
471                  logger.error("[%s]: socket error contacting service, master is probably unavailable",self.resolved_name) 
472          return self.uri 
 473   
474 -    def call(self, *args, **kwds): 
 475          """ 
476          Call the service. This accepts either a request message instance, 
477          or you can call directly with arguments to create a new request instance. e.g.:: 
478           
479            add_two_ints(AddTwoIntsRequest(1, 2)) 
480            add_two_ints(1, 2) 
481            add_two_ints(a=1, b=2)           
482           
483          @raise TypeError: if request is not of the valid type (Message) 
484          @raise ServiceException: if communication with remote service fails 
485          @raise ROSInterruptException: if node shutdown (e.g. ctrl-C) interrupts service call 
486          @raise ROSSerializationException: If unable to serialize 
487          message. This is usually a type error with one of the fields. 
488          """ 
489   
490           
491          request = rospy.msg.args_kwds_to_message(self.request_class, args, kwds)  
492               
493           
494          if self.transport is None: 
495              service_uri = self._get_service_uri(request) 
496              dest_addr, dest_port = rospy.core.parse_rosrpc_uri(service_uri) 
497   
498               
499              transport = TCPROSTransport(self.protocol, self.resolved_name) 
500              transport.buff_size = self.buff_size 
501              try: 
502                  transport.connect(dest_addr, dest_port, service_uri) 
503              except TransportInitError as e: 
504                   
505                  raise ServiceException("unable to connect to service: %s"%e) 
506              self.transport = transport 
507          else: 
508              transport = self.transport 
509   
510           
511          self.seq += 1 
512          transport.send_message(request, self.seq) 
513   
514          try: 
515              responses = transport.receive_once() 
516              if len(responses) == 0: 
517                  raise ServiceException("service [%s] returned no response"%self.resolved_name) 
518              elif len(responses) > 1: 
519                  raise ServiceException("service [%s] returned multiple responses: %s"%(self.resolved_name, len(responses))) 
520          except rospy.exceptions.TransportException as e: 
521               
522              if rospy.core.is_shutdown(): 
523                  raise rospy.exceptions.ROSInterruptException("node shutdown interrupted service call") 
524              else: 
525                  raise ServiceException("transport error completing service call: %s"%(str(e))) 
526          finally: 
527              if not self.persistent: 
528                  transport.close() 
529                  self.transport = None 
530          return responses[0] 
 531   
532       
534          """Close this ServiceProxy. This only has an effect on persistent ServiceProxy instances.""" 
535          if self.transport is not None: 
536              self.transport.close() 
  537   
539      """ 
540      Implementation of ROS Service. This intermediary class allows for more configuration of behavior than the Service class. 
541      """ 
542       
544          super(ServiceImpl, self).__init__(name, service_class) 
545   
546          if not name or not isstring(name): 
547              raise ValueError("service name is not a non-empty string") 
548           
549          if not rosgraph.names.is_legal_name(name): 
550              import warnings 
551              warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2) 
552   
553           
554          self.handler = handler 
555          if error_handler is not None: 
556              self.error_handler = error_handler 
557          self.registered = False 
558          self.seq = 0 
559          self.done = False 
560          self.buff_size=buff_size 
561   
562          start_tcpros_server()  
563          host, port = get_tcpros_server_address() 
564          self.uri = '%s%s:%s'%(rospy.core.ROSRPC, host, port) 
565          logdebug("... service URL is %s"%self.uri) 
566   
567          self.protocol = TCPService(self.resolved_name, service_class, self.buff_size) 
568   
569          logdebug("[%s]: new Service instance"%self.resolved_name) 
 570   
571       
572   
574          logerr("Error processing request: %s\n%s" % (e, traceback.format_exception(exc_type, exc_value, tb))) 
 575   
577          """ 
578          Stop this service 
579          @param reason: human-readable shutdown reason 
580          @type  reason: str 
581          """ 
582          self.done = True 
583          logdebug('[%s].shutdown: reason [%s]'%(self.resolved_name, reason)) 
584          try: 
585               
586              get_service_manager().unregister(self.resolved_name, self) 
587          except Exception as e: 
588              logerr("Unable to unregister with master: "+traceback.format_exc()) 
589              raise ServiceException("Unable to connect to master: %s"%e) 
 590   
592          """ 
593          Let service run and take over thread until service or node 
594          shutdown. Use this method to keep your scripts from exiting 
595          execution. 
596          """ 
597          try: 
598              while not rospy.core.is_shutdown() and not self.done: 
599                  time.sleep(0.5) 
600          except KeyboardInterrupt: 
601              logdebug("keyboard interrupt, shutting down") 
 602   
604          """ 
605          Send error message to client 
606          @param transport: transport connection to client  
607          @type  transport: Transport 
608          @param err_msg: error message to send to client 
609          @type  err_msg: str 
610          """ 
611          if sys.hexversion > 0x03000000:  
612              err_msg = bytes(err_msg, 'utf-8') 
613          transport.write_data(struct.pack('<BI%ss'%len(err_msg), 0, len(err_msg), err_msg)) 
 614   
616          """ 
617          Process a single incoming request. 
618          @param transport: transport instance 
619          @type  transport: L{TCPROSTransport} 
620          @param request: Message 
621          @type  request: genpy.Message 
622          """ 
623          try: 
624               
625              response = convert_return_to_response(self.handler(request), self.response_class) 
626              self.seq += 1 
627               
628              transport.write_buff.write(struct.pack('<B', 1)) 
629              transport.send_message(response, self.seq) 
630          except ServiceException as e: 
631              rospy.core.rospydebug("handler raised ServiceException: %s"%(e)) 
632              self._write_service_error(transport, "service cannot process request: %s"%e) 
633          except Exception as e: 
634              (exc_type, exc_value, tb) = sys.exc_info() 
635              self.error_handler(e, exc_type, exc_value, tb) 
636              self._write_service_error(transport, "error processing request: %s"%e) 
 637   
638 -    def handle(self, transport, header): 
 639          """ 
640          Process incoming request. This method should be run in its 
641          own thread. If header['persistent'] is set to 1, method will 
642          block until connection is broken. 
643          @param transport: transport instance 
644          @type  transport: L{TCPROSTransport} 
645          @param header: headers from client 
646          @type  header: dict 
647          """ 
648          if 'persistent' in header and \ 
649                 header['persistent'].lower() in ['1', 'true']: 
650              persistent = True 
651          else: 
652              persistent = False 
653          if header.get('probe', None) == '1': 
654               
655              transport.close() 
656              return 
657          handle_done = False 
658          while not handle_done: 
659              try: 
660                  requests = transport.receive_once() 
661                  for request in requests: 
662                      self._handle_request(transport, request) 
663                  if not persistent: 
664                      handle_done = True 
665              except rospy.exceptions.TransportTerminated as e: 
666                  if not persistent: 
667                      logerr("incoming connection failed: %s"%e) 
668                  logdebug("service[%s]: transport terminated"%self.resolved_name) 
669                  handle_done = True 
670          transport.close() 
  671   
672   
674      """ 
675      Declare a ROS service. Service requests are passed to the 
676      specified handler.  
677   
678      Service Usage:: 
679        s = Service('getmapservice', GetMap, get_map_handler) 
680      """ 
681       
684          """ 
685          ctor. 
686   
687          @param name: service name, ``str`` 
688          @param service_class: Service definition class 
689           
690          @param handler: callback function for processing service 
691          request. Function takes in a ServiceRequest and returns a 
692          ServiceResponse of the appropriate type. Function may also 
693          return a list, tuple, or dictionary with arguments to initialize 
694          a ServiceResponse instance of the correct type. 
695   
696          If handler cannot process request, it may either return None, 
697          to indicate failure, or it may raise a rospy.ServiceException 
698          to send a specific error message to the client. Returning None 
699          is always considered a failure. 
700           
701          @type  handler: fn(req)->resp 
702   
703          @param buff_size: size of buffer for reading incoming requests. Should be at least size of request message 
704          @type  buff_size: int 
705   
706          @param error_handler: callback function for handling errors 
707          raised in the service code. 
708          @type  error_handler: fn(exception, exception_type, exception_value, traceback)->None 
709          """ 
710          super(Service, self).__init__(name, service_class, handler, buff_size, 
711                                        error_handler) 
712   
713           
714          get_service_manager().register(self.resolved_name, self) 
  715