11 rospack = rospkg.RosPack()
17 def is_valid(type_src, type_dst):
18 return ( type_src == type_dst
or type_dst ==
"*" or 19 ( type(type_src) == str
and type(type_dst) == str
and type_src.lower() == type_dst.lower() )
or 20 ( hasattr(type_dst,
'__iter__')
and 21 (type_src
in type_dst
or 22 any( is_valid(type_src, atype)
for atype
in type_dst ))) )
27 while index_1 < len(arg_types):
29 if index_2 >= len(arg_rule):
32 type_src = arg_types[index_1]
33 type_dst = arg_rule[index_2]
36 if index_2>0
and is_valid(type_src, arg_rule[index_2-1]):
43 elif is_valid(type_src, type_dst):
49 if index_2 < len(arg_rule)
and arg_rule[index_2]:
57 for rule
in self.
types:
71 return self.func.__name__
81 raise KeyError(
'Cannot resolve the argument \'{arg}\''.format(arg = arg))
82 rargs.append(copy.copy(kwargs.get(arg)))
84 return self.
func(*rargs)
87 return dill.dumps(self)
89 def save(self, name, path = rospack.get_path(
'dyn_tune')+
"/functions/default"):
91 filename =
'/'.join([path, name])
93 if not os.path.exists(os.path.dirname(filename)):
95 os.makedirs(os.path.dirname(filename))
96 except OSError
as exc:
97 if exc.errno != errno.EEXIST:
100 file = open(filename,
'w')
109 func.types.append(args)
116 def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
117 return ''.join(random.choice(chars)
for _
in range(size))
120 def __init__(self, tasks = [], retvar = '_', types = [], name = None):
131 current.update(values)
133 for i
in range(len(args)):
134 current.update({
'args[{0}]'.format(i):current[args[i]]})
136 for func, args, retvar, has_script, script
in self.
tasks:
138 current.update({retvar: func(*args, **current)})
140 exec(script, current, current)
142 return current.get(self.
retvar)
144 def add_task(self, func, args, retvar, index = None):
147 index = len(self.
tasks)
149 self.tasks.insert(index, (func, args, retvar))
152 del self.
tasks[index]
155 return dill.dumps(self)
157 def save(self, name, path = rospack.get_path(
'dyn_tune')+
"/functions/user"):
159 filename =
'/'.join([path, name])
161 if not os.path.exists(os.path.dirname(filename)):
163 os.makedirs(os.path.dirname(filename))
164 except OSError
as exc:
165 if exc.errno != errno.EEXIST:
168 file = open(filename,
'w')
175 return arg_types
in self.
types 177 from os
import listdir
178 from os.path
import isfile, join
183 for name
in listdir(path):
184 filename = join(path, name)
187 file = open(filename,
'r') 189 func = dill.loads(code) 190 func_dict.update({func.__name__: func})
def id_generator(size=6, chars=string.ascii_uppercase+string.digits)
def validate(self, arg_types)
def __call__(self, args, kwargs)
def load_functions(path=rospack.get_path('dyn_tune')+'/functions/default')
def validate_rule(self, arg_types, arg_rule)
def __call__(self, args, values)
def save(self, name, path=rospack.get_path('dyn_tune')+"/functions/default")
def save(self, name, path=rospack.get_path('dyn_tune')+"/functions/user")
def add_task(self, func, args, retvar, index=None)
def __init__(self, tasks=[], retvar='_', types=[], name=None)
def validate(self, arg_types)