joy_footstep_planner.py
Go to the documentation of this file.
1 import rospy
2 
3 import imp
4 try:
5  imp.find_module("actionlib")
6 except:
7  import roslib; roslib.load_manifest('jsk_teleop_joy')
8 import threading
9 import actionlib
10 from jsk_teleop_joy.joy_pose_6d import JoyPose6D
11 from actionlib_msgs.msg import GoalStatusArray
12 from jsk_footstep_msgs.msg import PlanFootstepsAction, PlanFootstepsGoal, Footstep, FootstepArray, ExecFootstepsAction, ExecFootstepsGoal
13 from jsk_rviz_plugins.msg import OverlayMenu, OverlayText
14 from std_msgs.msg import UInt8, Empty
15 from std_srvs.srv import Empty as EmptySrv
16 import std_srvs.srv
17 import tf
18 from tf.transformations import *
19 from geometry_msgs.msg import PoseStamped
20 import jsk_teleop_joy.tf_ext as tf_ext
21 from jsk_footstep_planner.srv import ChangeSuccessor
22 class JoyFootstepPlanner(JoyPose6D):
23  EXECUTING = 1
24  PLANNING = 2
25  WAIT_FOR_CANCEL = 3
26  CANCELED = 4
27  CANCELED_MENUS = ["Cancel",
28  "Ignore and proceed",
29  "Use larger footsteps",
30  "Use middle footsteps",
31  "Use smaller footsteps"]
32  def __init__(self, name, args):
33  JoyPose6D.__init__(self, name, args)
34  self.usage_pub = rospy.Publisher("/joy/usage", OverlayText)
35  self.supportFollowView(True)
36  self.mode = self.PLANNING
37  self.snapped_pose = None
39  self.prev_mode = self.PLANNING
40  self.frame_id = self.getArg('frame_id', '/map')
41  self.lfoot_frame_id = rospy.get_param('~lfoot_frame_id', '/LLEG_LINK5')
42  self.rfoot_frame_id = rospy.get_param('~rfoot_frame_id', '/RLEG_LINK5')
43  self.lfoot_offset = tf_ext.xyzxyzwToMatrix(rospy.get_param('~lfoot_offset'))
44  self.rfoot_offset = tf_ext.xyzxyzwToMatrix(rospy.get_param('~rfoot_offset'))
45  self.lock_furutaractive = self.getArg("lock_furutaractive", False)
46  self.furutaractive_namespace = self.getArg("furutaractive_namespace", "urdf_model_marker")
47  self.command_pub = rospy.Publisher('/menu_command', UInt8)
48  self.execute_pub = rospy.Publisher(self.getArg('execute', 'execute'), Empty)
49  self.resume_pub = rospy.Publisher(self.getArg('resume', 'resume'), Empty)
51  self.menu_pub = rospy.Publisher("/overlay_menu", OverlayMenu)
52  # initialize self.pre_pose
53  rospy.loginfo("waiting %s" % (self.lfoot_frame_id))
54  self.tf_listener.waitForTransform(self.frame_id, self.lfoot_frame_id,
55  rospy.Time(0.0), rospy.Duration(100.0))
56  rospy.loginfo("waiting %s" % (self.rfoot_frame_id))
57  self.tf_listener.waitForTransform(self.frame_id, self.rfoot_frame_id,
58  rospy.Time(0.0), rospy.Duration(100.0))
59  self.exec_client = actionlib.SimpleActionClient('/footstep_controller',
60  ExecFootstepsAction)
61  self.status_sub = rospy.Subscriber("/footstep_controller/status", GoalStatusArray,
62  self.statusCB, queue_size=1)
63  self.snap_sub = rospy.Subscriber(self.getArg("snapped_pose", "/footstep_marker/snapped_pose"),
64  PoseStamped,
65  self.snapCB,
66  queue_size=1)
67  self.cancel_menu_sub = rospy.Subscriber("/footstep_cancel_broadcast", Empty,
68  self.cancelMenuCB, queue_size=1)
69  self.status_lock = threading.Lock()
71  self.resetGoalPose()
72  def lockFurutaractive(self):
73  try:
74  lock = rospy.ServiceProxy(self.furutaractive_namespace + '/lock_joint_states', EmptySrv)
75  lock()
76  except rospy.ServiceException as e:
77  rospy.logerror("failed to call service: %s" % (e.message))
79  try:
80  unlock = rospy.ServiceProxy(self.furutaractive_namespace + '/unlock_joint_states', EmptySrv)
81  unlock()
82  except rospy.ServiceException as e:
83  rospy.logerror("failed to call service: %s" % (e.message))
84  def cancelMenuCB(self, msg):
85  with self.status_lock:
86  self.mode = self.CANCELED
87  def snapCB(self, msg):
88  self.snapped_pose = msg
89  def statusCB(self, msg):
90  if len(msg.status_list) == 0:
91  return # do nothing
92  self.status_lock.acquire()
93  # msg.status_list[0].status == 0 -> done
94  if self.mode == self.EXECUTING and msg.status_list[0].status == 0: # ???
95  # done
96  if self.ignore_next_status_flag: # hack!!
97  self.ignore_next_status_flag = False
98  else:
99  self.mode = self.PLANNING
100  if self.lock_furutaractive:
101  self.unlockFurutaractive()
102  elif self.mode == self.WAIT_FOR_CANCEL and msg.status_list[0].status == 0:
103  self.mode = self.CANCELED
104  self.status_lock.release()
105  def publishUsage(self):
106  overlay_text = OverlayText()
107  overlay_text.text = """
108 Left Analog: x/y
109 L2/R2 : +-z
110 L1/R1 : +-yaw
111 Left/Right : +-roll
112 Up/Down : +-pitch
113 circle : Go
114 cross : Reset/Cancel
115 triangle : Clear maps and look around ground
116 up/down : Move menu cursors
117 """
118  overlay_text.width = 500
119  overlay_text.height = 500
120  overlay_text.text_size = 12
121  overlay_text.left = 10
122  overlay_text.top = 10
123  overlay_text.font = "Ubuntu Mono Regular"
124  overlay_text.bg_color.a = 0
125  overlay_text.fg_color.r = 25 / 255.0
126  overlay_text.fg_color.g = 1
127  overlay_text.fg_color.b = 1
128  overlay_text.fg_color.a = 1
129  self.usage_pub.publish(overlay_text)
130  def resetGoalPose(self):
131  # initial pose will be the center
132  # of self.lfoot_frame_id and self.rfoot_frame_id
133  lfoot_pose = tf_ext.transformToMatrix(self.tf_listener.lookupTransform(
134  self.frame_id,
135  self.lfoot_frame_id,
136  rospy.Time(0.0)))
137  rfoot_pose = tf_ext.transformToMatrix(self.tf_listener.lookupTransform(
138  self.frame_id,
139  self.rfoot_frame_id,
140  rospy.Time(0.0)))
141  # apply offset
142  lfoot_with_offset = numpy.dot(lfoot_pose, self.lfoot_offset)
143  rfoot_with_offset = numpy.dot(rfoot_pose, self.rfoot_offset)
144  (lfoot_pos, lfoot_q) = tf_ext.decomposeMatrix(lfoot_with_offset)
145  (rfoot_pos, rfoot_q) = tf_ext.decomposeMatrix(rfoot_with_offset)
146  # compute the center of the two transformations
147  mid_pos = (lfoot_pos + rfoot_pos) / 2.0
148  mid_quaternion = quaternion_slerp(lfoot_q, rfoot_q,
149  0.5)
150  self.pre_pose.pose.position.x = mid_pos[0]
151  self.pre_pose.pose.position.y = mid_pos[1]
152  self.pre_pose.pose.position.z = mid_pos[2]
153  self.pre_pose.pose.orientation.x = mid_quaternion[0]
154  self.pre_pose.pose.orientation.y = mid_quaternion[1]
155  self.pre_pose.pose.orientation.z = mid_quaternion[2]
156  self.pre_pose.pose.orientation.w = mid_quaternion[3]
157  def executePlan(self):
158  # publish execute with std_msgs/empty
159  self.execute_pub.publish(Empty())
160  def resumePlan(self):
161  # publish execute with std_msgs/empty
162  self.resume_pub.publish(Empty())
163  def publishMenu(self, close=False):
164  menu = OverlayMenu()
165  menu.menus = self.CANCELED_MENUS
166  menu.current_index = self.current_selecting_index
167  menu.title = "Canceled"
168  if close:
169  menu.action = OverlayMenu.ACTION_CLOSE
170  self.menu_pub.publish(menu)
171  def changePlanningSuccessor(self, successor_type):
172  try:
173  change_successor = rospy.ServiceProxy('/change_successor', ChangeSuccessor)
174  change_successor(successor_type)
175  except rospy.ServiceException as e:
176  rospy.logerror("failed to call service: %s" % (e.message))
177  def procCancelMenu(self, index):
178  selected_title = self.CANCELED_MENUS[index]
179  if selected_title == "Cancel":
180  self.status_lock.acquire()
181  self.mode = self.PLANNING
182  self.status_lock.release()
183  self.publishMenu(close=True)
184  elif selected_title == "Ignore and proceed":
185  # re-execute the plan left
186  self.resumePlan()
187  self.status_lock.acquire()
188  self.mode = self.EXECUTING
189  self.status_lock.release()
190  self.publishMenu(close=True)
191  elif selected_title == "Use smaller footsteps":
192  self.status_lock.acquire()
193  self.mode = self.PLANNING
194  self.changePlanningSuccessor("small")
195  self.status_lock.release()
196  self.publishMenu(close=True)
197  elif (selected_title == "Use larger footsteps" or
198  selected_title == "Use middle footsteps"):
199  self.status_lock.acquire()
200  self.mode = self.PLANNING
201  self.changePlanningSuccessor("normal")
202  self.status_lock.release()
203  self.publishMenu(close=True)
204  def lookAround(self):
205  try:
206  clear_maps = rospy.ServiceProxy('/env_server/clear_maps', std_srvs.srv.Empty)
207  clear_maps()
208  look_around = rospy.ServiceProxy('/lookaround_ground', std_srvs.srv.Empty)
209  look_around()
210  except Exception as e:
211  rospy.logerr("error when lookaround ground: %s", e.message)
212  def joyCB(self, status, history):
213  self.publishUsage()
214  if self.prev_mode != self.mode:
215  print(self.prev_mode, " -> ", self.mode)
216  if self.mode == self.PLANNING:
217  JoyPose6D.joyCB(self, status, history)
218  if history.new(status, "circle"):
219  self.status_lock.acquire()
220  self.mode = self.EXECUTING
221  if self.lock_furutaractive:
222  self.lockFurutaractive()
223  self.executePlan()
224  self.ignore_next_status_flag = True
225  self.status_lock.release()
226  elif history.new(status, "cross"):
227  self.resetGoalPose()
228  elif history.new(status, "triangle"):
229  self.lookAround()
230  elif self.mode == self.CANCELED:
231  # show menu
232  if history.new(status, "circle"):
233  # choosing
235  else:
236  if history.new(status, "up"):
238  elif history.new(status, "down"):
240  # menu is only cancel/ignore
241  if self.current_selecting_index < 0:
242  self.current_selecting_index = len(self.CANCELED_MENUS) - 1
243  elif self.current_selecting_index > len(self.CANCELED_MENUS) - 1:
244  self.current_selecting_index = 0
245  self.publishMenu()
246  elif self.mode == self.EXECUTING:
247  # if history.new(status, "triangle"):
248  # self.command_pub.publish(UInt8(1))
249  if history.new(status, "cross"):
250  self.status_lock.acquire()
251  self.exec_client.cancel_all_goals()
252  self.mode = self.WAIT_FOR_CANCEL
253  self.status_lock.release()
254  self.prev_mode = self.mode


jsk_teleop_joy
Author(s): Ryohei Ueda
autogenerated on Sun May 28 2023 03:03:37