motion_square.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # License: BSD
4 # https://raw.github.com/yujinrobot/kobuki/hydro-devel/kobuki_testsuite/LICENSE
5 #
6 ##############################################################################
7 # Imports
8 ##############################################################################
9 
10 import rospy
11 import copy
12 import math
13 import PyKDL
14 from nav_msgs.msg import Odometry
15 from geometry_msgs.msg import Twist
16 from std_msgs.msg import String
17 from sensor_msgs.msg import Imu
18 
19 ##############################################################################
20 # Classes
21 ##############################################################################
22 
23 class Pose2D(object):
24  def __init__(self):
25  self.x = None
26  self.y = None
27  self.heading = None
28 
29  def configured(self):
30  return self.x and self.y and self.heading
31 
32  def __str__(self):
33  return ("[%s, %s, %s]"%(self.x, self.y, self.heading))
34 
35 '''
36  Implement a square motion
37 '''
38 class Square(object):
39  STATE_FORWARD = "forward"
40  STATE_STOP_FORWARD = "stop_forward"
41  STATE_TURN = "turn"
42  STATE_STOP_TURN = "stop_turn"
43 
44  def __init__(self,cmd_vel_topic, odom_topic, gyro_topic):
45  self._rate = rospy.Rate(20)
46  self._speed = 0.4
47  self._side_distance = 1.0
48 
49  self._stop = False
50  self._running = False
51  self._turn_count = 0
52 
54  self._starting_pose = None
55  self._state = Square.STATE_FORWARD
56  self.last_state = Square.STATE_FORWARD
57 
58  self.odom_subscriber = rospy.Subscriber(odom_topic, Odometry, self.odometry_callback)
59  self.cmd_vel_publisher = rospy.Publisher(cmd_vel_topic,Twist)
60  self.gyro_subscriber = rospy.Subscriber(gyro_topic, Imu, self.heading_callback)
61 
62  def init(self, speed, side_distance):
63  # check for suitable values here
64  self._speed = speed
65  self._side_distance = side_distance
66 
67  def shutdown(self):
68  self.stop()
69  while self._running:
70  rospy.sleep(0.5)
71  self.cmd_vel_publisher.unregister()
72  self.cmd_vel_publisher = None
73  self.odom_subscriber.unregister()
74  self.odom_subscriber = None
75  self.gyro_subscriber.unregister()
76  self.gyro_subscriber = None
77 
78  def stop(self):
79  self._stop = True
80 
81  def execute(self):
82  '''
83  Drop this into threading.Thread or QThread for execution
84  '''
85  if self._running:
86  rospy.logerr("Kobuki TestSuite: already executing a motion, ignoring the request")
87  return
88  timeout = rospy.Time.now() + rospy.Duration(5.0)
89  while not self._current_pose.configured():
90  if rospy.Time.now() > timeout:
91  rospy.logerr("Kobuki TestSuite: no odometry data, aborting")
92  return
93  rospy.sleep(1.0)
94  rospy.logwarn("Kobuki TestSuite: haven't received any odometry data yet")
95 
96  self._stop = False
97  self._running = True
98  start = rospy.get_rostime()
99  rospy.sleep(0.5)
100 
101  #turning_arc_radius = side_distance/4.0
102  rospy.loginfo("Kobuki Testsuite: executing a square motion pattern [%s]"%self._side_distance)
103  self._starting_pose = copy.deepcopy(self._current_pose)
104 
105  while not self._stop and not rospy.is_shutdown():
106  if self._state == Square.STATE_FORWARD:
107  self._forward()
108  elif self._state == Square.STATE_STOP_FORWARD:
109  self._stop_forward()
110  elif self._state == Square.STATE_TURN:
111  self._turn()
112  elif self._state == Square.STATE_STOP_TURN:
113  self._stop_turn()
114  if self._turn_count == 4:
115  break
116  self._rate.sleep()
117  if not rospy.is_shutdown():
118  self._command(0.0, 0.0)
119  self._running = False
120 
121  ##########################################################################
122  # Utilities
123  ##########################################################################
124 
126  dx = self._current_pose.x - self._starting_pose.x
127  dy = self._current_pose.y - self._starting_pose.y
128  travelled_sq = dx*dx + dy*dy
129  if travelled_sq > self._side_distance*self._side_distance:
130  return True
131  else:
132  return False
133 
135  #rospy.loginfo("Poses [%s]-[%s]"%(self._current_pose.heading,self._starting_pose.heading))
136  if math.fabs(self._current_pose.heading - self._starting_pose.heading) > math.pi/2.0:
137  return True
138  else:
139  return False
140 
141  def _command(self, linear, angular):
142  '''
143  Publishes a cmd_vel message for the kobuki.
144  '''
145  cmd = Twist()
146  cmd.linear.x = linear
147  cmd.angular.z = angular
148  self.cmd_vel_publisher.publish(cmd)
149 
150  def _forward(self):
151  if self._has_reached_forward_goal():
152  rospy.loginfo("Kobuki TestSuite: commencing turn")
153  self._state = Square.STATE_STOP_FORWARD
154  #self._command(0.0,0.0)
155  else:
156  self._command(self._speed, 0.0)
157 
158  def _stop_forward(self):
159  self._starting_pose = copy.deepcopy(self._current_pose)
160  self._state = Square.STATE_TURN
161  alpha = 0.5
162  self._command(alpha*self._speed, -alpha*self._speed/(self._side_distance/4.0))
163 
164  def _turn(self):
165  if self._has_reached_turning_goal():
166  self._state = Square.STATE_STOP_TURN
167  #self._command(0.0,0.0)
168  else:
169  # set up a turning arc, 1/4 the length of the side
170  # actually, this almost always creates incredibly fast turns, so crank it down by alpha
171  alpha = 0.5
172  self._command(alpha*self._speed, -alpha*self._speed/(self._side_distance/4.0))
173 
174  def _stop_turn(self):
175  self._turn_count = self._turn_count + 1
176  rospy.loginfo("Kobuki TestSuite: finished turn %s"%self._turn_count)
177  self._starting_pose = copy.deepcopy(self._current_pose)
178  self._state = Square.STATE_FORWARD
179  if self._turn_count == 4:
180  self._command(0.0, 0.0)
181  else:
182  self._command(self._speed, 0.0)
183 
184 
185  ##########################################################################
186  # Ros Callbacks
187  ##########################################################################
188 
189  def odometry_callback(self, data):
190  self._current_pose.x = data.pose.pose.position.x
191  self._current_pose.y = data.pose.pose.position.y
192  # if self._current_pose.configured():
193  # rospy.loginfo("TesSuite: %s"%self._current_pose)
194 
195  def heading_callback(self, data):
196  quat = data.orientation
197  rot = PyKDL.Rotation.Quaternion(quat.x, quat.y, quat.z, quat.w)
198  self._current_pose.heading = rot.GetRPY()[2]
199 
200 
201 if __name__ == '__main__':
202  rospy.init_node('square')
203  square = Square("/cmd_vel","/odom")
204  rospy.spin()
def _has_reached_forward_goal(self)
Utilities.
def odometry_callback(self, data)
Ros Callbacks.
def _command(self, linear, angular)
def __init__(self, cmd_vel_topic, odom_topic, gyro_topic)
def init(self, speed, side_distance)


kobuki_testsuite
Author(s): Jorge Santos Simon
autogenerated on Mon Jun 10 2019 13:45:22