dyn_tune_main.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 import rospy
4 import dynamic_tuner
5 import function
6 import dyn_tune.srv
7 import dyn_tune.msg
8 import json
9 
10 
11 
12 
13 def bypasser(func):
14  def wrapper(config):
15  return func(**config)
16  wrapper.tasks = func.tasks
17  return wrapper
18 
19 dyn_tuner = None
20 
21 
22 # TODO: need to double check things. just implemented
24 
25  # rospy.loginfo("I heard %s",req)
26  global dyn_tuner
27 
28  if dyn_tuner != None:
29  return False
30 
31  values = req.observation_values
32  start = json.loads(req.start_signals)
33  end = json.loads(req.end_signals)
34 
35  params = json.loads(req.experiments[0].parameters)
36  func = req.experiments[0].objective
37 
38  funcs = function.load_functions()
39 
40  tasks = [ (funcs[task.function], task.args, task.return_var, task.has_script, task.script) if not task.has_script
41  else (None, [], None, task.has_script, task.script)
42  for task in func.tasks ]
43 
44  obj_func = function.Function(tasks = tasks,
45  retvar = func.return_var,
46  types = func.arg_types )
47 
48  obj_func = bypasser(obj_func)
49 
50  # src_topic = req.src_topic
51  # dst_topic = req.dst_topic
52 
53  src_topic = "/haptix_deka_controller/command"
54  dst_topic = "/haptix_deka_controller/command"
55 
56  src_bag = req.src_bag
57  dst_bag = req.dst_bag
58 
59 
60 
61  dyn_tuner = dynamic_tuner.dynamic_tuner(params,
62  values,
63  src_bag,
64  dst_bag,
65  src_topic,
66  dst_topic,
67  objective = obj_func,
68  start_signal = start,
69  end_signal = end )
70 
71 
72  # # TODO: ROSLOG
73  # print "starting..."
74  # dyn.tune_params()
75 
76  return True
77 
79  # rospy.loginfo("I heard %s",req)
80 
81  funcs = { name: func for name, func in function.load_functions().items()
82  if not req.args or func.validate(tuple(req.args)) }
83 
84  return {'functions': map(str, funcs) }
85 
87  rospy.loginfo("I heard %s",req)
88  return True
89 
90 def main():
91  rospy.init_node('dyn_tune_backbone')
92 
93  # tuner = dynamic_tuner.dynamic_tuner()
94 
95  opt_srv = rospy.Service('optimize', dyn_tune.srv.Optimize, optimize_callback)
96  list_srv = rospy.Service('list_available_functions', dyn_tune.srv.ListAvailableFunctions, list_functions_callback)
97  save_srv = rospy.Service('create_function', dyn_tune.srv.CreateUserFunction, save_function_callback)
98 
99  rate = rospy.Rate(10)
100  while not rospy.is_shutdown():
101  global dyn_tuner
102 
103  if dyn_tuner != None:
104  dyn_tuner.tune_params()
105  print dyn_tuner._src_topic
106  print dyn_tuner._dst_topic
107  print dyn_tuner._real_bag_file
108  print dyn_tuner._sim_bag_file
109  print dyn_tuner._objective
110  print dyn_tuner._objective.tasks
111  print dyn_tuner._params_desc
112 
113  dyn_tuner = None
114 
115  rate.sleep()
116 
117 
118 if __name__ == '__main__':
119  main()
def list_functions_callback(req)
def save_function_callback(req)


dyn_tune
Author(s):
autogenerated on Mon Jun 10 2019 13:03:17