Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 import rospy
00011 import copy
00012 import math
00013 import PyKDL
00014 from nav_msgs.msg import Odometry
00015 from geometry_msgs.msg import Twist
00016 from std_msgs.msg import String
00017 from sensor_msgs.msg import Imu
00018
00019
00020
00021
00022
00023 class Pose2D(object):
00024 def __init__(self):
00025 self.x = None
00026 self.y = None
00027 self.heading = None
00028
00029 def configured(self):
00030 return self.x and self.y and self.heading
00031
00032 def __str__(self):
00033 return ("[%s, %s, %s]"%(self.x, self.y, self.heading))
00034
00035 '''
00036 Implement a square motion
00037 '''
00038 class Square(object):
00039 STATE_FORWARD = "forward"
00040 STATE_STOP_FORWARD = "stop_forward"
00041 STATE_TURN = "turn"
00042 STATE_STOP_TURN = "stop_turn"
00043
00044 def __init__(self,cmd_vel_topic, odom_topic, gyro_topic):
00045 self._rate = rospy.Rate(20)
00046 self._speed = 0.4
00047 self._side_distance = 1.0
00048
00049 self._stop = False
00050 self._running = False
00051 self._turn_count = 0
00052
00053 self._current_pose = Pose2D()
00054 self._starting_pose = None
00055 self._state = Square.STATE_FORWARD
00056 self.last_state = Square.STATE_FORWARD
00057
00058 self.odom_subscriber = rospy.Subscriber(odom_topic, Odometry, self.odometry_callback)
00059 self.cmd_vel_publisher = rospy.Publisher(cmd_vel_topic,Twist)
00060 self.gyro_subscriber = rospy.Subscriber(gyro_topic, Imu, self.heading_callback)
00061
00062 def init(self, speed, side_distance):
00063
00064 self._speed = speed
00065 self._side_distance = side_distance
00066
00067 def shutdown(self):
00068 self.stop()
00069 while self._running:
00070 rospy.sleep(0.5)
00071 self.cmd_vel_publisher.unregister()
00072 self.cmd_vel_publisher = None
00073 self.odom_subscriber.unregister()
00074 self.odom_subscriber = None
00075 self.gyro_subscriber.unregister()
00076 self.gyro_subscriber = None
00077
00078 def stop(self):
00079 self._stop = True
00080
00081 def execute(self):
00082 '''
00083 Drop this into threading.Thread or QThread for execution
00084 '''
00085 if self._running:
00086 rospy.logerr("Kobuki TestSuite: already executing a motion, ignoring the request")
00087 return
00088 timeout = rospy.Time.now() + rospy.Duration(5.0)
00089 while not self._current_pose.configured():
00090 if rospy.Time.now() > timeout:
00091 rospy.logerr("Kobuki TestSuite: no odometry data, aborting")
00092 return
00093 rospy.sleep(1.0)
00094 rospy.logwarn("Kobuki TestSuite: haven't received any odometry data yet")
00095
00096 self._stop = False
00097 self._running = True
00098 start = rospy.get_rostime()
00099 rospy.sleep(0.5)
00100
00101
00102 rospy.loginfo("Kobuki Testsuite: executing a square motion pattern [%s]"%self._side_distance)
00103 self._starting_pose = copy.deepcopy(self._current_pose)
00104
00105 while not self._stop and not rospy.is_shutdown():
00106 if self._state == Square.STATE_FORWARD:
00107 self._forward()
00108 elif self._state == Square.STATE_STOP_FORWARD:
00109 self._stop_forward()
00110 elif self._state == Square.STATE_TURN:
00111 self._turn()
00112 elif self._state == Square.STATE_STOP_TURN:
00113 self._stop_turn()
00114 if self._turn_count == 4:
00115 break
00116 self._rate.sleep()
00117 if not rospy.is_shutdown():
00118 self._command(0.0, 0.0)
00119 self._running = False
00120
00121
00122
00123
00124
00125 def _has_reached_forward_goal(self):
00126 dx = self._current_pose.x - self._starting_pose.x
00127 dy = self._current_pose.y - self._starting_pose.y
00128 travelled_sq = dx*dx + dy*dy
00129 if travelled_sq > self._side_distance*self._side_distance:
00130 return True
00131 else:
00132 return False
00133
00134 def _has_reached_turning_goal(self):
00135
00136 if math.fabs(self._current_pose.heading - self._starting_pose.heading) > math.pi/2.0:
00137 return True
00138 else:
00139 return False
00140
00141 def _command(self, linear, angular):
00142 '''
00143 Publishes a cmd_vel message for the kobuki.
00144 '''
00145 cmd = Twist()
00146 cmd.linear.x = linear
00147 cmd.angular.z = angular
00148 self.cmd_vel_publisher.publish(cmd)
00149
00150 def _forward(self):
00151 if self._has_reached_forward_goal():
00152 rospy.loginfo("Kobuki TestSuite: commencing turn")
00153 self._state = Square.STATE_STOP_FORWARD
00154
00155 else:
00156 self._command(self._speed, 0.0)
00157
00158 def _stop_forward(self):
00159 self._starting_pose = copy.deepcopy(self._current_pose)
00160 self._state = Square.STATE_TURN
00161 alpha = 0.5
00162 self._command(alpha*self._speed, -alpha*self._speed/(self._side_distance/4.0))
00163
00164 def _turn(self):
00165 if self._has_reached_turning_goal():
00166 self._state = Square.STATE_STOP_TURN
00167
00168 else:
00169
00170
00171 alpha = 0.5
00172 self._command(alpha*self._speed, -alpha*self._speed/(self._side_distance/4.0))
00173
00174 def _stop_turn(self):
00175 self._turn_count = self._turn_count + 1
00176 rospy.loginfo("Kobuki TestSuite: finished turn %s"%self._turn_count)
00177 self._starting_pose = copy.deepcopy(self._current_pose)
00178 self._state = Square.STATE_FORWARD
00179 if self._turn_count == 4:
00180 self._command(0.0, 0.0)
00181 else:
00182 self._command(self._speed, 0.0)
00183
00184
00185
00186
00187
00188
00189 def odometry_callback(self, data):
00190 self._current_pose.x = data.pose.pose.position.x
00191 self._current_pose.y = data.pose.pose.position.y
00192
00193
00194
00195 def heading_callback(self, data):
00196 quat = data.orientation
00197 rot = PyKDL.Rotation.Quaternion(quat.x, quat.y, quat.z, quat.w)
00198 self._current_pose.heading = rot.GetRPY()[2]
00199
00200
00201 if __name__ == '__main__':
00202 rospy.init_node('square')
00203 square = Square("/cmd_vel","/odom")
00204 rospy.spin()