$search
00001 #! /usr/bin/env python 00002 00003 import subprocess 00004 import threading 00005 import time 00006 import sys, os, getopt 00007 00008 import roslib.scriptutil 00009 from roslib.names import ns_join, get_ros_namespace, make_caller_id, make_global_ns, GLOBALNS 00010 00011 PKG_NAME = 'rosweb' 00012 import roslib; roslib.load_manifest(PKG_NAME) 00013 00014 def _get_caller_id(): 00015 return make_caller_id('rosparam-%s'%os.getpid()) 00016 00017 class Params(roslib.message.Message): 00018 __slots__ = ('parameters', ) 00019 def __init__(self, params): 00020 self.parameters = params 00021 00022 def succeed(args): 00023 code, msg, val = args 00024 if code != 1: 00025 raise RosTopicException("remote call failed: %s"%msg) 00026 return val 00027 00028 class ParamThread(threading.Thread): 00029 def __init__(self): 00030 threading.Thread.__init__(self) 00031 00032 self.callback = None 00033 00034 def setCallback(self, callback): 00035 self.callback = callback 00036 00037 def flatten(self, params, path="/"): 00038 l = [] 00039 for k,v in params.items(): 00040 _path=path + k 00041 if type(v) == type({}): 00042 l = l + self.flatten(v, _path + "/") 00043 else: 00044 l.append((_path, str(v))) 00045 return l 00046 00047 def getParams(self): 00048 master = roslib.scriptutil.get_master() 00049 00050 _c, _t, params = master.getParam(_get_caller_id(), "/") 00051 paramlist = self.flatten(params) 00052 paramlist.sort() 00053 00054 return paramlist 00055 00056 def run(self): 00057 lastval = () 00058 while 1: 00059 try: 00060 val = self.getParams() 00061 if val != lastval: 00062 self.callback(Params(val)) 00063 lastval = val 00064 except: 00065 pass 00066 time.sleep(3) 00067 00068 def main(argv, stdout, environ): 00069 progname = argv[0] 00070 optlist, args = getopt.getopt(argv[1:], "", ["help", "test", "debug"]) 00071 00072 p = ParamThread() 00073 p.callback = None 00074 p.run() 00075 00076 if __name__ == "__main__": 00077 main(sys.argv, sys.stdout, os.environ) 00078