emulation_base.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import copy
4 import math
5 
6 import actionlib
7 import rospy
8 import tf
9 import tf2_ros
10 from actionlib_msgs.msg import GoalStatus
11 from geometry_msgs.msg import Twist, Transform, TransformStamped, PoseStamped
12 from geometry_msgs.msg import Pose, PoseWithCovarianceStamped, Quaternion
13 from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal, MoveBaseResult
14 from nav_msgs.msg import Odometry
15 from std_srvs.srv import Trigger, TriggerRequest, TriggerResponse
16 
17 class EmulationBase(object):
18  def __init__(self):
19  # this node emulates a base including localization
20  #
21  # interfaces
22  # - subscribers:
23  # - /base/twist_controller/command [geometry_msgs/Twist]
24  # - /initialpose [geometry_msgs/PoseWithCovarianceStamped]
25  # - publishers:
26  # - /base/odometry_controller/odometry [nav_msgs/Odometry]
27  # - tf (odom_combined --> base_footprint, map --> odom_combined)
28  # - actions:
29  # - move_base [move_base_msgs/MoveBaseAction] (optional)
30 
31 
32  # TODO
33  # - add service reset odometry (/base/odometry_controller/reset_odometry [std_srvs::Trigger])
34  # - speed factor
35  # - handle cancel on move_base action
36 
37  rospy.Subscriber("/base/twist_controller/command", Twist, self.twist_callback, queue_size=1)
38  rospy.Subscriber("/initialpose", PoseWithCovarianceStamped, self.initalpose_callback, queue_size=1)
39  self.pub_odom = rospy.Publisher("/base/odometry_controller/odometry", Odometry, queue_size=1)
40  rospy.Service("/base/odometry_controller/reset_odometry", Trigger, self.reset_odometry)
44 
45  self.timestamp_last_update = rospy.Time.now()
46 
47  self.twist = Twist()
48  self.timestamp_last_twist = rospy.Time(0)
49 
50  self.odom = Odometry()
51  self.odom.header.frame_id = "odom_combined"
52  self.odom.child_frame_id = "base_footprint"
53  self.odom.pose.pose.orientation.w = 1 # initialize orientation with a valid quaternion
54 
55 
56 
57  initialpose = rospy.get_param("~initialpose", None)
58  if type(initialpose) == list:
59  rospy.loginfo("using initialpose from parameter server: %s", str(initialpose))
60  elif type(initialpose) == str:
61  rospy.loginfo("using initialpose from script server: %s", str(initialpose))
62  initialpose = rospy.get_param("/script_server/base/" + initialpose)
63  else:
64  rospy.loginfo("initialpose not set, using [0, 0, 0] as initialpose")
65  initialpose = [0, 0, 0]
66 
67  self.initial_pose = Transform()
68  self.initial_pose.translation.x = initialpose[0]
69  self.initial_pose.translation.y = initialpose[1]
70  quat = tf.transformations.quaternion_from_euler(0, 0, initialpose[2])
71  self.initial_pose.rotation = Quaternion(*quat)
72 
73  rospy.Timer(rospy.Duration(0.1), self.timer_cb)
74 
75  rospy.loginfo("Emulation for base running")
76 
77  # Optional move_base action
78  self.move_base_mode = rospy.get_param("~move_base_mode", None)
79  if self.move_base_mode == None:
80  rospy.loginfo("Emulation running without move_base")
81  elif self.move_base_mode == "beam" or self.move_base_mode == "linear_nav":
82  self.move_base_action_name = "/move_base"
83  rospy.Subscriber(self.move_base_action_name + "_simple/goal", PoseStamped, self.move_base_simple_callback, queue_size=1)
84  self.as_move_base = actionlib.SimpleActionServer(self.move_base_action_name, MoveBaseAction, execute_cb=self.move_base_cb, auto_start = False)
85  self.as_move_base.start()
86  rospy.loginfo("Emulation running for action %s of type MoveBaseAction with mode '%s'"%(self.move_base_action_name, self.move_base_mode))
87  else:
88  rospy.logwarn("Emulation running without move_base due to invalid value for parameter move_base_mode: '%s'", self.move_base_mode)
89 
90  def initalpose_callback(self, msg):
91  self.initial_pose.translation.x = msg.pose.pose.position.x
92  self.initial_pose.translation.y = msg.pose.pose.position.y
93  self.initial_pose.translation.z = msg.pose.pose.position.z
94 
95  yaw1 = tf.transformations.euler_from_quaternion(
96  [msg.pose.pose.orientation.x,
97  msg.pose.pose.orientation.y,
98  msg.pose.pose.orientation.z,
99  msg.pose.pose.orientation.w])[2]
100  yaw2 = tf.transformations.euler_from_quaternion(
101  [self.odom.pose.pose.orientation.x,
102  self.odom.pose.pose.orientation.y,
103  self.odom.pose.pose.orientation.z,
104  self.odom.pose.pose.orientation.w])[2]
105 
106  self.initial_pose.rotation = Quaternion(*[msg.pose.pose.orientation.x,
107  msg.pose.pose.orientation.y,
108  msg.pose.pose.orientation.z,
109  msg.pose.pose.orientation.w])
110  self.odom.pose.pose = Pose()
111  self.odom.pose.pose.orientation.w = 1
112 
113  def reset_odometry(self, req):
114  transform = self.tf_buffer.lookup_transform("map", "base_link", rospy.Time(0))
115 
116  self.initial_pose = transform.transform
117 
118  self.odom.pose.pose = Pose()
119  self.odom.pose.pose.orientation.w = 1
120 
121  return TriggerResponse(True, "odometry resetted")
122 
123  def twist_callback(self, msg):
124  self.twist = msg
125  self.timestamp_last_twist = rospy.Time.now()
126 
127  def timer_cb(self, event):
128  # move robot (calculate new pose)
129  dt = rospy.Time.now() - self.timestamp_last_update
130  self.timestamp_last_update = rospy.Time.now()
131  time_since_last_twist = rospy.Time.now() - self.timestamp_last_twist
132 
133  #print "dt =", dt.to_sec(), ". duration since last twist =", time_since_last_twist.to_sec()
134  # we assume we're not moving any more if there is no new twist after 0.1 sec
135  if time_since_last_twist < rospy.Duration(0.1):
136  new_pose = copy.deepcopy(self.odom.pose.pose)
137  yaw = tf.transformations.euler_from_quaternion([self.odom.pose.pose.orientation.x, self.odom.pose.pose.orientation.y, self.odom.pose.pose.orientation.z, self.odom.pose.pose.orientation.w])[2] + self.twist.angular.z * dt.to_sec()
138  quat = tf.transformations.quaternion_from_euler(0, 0, yaw)
139  new_pose.orientation.x = quat[0]
140  new_pose.orientation.y = quat[1]
141  new_pose.orientation.z = quat[2]
142  new_pose.orientation.w = quat[3]
143  new_pose.position.x += self.twist.linear.x * dt.to_sec() * math.cos(yaw) - self.twist.linear.y * dt.to_sec() * math.sin(yaw)
144  new_pose.position.y += self.twist.linear.x * dt.to_sec() * math.sin(yaw) + self.twist.linear.y * dt.to_sec() * math.cos(yaw)
145  self.odom.pose.pose = new_pose
146 
147  # we're moving, so we set a non-zero twist
148  self.odom.twist.twist.linear.x = self.twist.linear.x
149  self.odom.twist.twist.linear.y = self.twist.linear.y
150  self.odom.twist.twist.angular.z = self.twist.angular.z
151  else:
152  # reset twist as we're not moving anymore
153  self.odom.twist.twist = Twist()
154 
155  # publish odometry
156  odom = copy.deepcopy(self.odom)
157  odom.header.stamp = rospy.Time.now() # update to current time stamp
158  self.pub_odom.publish(odom)
159 
160  # publish tf
161  # pub base_footprint --> odom_combined
162  t_loc = TransformStamped()
163  t_loc.header.stamp = rospy.Time.now()
164  t_loc.header.frame_id = "odom_combined"
165  t_loc.child_frame_id = "base_footprint"
166  t_loc.transform.translation = self.odom.pose.pose.position
167  t_loc.transform.rotation = self.odom.pose.pose.orientation
168 
169  # pub odom_combined --> map
170  t_odom = TransformStamped()
171  t_odom.header.stamp = rospy.Time.now()
172  t_odom.header.frame_id = "map"
173  t_odom.child_frame_id = "odom_combined"
174  t_odom.transform = self.initial_pose
175 
176  transforms = [t_loc, t_odom]
177 
178  self.br.sendTransform(transforms)
179 
180  def move_base_cb(self, goal):
181  pwcs = PoseWithCovarianceStamped()
182  pwcs.header = goal.target_pose.header
183  pwcs.pose.pose = goal.target_pose.pose
184  if self.move_base_mode == "beam":
185  rospy.loginfo("move_base: beaming robot to new goal")
186  self.initalpose_callback(pwcs)
187  elif self.move_base_mode == "linear_nav":
188  move_base_linear_action_name = "/move_base_linear"
189  ac_move_base_linear = actionlib.SimpleActionClient(move_base_linear_action_name, MoveBaseAction)
190  rospy.loginfo("Waiting for ActionServer: %s", move_base_linear_action_name)
191  if not ac_move_base_linear.wait_for_server(rospy.Duration(1)):
192  rospy.logerr("Emulator move_base failed because move_base_linear action server is not available")
193  self.as_move_base.set_aborted()
194  return
195  rospy.loginfo("send goal to %s", move_base_linear_action_name)
196  ac_move_base_linear.send_goal(goal)
197  ac_move_base_linear.wait_for_result()
198  ac_move_base_linear_status = ac_move_base_linear.get_state()
199  ac_move_base_linear_result = ac_move_base_linear.get_result()
200  if ac_move_base_linear_status != GoalStatus.SUCCEEDED:
201  rospy.logerr("Emulator move_base failed because move_base_linear failed")
202  self.as_move_base.set_aborted()
203  return
204  else:
205  rospy.logerr("Invalid move_base_action_mode")
206  self.as_move_base.set_aborted()
207  return
208  rospy.loginfo("Emulator move_base succeeded")
209  self.as_move_base.set_succeeded(MoveBaseResult())
210 
212  goal = MoveBaseGoal()
213  goal.target_pose = msg
214  ac_move_base = actionlib.SimpleActionClient(self.move_base_action_name, MoveBaseAction)
215  rospy.loginfo("Waiting for ActionServer: %s", self.move_base_action_name)
216  if not ac_move_base.wait_for_server(rospy.Duration(1)):
217  rospy.logerr("Emulator move_base simple failed because move_base action server is not available")
218  return
219  rospy.loginfo("send goal to %s", self.move_base_action_name)
220  ac_move_base.send_goal(goal)
221  # ac_move_base.wait_for_result() # no need to wait for the result as this is the topic interface to move_base without feedback
222 
223 if __name__ == '__main__':
224  rospy.init_node('emulation_base')
225  EmulationBase()
226  rospy.spin()
def initalpose_callback(self, msg)
def move_base_simple_callback(self, msg)


cob_hardware_emulation
Author(s): Florian Weisshardt
autogenerated on Thu Apr 8 2021 02:39:41