jointstick.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import os
3 import yaml
4 import time
5 import rospy
6 import threading
7 from std_msgs.msg import Float64
8 from geometry_msgs.msg import Twist
9 from sensor_msgs.msg import Joy, JointState
10 from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
11 from controller_manager_msgs.srv import ListControllers
12 
13 # Imports from custom files of this package
14 from controllers_info import *
15 from helper import Controller, JoyAction
16 
17 joint_states = None
18 joy_msg = None
19 
20 controllers = []
21 publishers = dict()
22 actions_to_exec = []
23 rate = 10# Hz
24 
25 # Thread flag
26 kill = False
27 
28 # Just the joint state callback
30  global joint_states
31  joint_states = msg
32 
33 # Just the joy callback
34 def joyCallback(msg):
35  global joy_msg
36  joy_msg = msg
37  bindingsOfJoy(msg)
38 
39 # Initialize publisher objects for all controller with joy bindings
41  global publishers
42  for controller in controllers:
43  publishers[controller.name] = rospy.Publisher(
44  controller.topic,
45  controller_type_correspondence[controller.type],
46  queue_size=1)
47 
48 # With the action_to_exec at hand, send some commands to the robot! (in a new thread)
50  while(not kill):
51  actions = dict()
52  for action, controller in actions_to_exec:
53  if controller.type in twist_controllers:
54  msg = Twist()
55  if action.msg_field == "linear.x":
56  msg.linear.x = action.value if action.axis == -1 else joy_msg.axes[action.axis] * action.value
57  elif action.msg_field == "linear.y":
58  msg.linear.y = action.value if action.axis == -1 else joy_msg.axes[action.axis] * action.value
59  elif action.msg_field == "linear.z":
60  msg.linear.z = action.value if action.axis == -1 else joy_msg.axes[action.axis] * action.value
61  elif action.msg_field == "angular.x":
62  msg.angular.x = action.value if action.axis == -1 else joy_msg.axes[action.axis] * action.value
63  elif action.msg_field == "angular.y":
64  msg.angular.y = action.value if action.axis == -1 else joy_msg.axes[action.axis] * action.value
65  elif action.msg_field == "angular.z":
66  msg.angular.z = action.value if action.axis == -1 else joy_msg.axes[action.axis] * action.value
67  if controller not in actions:
68  actions[controller] = [msg]
69  else:
70  actions[controller].append(msg)
71  elif controller.type in float_controllers:
72  v = action.value if action.axis == -1 else joy_msg.axes[action.axis] * action.value
73  stepIt = joint_states.position[joint_states.name.index(action.joint)] + v
74  publishers[controller.name].publish(stepIt)
75  elif controller.type in joint_traj_controllers:
76  msg = JointTrajectory()
77  msg.points.append(JointTrajectoryPoint())
78  v = action.value if action.axis == -1 else joy_msg.axes[action.axis] * action.value
79  stepIt = joint_states.position[joint_states.name.index(action.joint)] + v
80  for joint in controller.joints:
81  msg.joint_names.append(joint)
82  msg.points[0].positions.append(joint_states.position[joint_states.name.index(joint)])
83  msg.points[0].velocities.append(joint_states.velocity[joint_states.name.index(joint)])
84  msg.points[0].effort.append(joint_states.effort[joint_states.name.index(joint)])
85  if action.msg_field == "position":
86  msg.points[0].positions[msg.joint_names.index(action.joint)] = stepIt
87  elif action.msg_field == "velocity":
88  msg.points[0].velocities[msg.joint_names.index(action.joint)] = stepIt
89  elif action.msg_field == "acceleration":
90  msg.points[0].accelerations[msg.joint_names.index(action.joint)] = stepIt
91  elif action.msg_field == "effort":
92  msg.points[0].effort[msg.joint_names.index(action.joint)] = stepIt
93  if controller not in actions:
94  actions[controller] = [msg]
95  else:
96  actions[controller].append(msg)
97 
98  for controller in actions:
99  msg = None
100  if controller.type in twist_controllers:
101  msg = Twist()
102  for action in actions[controller]:
103  msg.linear.x = action.linear.x if abs(action.linear.x) > abs(msg.linear.x) else msg.linear.x
104  msg.linear.y = action.linear.y if abs(action.linear.y) > abs(msg.linear.y) else msg.linear.y
105  msg.linear.z = action.linear.z if abs(action.linear.z) > abs(msg.linear.z) else msg.linear.z
106  msg.angular.x = action.angular.x if abs(action.angular.x) > abs(msg.angular.x) else msg.angular.x
107  msg.angular.y = action.angular.y if abs(action.angular.y) > abs(msg.angular.y) else msg.angular.y
108  msg.angular.z = action.angular.z if abs(action.angular.z) > abs(msg.angular.z) else msg.angular.z
109  elif controller.type in joint_traj_controllers:
110  for action in actions[controller]:
111  if msg is None:
112  msg = JointTrajectory()
113  msg.header.stamp = rospy.Time.now()
114  msg.joint_names = action.joint_names
115  msg.points = action.points
116  msg.points[0].time_from_start = rospy.Duration(1.0/rate)
117  else:
118  for i in range(len(action.points[0].positions)):
119  msg.points[0].positions[i] = action.points[0].positions[i] if action.points[0].positions[i] != joint_states.position[joint_states.name.index(msg.joint_names[i])] else msg.points[0].positions[i]
120  msg.points[0].velocities[i] = action.points[0].velocities[i] if action.points[0].velocities[i] != joint_states.velocity[joint_states.name.index(msg.joint_names[i])] else msg.points[0].velocities[i]
121  msg.point.accelerations[i] = action.points[0].accelerations[i] if abs(action.points[0].accelerations[i]) > abs(joint_states.acceleration[joint_states.name.index(msg.joint_names[i])]) else msg.points[0].accelerations[i]
122  msg.points[0].effort[i] = action.points[0].effort[i] if action.points[0].effort[i] != joint_states.effort[joint_states.name.index(msg.joint_names[i])] else msg.points[0].effort[i]
123  publishers[controller.name].publish(msg)
124  time.sleep(1.0/rate)
125 
126 # Save in actions_to_exec the joy bindings that should be used to send a command to a joint
127 def bindingsOfJoy(msg):
128  global actions_to_exec
129  actions_to_exec = []
130  pressed_buttons = [x for x, e in enumerate(msg.buttons) if e != 0]
131  pressed_axes = [x for x, e in enumerate(msg.axes) if e != 0]
132  for controller in controllers:
133  for ja in controller.joyActions:
134  if ((ja.axis in pressed_axes) or (not pressed_axes and ja.axis == -1)) and\
135  ((ja.button in pressed_buttons) or (not pressed_buttons and ja.button == -1)):
136  actions_to_exec.append((ja, controller))
137 
138 # Translate a dict record to a JoyAction object
139 def joyActionFromDict(dict_record):
140  ax = dict_record["axis"]
141  b = dict_record["button"]
142  j = dict_record["joint"]
143  mf = dict_record["msg_field"]
144  v = dict_record["value"]
145  return JoyAction(b, ax, v, j, mf)
146 
147 # Translate the read YAML dict to a list of Controller objects
148 def controllersFromYAML(yaml_mess):
149  global controllers
150  for k in yaml_mess:
151  actions = []
152  for controller in controllers:
153  if controller.name == k:
154  for j in yaml_mess[k]["actions"]:
155  controller.joyActions.append(joyActionFromDict(yaml_mess[k]["actions"][j]))
156  controller.topic = yaml_mess[k]["topic"]
157  break
158 
159 # Initialisations and other boring stuff
160 def main():
161  global controllers, kill, rate
162  rospy.init_node("jointstick")
163  config_file = rospy.get_param("~config_file", "-")
164  if config_file == "-":
165  print("No config file provided. Try using jointstick_setup first to create one, and then set it at the config.yaml file. Exiting...")
166  return
167  elif not os.path.isfile(config_file):
168  print("Config file {} not found. Exiting..").format(config_file)
169  return
170  rate = rospy.get_param("~rate", 10)
171  controllers_srv = rospy.ServiceProxy("controller_manager/list_controllers", ListControllers)
172 
173  print("Reading available controllers....")
174 
175  rospy.wait_for_service("controller_manager/list_controllers")
176 
177  c = controllers_srv().controller
178 
179  for controller in c:
180  controllers.append(Controller(controller.name, controller.type, [], controller.claimed_resources[0].resources, ""))
181 
182  print("Done!")
183 
184  controllersFromYAML(yaml.safe_load(open(config_file, "r")))
185 
186  # Delete controllers that do not have any joy bindings
187  controllers = [x for x in controllers if x.joyActions]
188 
190 
191  rospy.Subscriber("/joy", Joy, joyCallback)
192  rospy.Subscriber("/joint_states", JointState, jointStatesCallback)
193 
194  t = threading.Thread(target=execCommands)
195  t.deamon = True
196  t.start()
197 
198  while not rospy.is_shutdown():
199  rospy.spin()
200 
201  kill = True
202 
203 main()
def initPublishers()
Definition: jointstick.py:40
def joyCallback(msg)
Definition: jointstick.py:34
def jointStatesCallback(msg)
Definition: jointstick.py:29
def execCommands()
Definition: jointstick.py:49
def main()
Definition: jointstick.py:160
def controllersFromYAML(yaml_mess)
Definition: jointstick.py:148
def joyActionFromDict(dict_record)
Definition: jointstick.py:139
def bindingsOfJoy(msg)
Definition: jointstick.py:127


jointstick
Author(s): George Stavrinos
autogenerated on Mon Jun 10 2019 13:38:17