motion_wander.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # License: BSD
4 # https://raw.github.com/yujinrobot/kobuki/hydro-devel/kobuki_testsuite/LICENSE
5 #
6 ##############################################################################
7 # Python Imports
8 ##############################################################################
9 import random
10 from math import degrees, radians
11 
12 ##############################################################################
13 # Ros Imports
14 ##############################################################################
15 import rospy
16 from tf.transformations import euler_from_quaternion
17 from geometry_msgs.msg import Twist
18 from nav_msgs.msg import Odometry
19 from kobuki_msgs.msg import BumperEvent, CliffEvent
20 
21 ##############################################################################
22 # Local imports
23 ##############################################################################
24 import utils
25 
26 '''
27  Implements a safe wandering random motion using bump and cliff sensors.
28 
29  API:
30  init(speed,distance) : (re)initialise parameters
31  stop() - stop.
32  execute() - pass this to a thread to run
33  shutdown() - cleanup
34 
35  @param topic names
36  @type strings
37 
38 '''
39 class SafeWandering(object):
40  '''
41  Initialise everything, then starts with start()
42 
43  API:
44  start() - start to wander.
45  stop() - stop wandering.
46  set_vels(lin_xvel,stepback_xvel,ang_zvel)
47 
48  @param topic names
49  @type strings
50  '''
51  def __init__(self, cmd_vel_topic, odom_topic, bumper_topic, cliff_topic ):
52 
53  self.bumper_subscriber = rospy.Subscriber(bumper_topic, BumperEvent, self.bumper_event_callback)
54  self.cliff_subscriber = rospy.Subscriber(cliff_topic, CliffEvent, self.cliff_event_callback)
55  self.odom_subscriber = rospy.Subscriber(odom_topic, Odometry, self.odometry_callback)
56  self.cmd_vel_publisher = rospy.Publisher(cmd_vel_topic, Twist, queue_size=10)
57  self.rate = rospy.Rate(50)
58 
59  self.ok = True
60  self.theta = 0.0
61  self.theta_goal = 0.0
62  self._stop = False
63  self._running = False
64 
65  self._lin_xvel = 0.18
66  self._stepback_xvel = -0.1
67  self._ang_zvel = 1.8
68 
69  def init(self, speed, stepback_speed, angular_speed):
70  self._lin_xvel = speed
71  self._stepback_xvel = stepback_speed
72  self._ang_zvel = angular_speed
73 
74 
75  def shutdown(self):
76  self.stop()
77  while self._running:
78  self.rate.sleep()
79  #self.cmd_vel_publisher.unregister() This one creates an error for some reason, probably because climbing_frame already unregisters.
80  self.odom_subscriber.unregister()
81  self.bumper_subscriber.unregister()
82  self.cliff_subscriber.unregister()
83 
84  def command(self, twist):
85  '''
86  Lowest level of command - i.e. this is where the fine grained looping happens
87  so we check for aborts here. Don't waste time doing anything if this is the case.
88  '''
89  if self._stop or rospy.is_shutdown():
90  return False
91  self.cmd_vel_publisher.publish(twist)
92  self.rate.sleep()
93  return True
94 
95  def go(self):
96  twist = Twist()
97  while self.ok:
98  twist.linear.x = self._lin_xvel
99  if not self.command(twist):
100  return False
101  return True
102 
103  def stepback(self):
104  twist = Twist()
105  for i in range(0,35):
106  twist.linear.x = self._stepback_xvel
107  if not self.command(twist):
108  return False
109  return True
110 
111  def turn(self):
112  twist = Twist()
113  while not self.reached():
114  twist.angular.z = self._ang_zvel * utils.sign(utils.wrap_to_pi(self.theta_goal - self.theta))
115  if not self.command(twist):
116  return False
117  self.ok = True
118  return True
119 
120  def reached(self):
121  if abs(utils.wrap_to_pi(self.theta_goal - self.theta)) < radians(5.0):
122  return True
123  else:
124  return False
125 
126  def stop(self):
127  self._stop = True
128 
129  def execute(self):
130  if self._running:
131  rospy.logerr("Kobuki TestSuite: already executing wandering, ignoring the request")
132  return
133  self._stop = False
134  self._running = True
135  while True:
136  if not self.go() or not self.stepback() or not self.turn():
137  break
138  self._running = False
139  if not rospy.is_shutdown():
140  cmd = Twist()
141  cmd.linear.x = 0.0
142  self.cmd_vel_publisher.publish(cmd)
143 
144  ##########################################################################
145  # Callbacks
146  ##########################################################################
147 
148  def odometry_callback(self, data):
149  quat = data.pose.pose.orientation
150  q = [quat.x, quat.y, quat.z, quat.w]
151  roll, pitch, yaw = euler_from_quaternion(q)
152  self.theta = yaw
153 
154 
155  def bumper_event_callback(self, data):
156  if data.state == BumperEvent.PRESSED:
157  self.ok = False
158  if data.bumper == BumperEvent.LEFT:
159  self.theta_goal = self.theta - 3.141592*random.uniform(0.2, 1.0)
160  elif data.bumper == BumperEvent.RIGHT:
161  self.theta_goal = self.theta + 3.141592*random.uniform(0.2, 1.0)
162  else:
163  self.theta_goal = utils.wrap_to_pi(self.theta + 3.141592*random.uniform(-1.0, 1.0))
164 
165  def cliff_event_callback(self, data):
166  if data.state == CliffEvent.CLIFF:
167  self.ok = False
168  # print "Cliff event: %s,%s"%(str(data.sensor),str(data.state))
169  if data.sensor == CliffEvent.LEFT:
170  self.theta_goal = self.theta - 3.141592*random.uniform(0.2, 1.0)
171  elif data.sensor == CliffEvent.RIGHT:
172  self.theta_goal = self.theta + 3.141592*random.uniform(0.2, 1.0)
173  else:
174  self.theta_goal = utils.wrap_to_pi(self.theta + 3.141592*random.uniform(-1.0, 1.0))
175 
def init(self, speed, stepback_speed, angular_speed)
def odometry_callback(self, data)
Callbacks.
def __init__(self, cmd_vel_topic, odom_topic, bumper_topic, cliff_topic)


kobuki_testsuite
Author(s): Jorge Santos Simon
autogenerated on Mon Jun 10 2019 13:45:22