14 from sensor_msgs.msg
import LaserScan, Imu
15 from geometry_msgs.msg
import Twist
16 from kobuki_msgs.msg
import ScanAngle
23 rot = PyKDL.Rotation.Quaternion(quat.x, quat.y, quat.z, quat.w)
24 return rot.GetRPY()[2]
27 if angle <= math.pi
and angle >= -math.pi:
30 return math.fmod(angle-math.pi,2.0*math.pi)+math.pi
32 return math.fmod(angle+math.pi,2.0*math.pi)-math.pi;
39 def __init__(self, scan_topic, scan_angle_topic):
42 self.
lock = threading.Lock()
46 def init(self, min_angle=-0.3, max_angle=0.3):
51 print "Killing off scan angle publisher" 52 if not rospy.is_shutdown():
54 self.scan_subscriber.unregister()
56 self._laser_scan_angle_publisher.unregister()
62 d_angle = msg.angle_increment
70 x = math.sin(angle) * r
71 y = math.cos(angle) * r
79 denominator = num*sum_xx-sum_x*sum_x
81 angle=math.atan2((-sum_x*sum_y+num*sum_xy)/(denominator), 1)
84 relay.header = msg.header
85 relay.scan_angle = angle
88 self._laser_scan_angle_publisher.publish(relay)
90 rospy.logerr(
"Please point me at a wall.")
93 def __init__(self, laser_scan_angle_topic, gyro_scan_angle_topic, error_scan_angle_topic, cmd_vel_topic, gyro_topic):
94 self.
lock = threading.Lock()
130 if not rospy.is_shutdown():
131 print "Shutting down drift estimation" 132 self._gyro_scan_angle_publisher.unregister()
134 self._laser_scan_angle_subscriber.unregister()
136 self._error_scan_angle_publisher.unregister()
138 self.gyro_subscriber.unregister()
140 self.cmd_vel_publisher.unregister()
148 Drop this into threading.Thread or QThread for execution 151 rospy.logerr(
"Kobuki TestSuite: already executing, ignoring the request")
156 rospy.loginfo(
"Kobuki Testsuite: could not align, please point me at a wall.")
165 last_gyro_time = rospy.get_rostime()
166 last_scan_time = rospy.get_rostime()
169 gyro_timeout_count = 0
170 while not self.
_stop and not rospy.is_shutdown():
176 if gyro_time > last_gyro_time:
177 last_gyro_time = gyro_time
180 turn_count = turn_count + 1
187 cmd.angular.z = yaw_rate_cmd
188 self.cmd_vel_publisher.publish(cmd)
189 if scan_time > last_scan_time:
190 rospy.loginfo(
"Kobuki Testsuite: gyro, laser angle comparison [%s,%s]"%(gyro_angle - self.
_initial_gyro_offset, scan_angle))
191 last_scan_time = scan_time
193 gyro_timeout_count = gyro_timeout_count + 1
194 if gyro_timeout_count > 50:
195 rospy.logerr(
"Kobuki Testsuite: no gyro data for a long time.")
198 rospy.loginfo(
"Kobuki Testsuite: aligning.....")
202 if not rospy.is_shutdown():
205 self.cmd_vel_publisher.publish(cmd)
219 if no_data_count == 40:
221 if self.
_stop or rospy.is_shutdown():
227 self.cmd_vel_publisher.publish(cmd)
229 if math.fabs(angle) < 0.05:
236 self.cmd_vel_publisher.publish(cmd)
252 gyro_scan_angle = ScanAngle()
253 gyro_scan_angle.header = msg.header
255 self._gyro_scan_angle_publisher.publish(gyro_scan_angle)
257 error_scan_angle = ScanAngle()
258 error_scan_angle.header = msg.header
261 self._error_scan_angle_publisher.publish(error_scan_angle)
def __init__(self, scan_topic, scan_angle_topic)
def scan_callback(self, msg)
_laser_scan_angle_subscriber
_inital_wall_angle
Parameters.
_laser_scan_angle_publisher
def scan_callback(self, msg)
def gyro_callback(self, msg)
Ros Callbacks.
_error_scan_angle_publisher
_gyro_scan_angle_publisher
def quat_to_angle(quat)
Utilities.
def __init__(self, laser_scan_angle_topic, gyro_scan_angle_topic, error_scan_angle_topic, cmd_vel_topic, gyro_topic)
def init(self, min_angle=-0.3, max_angle=0.3)