function.py
Go to the documentation of this file.
1 import dill
2 import os
3 import errno
4 import rospkg
5 import copy
6 
7 import function
8 
9 
10 
11 rospack = rospkg.RosPack()
12 
13 class Resolver(object):
14 
15  def validate_rule(self, arg_types, arg_rule):
16 
17  def is_valid(type_src, type_dst):
18  return ( type_src == type_dst or type_dst == "*" or
19  ( type(type_src) == str and type(type_dst) == str and type_src.lower() == type_dst.lower() ) or
20  ( hasattr(type_dst,'__iter__') and
21  (type_src in type_dst or
22  any( is_valid(type_src, atype) for atype in type_dst ))) )
23 
24  index_1 = 0
25  index_2 = 0
26 
27  while index_1 < len(arg_types):
28 
29  if index_2 >= len(arg_rule):
30  return False
31 
32  type_src = arg_types[index_1]
33  type_dst = arg_rule[index_2]
34 
35  if not type_dst:
36  if index_2>0 and is_valid(type_src, arg_rule[index_2-1]):
37  index_1 += 1
38  continue
39  else:
40  index_2 += 1
41  continue
42 
43  elif is_valid(type_src, type_dst):
44  index_1 += 1
45  index_2 += 1
46  else:
47  return False
48 
49  if index_2 < len(arg_rule) and arg_rule[index_2]:
50  return False
51 
52  return True
53 
54 
55  def validate(self, arg_types):
56 
57  for rule in self.types:
58  if self.validate_rule(arg_types, rule):
59  return True
60 
61  return False
62 
63 
64 
65  def __init__(self, func):
66  self.func = func
67  self.__name__ = func.__name__
68  self.types = []
69 
70  def get_name(self):
71  return self.func.__name__
72 
73  def __call__(self, *args, **kwargs):
74  # print('Called {func} with args: {args} and keyword args: {kwargs}'.format(func="func",
75  # kwargs = kwargs,
76  # args=args))
77 
78  rargs = []
79  for arg in args:
80  if arg not in kwargs:
81  raise KeyError('Cannot resolve the argument \'{arg}\''.format(arg = arg))
82  rargs.append(copy.copy(kwargs.get(arg)))
83 
84  return self.func(*rargs)
85 
86  def serialize(self):
87  return dill.dumps(self)
88 
89  def save(self, name, path = rospack.get_path('dyn_tune')+"/functions/default"):
90 
91  filename = '/'.join([path, name])
92 
93  if not os.path.exists(os.path.dirname(filename)):
94  try:
95  os.makedirs(os.path.dirname(filename))
96  except OSError as exc: # Guard against race condition
97  if exc.errno != errno.EEXIST:
98  raise
99 
100  file = open(filename, 'w')
101 
102  code = self.serialize()
103  file.write(code)
104 
105 
106 
107 def accept(*args):
108  def decorator(func):
109  func.types.append(args)
110  return func
111  return decorator
112 
113 import random
114 import string
115 
116 def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
117  return ''.join(random.choice(chars) for _ in range(size))
118 
119 class Function():
120  def __init__(self, tasks = [], retvar = '_', types = [], name = None):
121  self.tasks = tasks
122  self.retvar = retvar
123  self.types = types
124 
125  if name == None:
126  self.__name__ = "USER_FUNC_" + id_generator()
127 
128 
129  def __call__(self, *args, **values):
130  current = {}
131  current.update(values)
132 
133  for i in range(len(args)):
134  current.update({'args[{0}]'.format(i):current[args[i]]})
135 
136  for func, args, retvar, has_script, script in self.tasks:
137  if func != None:
138  current.update({retvar: func(*args, **current)})
139  if has_script:
140  exec(script, current, current)
141 
142  return current.get(self.retvar)
143 
144  def add_task(self, func, args, retvar, index = None):
145 
146  if index is None:
147  index = len(self.tasks)
148 
149  self.tasks.insert(index, (func, args, retvar))
150 
151  def delete_task(index):
152  del self.tasks[index]
153 
154  def serialize(self):
155  return dill.dumps(self)
156 
157  def save(self, name, path = rospack.get_path('dyn_tune')+"/functions/user"):
158 
159  filename = '/'.join([path, name])
160 
161  if not os.path.exists(os.path.dirname(filename)):
162  try:
163  os.makedirs(os.path.dirname(filename))
164  except OSError as exc: # Guard against race condition
165  if exc.errno != errno.EEXIST:
166  raise
167 
168  file = open(filename, 'w')
169 
170  code = self.serialize()
171  file.write(code)
172 
173 
174  def validate(self, arg_types):
175  return arg_types in self.types
176 
177 from os import listdir
178 from os.path import isfile, join
179 
180 def load_functions(path = rospack.get_path('dyn_tune')+'/functions/default'):
181  func_dict = {}
182 
183  for name in listdir(path):
184  filename = join(path, name)
185 
186  if isfile(filename):
187  file = open(filename, 'r')
188  code = file.read()
189  func = dill.loads(code)
190  func_dict.update({func.__name__: func})
191 
192  return func_dict
193 
194 
195 
196 
197 
def __init__(self, func)
Definition: function.py:65
def id_generator(size=6, chars=string.ascii_uppercase+string.digits)
Definition: function.py:116
def validate(self, arg_types)
Definition: function.py:174
def __call__(self, args, kwargs)
Definition: function.py:73
def accept(args)
Definition: function.py:107
def load_functions(path=rospack.get_path('dyn_tune')+'/functions/default')
Definition: function.py:180
def validate_rule(self, arg_types, arg_rule)
Definition: function.py:15
def __call__(self, args, values)
Definition: function.py:129
def save(self, name, path=rospack.get_path('dyn_tune')+"/functions/default")
Definition: function.py:89
def save(self, name, path=rospack.get_path('dyn_tune')+"/functions/user")
Definition: function.py:157
def add_task(self, func, args, retvar, index=None)
Definition: function.py:144
def __init__(self, tasks=[], retvar='_', types=[], name=None)
Definition: function.py:120
def validate(self, arg_types)
Definition: function.py:55


dyn_tune
Author(s):
autogenerated on Mon Jun 10 2019 13:03:17