00001 """This module provides a class that allows for (slightly) easier dynamic access to ROS""" 00002 00003 import roslib; roslib.load_manifest('rospy') 00004 import rospy 00005 roslib.load_manifest('rosservice') 00006 import roslib.rostime 00007 import rosservice 00008 import re 00009 00010 from base64 import standard_b64encode 00011 from types import TypeType 00012 00013 class ROSProxy(object): 00014 def __init__(self): 00015 self.mans = {} 00016 self.mods = {} 00017 00018 def __GetTopics(self): 00019 return [x[0] for x in rospy.get_published_topics()] 00020 topics = property(fget=__GetTopics) 00021 00022 def __GetServices(self): 00023 return rosservice.get_service_list() 00024 services = property(fget=__GetServices) 00025 00026 def typeStringFromTopic(self, topic): 00027 try: 00028 return [x[1] for x in rospy.get_published_topics() if x[0] == topic][0] 00029 except: 00030 print "Can't find topic %s" % (topic,) 00031 return None 00032 00033 def typeStringFromService(self, service): 00034 try: 00035 return rosservice.get_service_type(service) 00036 except: 00037 print "Can't find service %s" % (service,) 00038 return None 00039 00040 def __classFromTypeString(self, typeString, subname): 00041 basemodule, itype = typeString.split('/') 00042 00043 if not (basemodule in self.mans): 00044 try: 00045 roslib.load_manifest(basemodule) 00046 self.mans[basemodule] = True; 00047 except: 00048 print "Can't find class for %s" % (typeString,) 00049 return None 00050 00051 modname = basemodule + '.'+ subname + '._' + itype 00052 if not (modname in self.mods): 00053 try: 00054 mod = __import__(modname) 00055 self.mods['modname'] = mod 00056 except: 00057 return None 00058 00059 mod = self.mods['modname'] 00060 00061 return getattr(getattr(getattr(mod,subname),'_' + itype), itype) 00062 00063 def msgClassFromTypeString(self, typeString): 00064 return self.__classFromTypeString(typeString, 'msg') 00065 00066 def srvClassFromTypeString(self, typeString): 00067 return self.__classFromTypeString(typeString, 'srv') 00068 00069 def classFromTopic(self, topic): 00070 return self.msgClassFromTypeString(self.typeStringFromTopic(topic)) 00071 00072 def classFromService(self, service): 00073 return self.srvClassFromTypeString(self.typeStringFromService(service)) 00074 00075 def callService(self, service, arguments, callback=False, wait=True): 00076 def defCallback(x): 00077 pass 00078 00079 if callback == False: 00080 callback = defCallback 00081 00082 if wait: 00083 try: 00084 rospy.wait_for_service(service) 00085 except: 00086 callback(None) 00087 raise 00088 try: 00089 function = rospy.ServiceProxy(service, self.classFromService(service)) 00090 if isinstance(arguments, list): 00091 response = function(*arguments) 00092 else: 00093 response = function(arguments) 00094 callback(response) 00095 except: 00096 callback(None) 00097 raise 00098 00099 def generalize(self, inst): 00100 if hasattr(inst,'__slots__'): 00101 obj = {} 00102 for i in xrange(len(inst.__slots__)): 00103 field = inst.__slots__[i] 00104 if (hasattr(inst,'_slot_types') and inst._slot_types[i] == 'uint8[]'): 00105 obj[field] = self.generalize(standard_b64encode(getattr(inst,field))) 00106 else: 00107 obj[field] = self.generalize(getattr(inst,field)) 00108 return obj 00109 elif isinstance(inst,tuple) or isinstance(inst,list): 00110 return [self.generalize(x) for x in inst] 00111 else: 00112 return inst 00113 00114 braces = re.compile(r'\[[^\]]*\]') 00115 atomics = ['bool', 'byte','int8', 'uint8', 'int16', 'uint16', 'int32', 'uint32', 'int64', 'uint64', 'float32', 'float64', 'string'] 00116 00117 def specify(self, typeStr, obj): 00118 if isinstance(typeStr,TypeType): 00119 cls = typeStr 00120 elif isinstance(typeStr,list): 00121 lst = [] 00122 for i in xrange(len(typeStr)): 00123 lst.append(self.specify(typeStr[i],obj[i])) 00124 return lst 00125 elif typeStr != self.braces.sub('',typeStr): 00126 return [self.specify(self.braces.sub('',typeStr), x) for x in obj] 00127 elif typeStr in self.atomics: 00128 if typeStr == 'string': 00129 return obj.encode('ascii','ignore') 00130 return obj 00131 elif typeStr == 'time' or typeStr == 'duration': 00132 inst = None 00133 if typeStr == 'time': 00134 inst = roslib.rostime.Time() 00135 else: 00136 inst = roslib.rostime.Duration() 00137 inst.nsecs = obj['nsecs'] 00138 inst.secs = obj['secs'] 00139 return inst 00140 else: 00141 if typeStr == 'Header': 00142 typeStr = 'std_msgs/Header' 00143 cls = self.msgClassFromTypeString(typeStr) 00144 00145 if not hasattr(cls,'__call__'): 00146 return None 00147 inst = cls() 00148 for i in xrange(len(cls._slot_types)): 00149 field = cls.__slots__[i] 00150 typ = cls._slot_types[i] 00151 if field in obj: 00152 value = self.specify(typ,obj[field]) 00153 if value != None: 00154 setattr(inst,field,value) 00155 else: 00156 print "Warning: passed object was only partially specified." 00157 return inst