$search
00001 """This module provides a class that allows for (slightly) easier dynamic access to ROS""" 00002 00003 import roslib; roslib.load_manifest('rospy') 00004 import rospy 00005 roslib.load_manifest('rosservice') 00006 import rosservice 00007 import re 00008 00009 from base64 import standard_b64encode 00010 from types import TypeType 00011 00012 #fuerte/electric decisions 00013 importedTime = False 00014 00015 try: 00016 import roslib.rostime 00017 from roslib.rostime import Time 00018 from roslib.rostime import Duration 00019 importedTime = True 00020 except ImportError, ign: 00021 pass 00022 00023 try: 00024 if not importedTime: 00025 import rospy.rostime 00026 from rospy.rostime import Time 00027 from rospy.rostime import Duration 00028 importedTime = True 00029 except ImportError, ign: 00030 pass 00031 00032 if not importedTime: 00033 raise ImportError 00034 00035 class ROSProxy(object): 00036 def __init__(self): 00037 self.mans = {} 00038 self.mods = {} 00039 00040 def __GetTopics(self): 00041 return [x[0] for x in rospy.get_published_topics()] 00042 topics = property(fget=__GetTopics) 00043 00044 def __GetServices(self): 00045 return rosservice.get_service_list() 00046 services = property(fget=__GetServices) 00047 00048 def typeStringFromTopic(self, topic): 00049 try: 00050 return [x[1] for x in rospy.get_published_topics() if x[0] == topic][0] 00051 except: 00052 print "Can't find topic %s" % (topic,) 00053 return None 00054 00055 def typeStringFromService(self, service): 00056 try: 00057 return rosservice.get_service_type(service) 00058 except: 00059 print "Can't find service %s" % (service,) 00060 return None 00061 00062 def __classFromTypeString(self, typeString, subname): 00063 basemodule, itype = typeString.split('/') 00064 00065 if not (basemodule in self.mans): 00066 try: 00067 roslib.load_manifest(basemodule) 00068 self.mans[basemodule] = True; 00069 except: 00070 print "Can't find class for %s" % (typeString,) 00071 return None 00072 00073 modname = basemodule + '.'+ subname + '._' + itype 00074 if not (modname in self.mods): 00075 try: 00076 mod = __import__(modname) 00077 self.mods['modname'] = mod 00078 except: 00079 return None 00080 00081 mod = self.mods['modname'] 00082 00083 return getattr(getattr(getattr(mod,subname),'_' + itype), itype) 00084 00085 def msgClassFromTypeString(self, typeString): 00086 return self.__classFromTypeString(typeString, 'msg') 00087 00088 def srvClassFromTypeString(self, typeString): 00089 return self.__classFromTypeString(typeString, 'srv') 00090 00091 def classFromTopic(self, topic): 00092 return self.msgClassFromTypeString(self.typeStringFromTopic(topic)) 00093 00094 def classFromService(self, service): 00095 return self.srvClassFromTypeString(self.typeStringFromService(service)) 00096 00097 def callService(self, service, arguments, callback=False, wait=True): 00098 def defCallback(x): 00099 pass 00100 00101 if callback == False: 00102 callback = defCallback 00103 00104 if wait: 00105 try: 00106 rospy.wait_for_service(service) 00107 except: 00108 callback(None) 00109 raise 00110 try: 00111 function = rospy.ServiceProxy(service, self.classFromService(service)) 00112 if isinstance(arguments, list): 00113 response = function(*arguments) 00114 else: 00115 response = function(arguments) 00116 callback(response) 00117 except: 00118 callback(None) 00119 raise 00120 00121 def generalize(self, inst): 00122 if hasattr(inst,'__slots__'): 00123 obj = {} 00124 for i in xrange(len(inst.__slots__)): 00125 field = inst.__slots__[i] 00126 if (hasattr(inst,'_slot_types') and inst._slot_types[i] == 'uint8[]'): 00127 obj[field] = self.generalize(standard_b64encode(getattr(inst,field))) 00128 else: 00129 obj[field] = self.generalize(getattr(inst,field)) 00130 return obj 00131 elif isinstance(inst,tuple) or isinstance(inst,list): 00132 return [self.generalize(x) for x in inst] 00133 else: 00134 return inst 00135 00136 braces = re.compile(r'\[[^\]]*\]') 00137 atomics = ['bool', 'byte','int8', 'uint8', 'int16', 'uint16', 'int32', 'uint32', 'int64', 'uint64', 'float32', 'float64', 'string'] 00138 00139 def specify(self, typeStr, obj): 00140 if isinstance(typeStr,TypeType): 00141 cls = typeStr 00142 elif isinstance(typeStr,list): 00143 lst = [] 00144 for i in xrange(len(typeStr)): 00145 lst.append(self.specify(typeStr[i],obj[i])) 00146 return lst 00147 elif typeStr != self.braces.sub('',typeStr): 00148 return [self.specify(self.braces.sub('',typeStr), x) for x in obj] 00149 elif typeStr in self.atomics: 00150 if typeStr == 'string': 00151 return obj.encode('ascii','ignore') 00152 return obj 00153 elif typeStr == 'time' or typeStr == 'duration': 00154 inst = None 00155 if typeStr == 'time': 00156 inst = Time() 00157 else: 00158 inst = Duration() 00159 if 'nsecs' in obj and 'secs' in obj: 00160 inst.nsecs = obj['nsecs'] 00161 inst.secs = obj['secs'] 00162 else: 00163 inst = rospy.get_rostime() 00164 return inst 00165 else: 00166 if typeStr == 'Header': 00167 typeStr = 'std_msgs/Header' 00168 cls = self.msgClassFromTypeString(typeStr) 00169 00170 if not hasattr(cls,'__call__'): 00171 return None 00172 inst = cls() 00173 for i in xrange(len(cls._slot_types)): 00174 field = cls.__slots__[i] 00175 typ = cls._slot_types[i] 00176 if field in obj: 00177 value = self.specify(typ,obj[field]) 00178 if value != None: 00179 setattr(inst,field,value) 00180 else: 00181 print "Warning: passed object was only partially specified." 00182 elif typ=='time' or typ=='duration': 00183 setattr(inst, field, rospy.get_rostime()) 00184 elif typ=='Header' or typ=='std_msgs/Header': 00185 inst.header.stamp = rospy.get_rostime() 00186 return inst