eus_teleop_status_sounder.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import os
4 import rospkg
5 import rospy
6 
7 from jsk_topic_tools import ConnectionBasedTransport
8 
9 from eus_teleop.msg import EusTeleopStatusArray
10 from sound_play.msg import SoundRequest
11 
12 
13 class EusTeleopStatusSounder(ConnectionBasedTransport):
14 
15  def __init__(self):
16  super(EusTeleopStatusSounder, self).__init__()
17  self.pub = self.advertise('~output/sound', SoundRequest, queue_size=1)
18  self.enable = {'larm': False, 'rarm': False}
19  self.prev_enable = {'larm': None, 'rarm': None}
20  self.collision = {'larm': False, 'rarm': False}
21  self.track_error = {'larm': False, 'rarm': False}
22  self.hand_close = {'larm': False, 'rarm': False}
23  self.prev_hand_close = {'larm': None, 'rarm': None}
24  self.timer = rospy.Timer(rospy.Duration(0.5), self._timer_cb)
25  self.rospack = rospkg.RosPack()
26 
27  def subscribe(self):
28  self.status_sub = rospy.Subscriber(
29  '~input/status', EusTeleopStatusArray, self._status_cb)
30 
31  def unsubscribe(self):
32  self.status_sub.unregister()
33 
34  def _timer_cb(self, event):
35  larm_enable = self.enable['larm']
36  if self.prev_enable['larm'] is None:
37  larm_start = False
38  larm_stop = False
39  else:
40  larm_start = larm_enable and not self.prev_enable['larm']
41  larm_stop = not larm_enable and self.prev_enable['larm']
42  if self.prev_hand_close['larm'] is None:
43  lhand_open = False
44  lhand_close = False
45  else:
46  lhand_open = (not self.hand_close['larm']
47  and self.prev_hand_close['larm'])
48  lhand_close = (self.hand_close['larm']
49  and not self.prev_hand_close['larm'])
50  larm_collision = self.collision['larm'] if larm_enable else False
51  larm_track_error = self.track_error['larm'] if larm_enable else False
52 
53  rarm_enable = self.enable['rarm']
54  if self.prev_enable['rarm'] is None:
55  rarm_start = False
56  rarm_stop = False
57  else:
58  rarm_start = rarm_enable and not self.prev_enable['rarm']
59  rarm_stop = not rarm_enable and self.prev_enable['rarm']
60  if self.prev_hand_close['rarm'] is None:
61  rhand_open = False
62  rhand_close = False
63  else:
64  rhand_open = (not self.hand_close['rarm']
65  and self.prev_hand_close['rarm'])
66  rhand_close = (self.hand_close['rarm']
67  and not self.prev_hand_close['rarm'])
68  rarm_collision = self.collision['rarm'] if rarm_enable else False
69  rarm_collision = self.collision['rarm'] if rarm_enable else False
70  rarm_track_error = self.track_error['rarm'] if rarm_enable else False
71 
72  # reset
73  self.collision['larm'] = False
74  self.track_error['larm'] = False
75  self.collision['rarm'] = False
76  self.track_error['rarm'] = False
77  self.prev_hand_close = self.hand_close.copy()
78  self.prev_enable = self.enable.copy()
79 
80  # enable
81  if larm_start or rarm_start:
82  sound_msg = SoundRequest()
83  sound_msg.sound = SoundRequest.PLAY_FILE
84  sound_msg.command = SoundRequest.PLAY_ONCE
85  sound_msg.volume = 0.6
86  sound_msg.arg = os.path.join(
87  self.rospack.get_path('eus_teleop'), 'sounds/start.wav')
88  self.pub.publish(sound_msg)
89  rospy.sleep(1.0)
90  warning_msg = SoundRequest()
91  warning_msg.sound = SoundRequest.SAY
92  warning_msg.command = SoundRequest.PLAY_ONCE
93  warning_msg.volume = 0.6
94  if larm_start and rarm_start:
95  warning_msg.arg = "both arm starting"
96  elif larm_start:
97  warning_msg.arg = "left arm starting"
98  else:
99  warning_msg.arg = "right arm starting"
100  self.pub.publish(warning_msg)
101  elif larm_stop or rarm_stop:
102  sound_msg = SoundRequest()
103  sound_msg.sound = SoundRequest.PLAY_FILE
104  sound_msg.command = SoundRequest.PLAY_ONCE
105  sound_msg.volume = 0.6
106  sound_msg.arg = os.path.join(
107  self.rospack.get_path('eus_teleop'), 'sounds/stop.wav')
108  self.pub.publish(sound_msg)
109  rospy.sleep(1.0)
110  warning_msg = SoundRequest()
111  warning_msg.sound = SoundRequest.SAY
112  warning_msg.command = SoundRequest.PLAY_ONCE
113  warning_msg.volume = 0.6
114  if larm_stop and rarm_stop:
115  warning_msg.arg = "both arm stopping"
116  elif larm_stop:
117  warning_msg.arg = "left arm stopping"
118  else:
119  warning_msg.arg = "right arm stopping"
120  self.pub.publish(warning_msg)
121  # gripper opening
122  elif lhand_open or rhand_open:
123  sound_msg = SoundRequest()
124  sound_msg.sound = SoundRequest.PLAY_FILE
125  sound_msg.command = SoundRequest.PLAY_ONCE
126  sound_msg.volume = 0.6
127  sound_msg.arg = os.path.join(
128  self.rospack.get_path('eus_teleop'), 'sounds/gripper.wav')
129  self.pub.publish(sound_msg)
130  rospy.sleep(1.0)
131  warning_msg = SoundRequest()
132  warning_msg.sound = SoundRequest.SAY
133  warning_msg.command = SoundRequest.PLAY_ONCE
134  warning_msg.volume = 0.6
135  if lhand_open and rhand_open:
136  warning_msg.arg = "both hand opening"
137  elif lhand_open:
138  warning_msg.arg = "left hand opening"
139  else:
140  warning_msg.arg = "right hand opening"
141  self.pub.publish(warning_msg)
142  rospy.sleep(1.0)
143  # gripper closing
144  elif lhand_close or rhand_close:
145  sound_msg = SoundRequest()
146  sound_msg.sound = SoundRequest.PLAY_FILE
147  sound_msg.command = SoundRequest.PLAY_ONCE
148  sound_msg.volume = 0.6
149  sound_msg.arg = os.path.join(
150  self.rospack.get_path('eus_teleop'), 'sounds/gripper.wav')
151  self.pub.publish(sound_msg)
152  rospy.sleep(1.0)
153  warning_msg = SoundRequest()
154  warning_msg.sound = SoundRequest.SAY
155  warning_msg.command = SoundRequest.PLAY_ONCE
156  warning_msg.volume = 0.6
157  if lhand_close and rhand_close:
158  warning_msg.arg = "both hand closing"
159  elif lhand_close:
160  warning_msg.arg = "left hand closing"
161  else:
162  warning_msg.arg = "right hand closing"
163  self.pub.publish(warning_msg)
164  rospy.sleep(1.0)
165  # collision and tracking error
166  elif larm_collision or rarm_collision:
167  sound_msg = SoundRequest()
168  sound_msg.sound = SoundRequest.PLAY_FILE
169  sound_msg.command = SoundRequest.PLAY_ONCE
170  sound_msg.volume = 0.6
171  sound_msg.arg = os.path.join(
172  self.rospack.get_path('eus_teleop'), 'sounds/alert.wav')
173  self.pub.publish(sound_msg)
174  rospy.sleep(1.0)
175  warning_msg = SoundRequest()
176  warning_msg.sound = SoundRequest.SAY
177  warning_msg.command = SoundRequest.PLAY_ONCE
178  warning_msg.volume = 0.6
179  if larm_collision and rarm_collision:
180  warning_msg.arg = "both arm collision error"
181  elif larm_collision:
182  warning_msg.arg = "left arm collision error"
183  else:
184  warning_msg.arg = "right arm collision error"
185  self.pub.publish(warning_msg)
186  rospy.sleep(2.0)
187  elif larm_track_error or rarm_track_error:
188  sound_msg = SoundRequest()
189  sound_msg.sound = SoundRequest.PLAY_FILE
190  sound_msg.command = SoundRequest.PLAY_ONCE
191  sound_msg.volume = 0.6
192  sound_msg.arg = os.path.join(
193  self.rospack.get_path('eus_teleop'), 'sounds/warn.wav')
194  self.pub.publish(sound_msg)
195  rospy.sleep(1.0)
196  warning_msg = SoundRequest()
197  warning_msg.sound = SoundRequest.SAY
198  warning_msg.command = SoundRequest.PLAY_ONCE
199  warning_msg.volume = 0.6
200  if larm_track_error and rarm_track_error:
201  warning_msg.arg = "both arm tracking error"
202  elif larm_track_error:
203  warning_msg.arg = "left arm tracking error"
204  else:
205  warning_msg.arg = "right arm tracking error"
206  self.pub.publish(warning_msg)
207  rospy.sleep(2.0)
208 
209  def _status_cb(self, msg):
210  for status in msg.status:
211  if status.part_name in self.enable:
212  self.enable[status.part_name] = status.enable
213  if self.prev_enable[status.part_name] is None:
214  self.prev_enable[status.part_name] = status.enable
215  if status.part_name in self.collision:
216  self.collision[status.part_name] = status.collision
217  if status.part_name in self.track_error:
218  self.track_error[status.part_name] = status.track_error
219  if status.part_name in self.hand_close:
220  self.hand_close[status.part_name] = status.hand_close
221  if self.prev_hand_close[status.part_name] is None:
222  self.prev_hand_close[status.part_name] = status.hand_close
223 
224 
225 if __name__ == '__main__':
226  rospy.init_node('eus_teleop_status_sounder')
228  rospy.spin()
eus_teleop_status_sounder.EusTeleopStatusSounder
Definition: eus_teleop_status_sounder.py:13
eus_teleop_status_sounder.EusTeleopStatusSounder.unsubscribe
def unsubscribe(self)
Definition: eus_teleop_status_sounder.py:31
eus_teleop_status_sounder.EusTeleopStatusSounder.timer
timer
Definition: eus_teleop_status_sounder.py:24
eus_teleop_status_sounder.EusTeleopStatusSounder.subscribe
def subscribe(self)
Definition: eus_teleop_status_sounder.py:27
eus_teleop_status_sounder.EusTeleopStatusSounder.prev_hand_close
prev_hand_close
Definition: eus_teleop_status_sounder.py:23
eus_teleop_status_sounder.EusTeleopStatusSounder.__init__
def __init__(self)
Definition: eus_teleop_status_sounder.py:15
eus_teleop_status_sounder.EusTeleopStatusSounder.track_error
track_error
Definition: eus_teleop_status_sounder.py:21
eus_teleop_status_sounder.EusTeleopStatusSounder.collision
collision
Definition: eus_teleop_status_sounder.py:20
eus_teleop_status_sounder.EusTeleopStatusSounder.enable
enable
Definition: eus_teleop_status_sounder.py:18
eus_teleop_status_sounder.EusTeleopStatusSounder.status_sub
status_sub
Definition: eus_teleop_status_sounder.py:28
eus_teleop_status_sounder.EusTeleopStatusSounder.prev_enable
prev_enable
Definition: eus_teleop_status_sounder.py:19
eus_teleop_status_sounder.EusTeleopStatusSounder._timer_cb
def _timer_cb(self, event)
Definition: eus_teleop_status_sounder.py:34
eus_teleop_status_sounder.EusTeleopStatusSounder._status_cb
def _status_cb(self, msg)
Definition: eus_teleop_status_sounder.py:209
eus_teleop_status_sounder.EusTeleopStatusSounder.hand_close
hand_close
Definition: eus_teleop_status_sounder.py:22
eus_teleop_status_sounder.EusTeleopStatusSounder.rospack
rospack
Definition: eus_teleop_status_sounder.py:25
eus_teleop_status_sounder.EusTeleopStatusSounder.pub
pub
Definition: eus_teleop_status_sounder.py:17


eus_teleop
Author(s): Shingo Kitagawa
autogenerated on Mon Dec 9 2024 04:10:50