vehicle.py
Go to the documentation of this file.
1 import rospy
2 
3 import actionlib
4 from jsk_teleop_joy.joy_plugin import JSKJoyPlugin
5 from drc_task_common.srv import StringRequest, StringRequestResponse
6 
7 try:
8  imp.find_module("std_msgs")
9 except:
10  import roslib; roslib.load_manifest('jsk_teleop_joy')
11 
12 
13 from std_msgs.msg import String, Float64, Bool, Float32
14 from std_srvs.srv import Empty, EmptyResponse
15 from geometry_msgs.msg import PoseStamped
16 import xml.etree.ElementTree as ET
17 import os, sys, time, numpy
18 
20  def __init__(self, name, args):
21  JSKJoyPlugin.__init__(self, name, args)
22  self.command_states = {
23  "handle": HandleCommandState("handle", Float64, Float32, "drive/controller/goal_handle_angle"),
24  "accel": VehicleCommandState("accel", Float64, Float32, "drive/controller/step"),
25  "brake": VehicleCommandState("brake", Float64, None, None), # do not support brake synchronize
26  "neck_y": VehicleCommandState("neck_y", Float64, Float32, "drive/controller/neck_y_angle"),
27  "neck_p": VehicleCommandState("neck_p", Float64, Float32, "drive/controller/neck_p_angle")
28  }
30  self.execute_flag = False
31  print("Joystick initialization is finished", file=sys.stderr)
32  self.initialize_service = rospy.Service('drive/operation/initialize', Empty, self.initializeServiceCallback)
33  self.synchronize_service = rospy.Service('drive/operation/synchronize', StringRequest, self.synchronizeServiceCallback)
34  self.subscriber = rospy.Subscriber('drive/execute_flag', Bool, self.executeFlagCallback)
35 
36  def joyCB(self, status, history):
37  latest = history.latest()
38  handle_resolution = 0.025 # rad
39  neck_y_resolution = 0.1 # deg
40  neck_y_angle_max = 35.0
41  neck_y_angle_min = -35.0
42  neck_p_resolution = 0.1 # deg
43  neck_p_angle_max = 35.0
44  neck_p_angle_min = -35.0
45  max_accel_resolution = 0.05
46  max_brake_resolution = 1.0
47 
48  if not self.execute_flag or not latest:
49  return
50 
51  # handle command
52  if status.left_analog_x:
53  self.command_states["handle"].command = self.command_states["handle"].command + handle_resolution * status.left_analog_x
54  # neck_y command
55  if status.left:
56  self.command_states["neck_y"].command = self.commandJointAngle(self.command_states["neck_y"].command, neck_y_resolution, neck_y_angle_max, neck_y_angle_min)
57  elif status.right:
58  self.command_states["neck_y"].command = self.commandJointAngle(self.command_states["neck_y"].command, -neck_y_resolution, neck_y_angle_max, neck_y_angle_min)
59  # neck_p command (head goes down when neck_p incleases)
60  if status.up:
61  self.command_states["neck_p"].command = self.commandJointAngle(self.command_states["neck_p"].command, -neck_p_resolution, neck_p_angle_max, neck_p_angle_min)
62  elif status.down:
63  self.command_states["neck_p"].command = self.commandJointAngle(self.command_states["neck_p"].command, neck_p_resolution, neck_p_angle_max, neck_p_angle_min)
64  # accel command
65  if status.right_analog_y:
66  self.command_states["accel"].command = max(status.right_analog_y, 0.0)
67  if status.right_analog_y < -0.5:
68  self.command_states["brake"].command = 1.0
69  else:
70  self.command_states["brake"].command = 0.0
71  else:
72  self.command_states["accel"].command = 0.0
73  self.command_states["brake"].command = 0.0
74 
75  for command in self.command_states.values():
76  command.publishCommand()
77 
78  def commandJointAngle(self, current_value, resolution, max_value, min_value): # max_value assumed to be positive
79  next_value = current_value + resolution
80  if next_value > max_value:
81  next_value = max_value
82  elif next_value < min_value:
83  next_value = min_value
84  return next_value
85 
87  for command in self.command_states.values():
88  command.initialize()
89 
90  def synchronizeCommand(self, key):
91  print("Sync " + key, file=sys.stderr)
92  self.command_states[key].synchronize()
93 
95  for key in self.command_states.keys():
96  self.synchronizeCommand(key)
97 
98  def executeFlagCallback(self, msg):
99  self.execute_flag = msg.data
100  print("set execute_flag as " + str(self.execute_flag), file=sys.stderr)
101 
103  print("initialize vehicle joy", file=sys.stderr)
104  self.initializeAllCommand()
105  return EmptyResponse()
106 
108  key = req.data.lower()
109  if key in self.command_states:
110  self.synchronizeCommand(key)
111  elif key == "neck":
112  self.synchronizeCommand("neck_p")
113  self.synchronizeCommand("neck_y")
114  elif key == "all":
115  self.synchronizeAllCommand()
116  else:
117  print("Invalid key", file=sys.stderr)
118  return StringRequestResponse()
119 
121  def __init__(self, command_name, pub_type, sub_type, robot_topic):
122  self.command_name = command_name
123  self.pub_type = pub_type
124  self.sub_type = sub_type
126  self.robot_topic = robot_topic
127  self.publisher = rospy.Publisher("drive/operation/" + self.command_name + "_cmd_fast", self.pub_type)
128  if robot_topic and self.sub_type:
129  self.subscriber = rospy.Subscriber(robot_topic, self.sub_type, self.callback)
130  else:
131  self.subscriber = None
132  self.initialize()
133  def callback(self, msg):
134  self.robot_value = msg.data
135  def initialize(self):
136  self.command = 0.0
137  self.robot_value = 0.0
138  def synchronize(self, timeout = 1.0):
139  if self.robot_topic != None and self.sub_type != None:
140  try:
141  if self.wait_for_message_flag:
142  rospy.wait_for_message(self.robot_topic, self.sub_type, timeout)
143  self.wait_for_message_flag = False
144  print("Sync with " + self.robot_topic, file=sys.stderr)
145  print("%s -> %s" % (str(self.command), str(self.robot_value)))
146  self.command = self.robot_value
147  except rospy.ROSException as e:
148  print("Cannot subscribe " + self.robot_topic, file=sys.stderr)
149  def publishCommand(self):
150  pub_msg = self.pub_type(data = self.command)
151  self.publisher.publish(pub_msg)
152 
154  def callback(self, msg):
155  self.robot_value = msg.data * numpy.pi / 180.0 # goal_handle_angle[deg] -> command[rad]
156  # def synchronize(self):
157  # self.command = self.robot_value * numpy.pi / 180.0 # robot_value[deg] -> command[rad]
158 
def commandJointAngle(self, current_value, resolution, max_value, min_value)
Definition: vehicle.py:78
def __init__(self, command_name, pub_type, sub_type, robot_topic)
Definition: vehicle.py:121


jsk_teleop_joy
Author(s): Ryohei Ueda
autogenerated on Fri May 14 2021 02:52:11