controller_ros.py
Go to the documentation of this file.
1 from .controller import Controller
2 
3 import rospy
4 from sensor_msgs.msg import BatteryState
5 from sensor_msgs.msg import Joy
6 from sensor_msgs.msg import JoyFeedback
7 from sensor_msgs.msg import JoyFeedbackArray
8 from sensor_msgs.msg import Imu
9 from ds4_driver.msg import Feedback
10 from ds4_driver.msg import Report
11 from ds4_driver.msg import Status
12 
13 import copy
14 import math
15 
16 
17 class ControllerRos(Controller):
18  def __init__(self):
19  super(ControllerRos, self).__init__()
20 
21  self.use_standard_msgs = rospy.get_param('~use_standard_msgs', False)
22  self.deadzone = rospy.get_param('~deadzone', 0.1)
23  self.frame_id = rospy.get_param('~frame_id', 'ds4')
24  self.imu_frame_id = rospy.get_param('~imu_frame_id', 'ds4_imu')
25  # Only publish Joy messages on change
26  self._autorepeat_rate = rospy.get_param('~autorepeat_rate', 0)
27  self._prev_joy = None
28 
29  # Use ROS-standard messages (like sensor_msgs/Joy)
30  if self.use_standard_msgs:
31  self.pub_report = rospy.Publisher('raw_report', Report, queue_size=1)
32  self.pub_battery = rospy.Publisher('battery', BatteryState, queue_size=1)
33  self.pub_joy = rospy.Publisher('joy', Joy, queue_size=1)
34  self.pub_imu = rospy.Publisher('imu', Imu, queue_size=1)
35  self.sub_feedback = rospy.Subscriber('set_feedback', JoyFeedbackArray, self.cb_joy_feedback, queue_size=1)
36 
37  if self._autorepeat_rate != 0:
38  period = 1.0 / self._autorepeat_rate
39  rospy.Timer(rospy.Duration.from_sec(period), self.cb_joy_pub_timer)
40  else:
41  self.pub_status = rospy.Publisher('status', Status, queue_size=1)
42  self.sub_feedback = rospy.Subscriber('set_feedback', Feedback, self.cb_feedback, queue_size=1)
43 
44  def cb_report(self, report):
45  """
46  Callback method for ds4drv event loop
47  :param report:
48  :return:
49  """
50  report_msg = Report()
51  report_msg.header.frame_id = self.frame_id
52  report_msg.header.stamp = rospy.Time.now()
53  for attr in dir(report):
54  if attr.startswith('_'):
55  continue
56  if hasattr(report_msg, attr):
57  val = getattr(report, attr)
58  setattr(report_msg, attr, val)
59  # Fix (potentially) incorrect data reported from device
60  imu_data = Controller.get_imu_data(report)
61  report_msg.lin_acc_x = imu_data['lin_acc']['x']
62  report_msg.lin_acc_y = imu_data['lin_acc']['y']
63  report_msg.lin_acc_z = imu_data['lin_acc']['z']
64  report_msg.ang_vel_x = imu_data['ang_vel']['x']
65  report_msg.ang_vel_y = imu_data['ang_vel']['y']
66  report_msg.ang_vel_z = imu_data['ang_vel']['z']
67 
68  status_msg = self._report_to_status_(report_msg, self.deadzone)
69  status_msg.imu.header.frame_id = self.imu_frame_id
70 
71  if self.use_standard_msgs:
72  battery_msg = self._status_to_battery_(status_msg)
73  joy_msg = self._status_to_joy_(status_msg)
74  imu_msg = self._status_to_imu_(status_msg)
75  self.pub_report.publish(report_msg)
76  self.pub_battery.publish(battery_msg)
77  if self._prev_joy is None \
78  or joy_msg.axes != self._prev_joy.axes \
79  or joy_msg.buttons != self._prev_joy.buttons:
80  self.pub_joy.publish(joy_msg)
81  self.pub_imu.publish(imu_msg)
82 
83  self._prev_joy = joy_msg
84  else:
85  self.pub_status.publish(status_msg)
86 
87  def cb_feedback(self, msg):
88  """
89  Callback method for ds4_driver/Feedback
90  :param msg:
91  :type msg: Feedback
92  :return:
93  """
94  if self.device is None:
95  return
96 
97  def to_int(v): return int(v * 255)
98  self.control(
99  # LED color
100  led_red=to_int(msg.led_r) if msg.set_led else None,
101  led_green=to_int(msg.led_g) if msg.set_led else None,
102  led_blue=to_int(msg.led_b) if msg.set_led else None,
103  # Rumble
104  rumble_small=to_int(msg.rumble_small) if msg.set_rumble else None,
105  rumble_big=to_int(msg.rumble_big) if msg.set_rumble else None,
106  # Flash LED
107  flash_on=to_int(msg.led_flash_on / 2.5) if msg.set_led_flash else None,
108  flash_off=to_int(msg.led_flash_off / 2.5) if msg.set_led_flash else None,
109  )
110 
111  # Timer to stop rumble
112  if msg.set_rumble and msg.rumble_duration != 0:
113  rospy.Timer(rospy.Duration(msg.rumble_duration),
114  self.cb_stop_rumble,
115  oneshot=True)
116 
117  def cb_stop_rumble(self, event):
118  try:
119  self.control(rumble_small=0, rumble_big=0)
120  except AttributeError:
121  # The program exited and self.device was set to None
122  pass
123 
124  def cb_joy_feedback(self, msg):
125  """
126  Callback method for sensor_msgs/JoyFeedbackArray
127  The message contains the following feedback:
128  LED0: red
129  LED1: green
130  LED2: blue
131  RUMBLE0: rumble small
132  RUMBLE1: rumble big
133  :param msg:
134  :type msg: JoyFeedbackArray
135  :return:
136  """
137  feedback = Feedback()
138  for jf in msg.array:
139  if jf.type == JoyFeedback.TYPE_LED:
140  feedback.set_led = True
141  if jf.id == 0:
142  feedback.led_r = jf.intensity
143  elif jf.id == 1:
144  feedback.led_g = jf.intensity
145  elif jf.id == 2:
146  feedback.led_b = jf.intensity
147  elif jf.type == JoyFeedback.TYPE_RUMBLE:
148  feedback.set_rumble = True
149  if jf.id == 0:
150  feedback.rumble_small = jf.intensity
151  elif jf.id == 1:
152  feedback.rumble_big = jf.intensity
153 
154  self.cb_feedback(feedback)
155 
156  def cb_joy_pub_timer(self, _):
157  if self._prev_joy is not None:
158  self.pub_joy.publish(self._prev_joy)
159 
160  @staticmethod
161  def _report_to_status_(report_msg, deadzone=0.05):
162  status_msg = Status()
163  status_msg.header = copy.deepcopy(report_msg.header)
164 
165  # Sticks (signs are flipped for consistency with other joypads)
166  status_msg.axis_left_x = -ControllerRos._normalize_axis_(report_msg.left_analog_x, deadzone)
167  status_msg.axis_left_y = -ControllerRos._normalize_axis_(report_msg.left_analog_y, deadzone)
168  status_msg.axis_right_x = -ControllerRos._normalize_axis_(report_msg.right_analog_x, deadzone)
169  status_msg.axis_right_y = -ControllerRos._normalize_axis_(report_msg.right_analog_y, deadzone)
170 
171  # Shoulder buttons
172  status_msg.axis_l2 = report_msg.l2_analog / 255.0
173  status_msg.axis_r2 = report_msg.r2_analog / 255.0
174 
175  # Buttons
176  status_msg.button_dpad_up = report_msg.dpad_up
177  status_msg.button_dpad_down = report_msg.dpad_down
178  status_msg.button_dpad_left = report_msg.dpad_left
179  status_msg.button_dpad_right = report_msg.dpad_right
180  plug_attrs = [attr for attr in dir(report_msg) if attr.startswith('button_')]
181  for attr in plug_attrs:
182  val = getattr(report_msg, attr)
183  setattr(status_msg, attr, val)
184 
185  # IMU (X: right, Y: up, Z: towards user)
186  status_msg.imu.header = copy.deepcopy(status_msg.header)
187  # To m/s^2: 0.98 mg/LSB (BMI055 data sheet Chapter 5.2.1)
188  def to_mpss(v): return float(v) / (2**13 - 1) * 9.80665 * 0.98
189  # To rad/s: 32767: 2000 deg/s (BMI055 data sheet Chapter 7.2.1)
190  def to_radps(v): return float(v) / (2**15 - 1) * math.pi / 180 * 2000
191  # Convert
192  status_msg.imu.linear_acceleration.x = to_mpss(report_msg.lin_acc_x)
193  status_msg.imu.linear_acceleration.y = to_mpss(report_msg.lin_acc_y)
194  status_msg.imu.linear_acceleration.z = to_mpss(report_msg.lin_acc_z)
195  status_msg.imu.angular_velocity.x = to_radps(report_msg.ang_vel_x)
196  status_msg.imu.angular_velocity.y = to_radps(report_msg.ang_vel_y)
197  status_msg.imu.angular_velocity.z = to_radps(report_msg.ang_vel_z)
198  # No orientation reported
199  status_msg.imu.orientation_covariance[0] = -1
200 
201  # Trackpads
202  status_msg.touch0.id = report_msg.trackpad_touch0_id
203  status_msg.touch0.active = report_msg.trackpad_touch0_active
204  status_msg.touch0.x = report_msg.trackpad_touch0_x / float(Controller.TOUCHPAD_MAX_X)
205  status_msg.touch0.y = report_msg.trackpad_touch0_y / float(Controller.TOUCHPAD_MAX_Y)
206  status_msg.touch1.id = report_msg.trackpad_touch1_id
207  status_msg.touch1.active = report_msg.trackpad_touch1_active
208  status_msg.touch1.x = report_msg.trackpad_touch1_x / float(Controller.TOUCHPAD_MAX_X)
209  status_msg.touch1.y = report_msg.trackpad_touch1_y / float(Controller.TOUCHPAD_MAX_Y)
210 
211  # Battery
212  if report_msg.battery == Controller.BATTERY_FULL_CHARGING:
213  status_msg.battery_full_charging = True
214  status_msg.battery_percentage = 1.0
215  else:
216  status_msg.battery_full_charging = False
217  status_msg.battery_percentage = float(report_msg.battery) / Controller.BATTERY_MAX
218 
219  # Plugs
220  plug_attrs = [attr for attr in dir(report_msg) if attr.startswith('plug_')]
221  for attr in plug_attrs:
222  val = getattr(report_msg, attr)
223  setattr(status_msg, attr, val)
224 
225  return status_msg
226 
227  @staticmethod
228  def _normalize_axis_(val, deadzone=0.0):
229  """
230  Convert a value of [0, 255] to [-1.0, 1.0]
231  :param val:
232  :param deadzone:
233  :return:
234  """
235  norm_val = 2 * (val - 127.5) / 255
236  if abs(norm_val) < deadzone:
237  return 0.0
238  else:
239  return norm_val
240 
241  @staticmethod
242  def _status_to_joy_(status):
243  """
244  Converts a ds4_driver/Status message to sensor_msgs/Joy
245  :param status:
246  :type status: Status
247  :return:
248  """
249  msg = Joy()
250  msg.header = copy.deepcopy(status.header)
251  msg.axes = [
252  status.axis_left_x,
253  status.axis_left_y,
254  status.axis_right_x,
255  status.axis_right_y,
256  status.axis_l2,
257  status.axis_r2,
258  ]
259  msg.buttons = [
260  status.button_square,
261  status.button_triangle,
262  status.button_circle,
263  status.button_cross,
264  status.button_l1,
265  status.button_l2,
266  status.button_r1,
267  status.button_r2,
268  status.button_share,
269  status.button_options,
270  status.button_ps,
271  status.button_trackpad,
272  status.button_l3,
273  status.button_r3,
274  status.button_dpad_left,
275  status.button_dpad_up,
276  status.button_dpad_right,
277  status.button_dpad_down,
278  ]
279  return msg
280 
281  @staticmethod
282  def _status_to_battery_(status):
283  """
284  Converts a ds4_driver/Status to sensor_msgs/BatteryState
285  Reference: https://www.psdevwiki.com/ps4/DualShock_4#Specifications
286  :param status:
287  :type status: Status
288  :return:
289  """
290  msg = BatteryState()
291  msg.header = status.header
292  msg.percentage = status.battery_percentage
293  msg.voltage = Controller.MAX_VOLTAGE * msg.percentage
294  msg.current = float('NaN')
295  msg.charge = float('NaN')
296  msg.capacity = float('NaN')
297  msg.design_capacity = 1.0
298  if not status.plug_usb:
299  msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_NOT_CHARGING
300  elif not status.battery_full_charging:
301  msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_CHARGING
302  elif status.battery_full_charging:
303  msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_FULL
304  msg.power_supply_technology = BatteryState.POWER_SUPPLY_TECHNOLOGY_LION
305  return msg
306 
307  @staticmethod
308  def _status_to_imu_(status):
309  """
310  Converts a ds4_driver/Status to sensor_msgs/Imu
311  :param status:
312  :type status: Status
313  :return:
314  """
315  return status.imu
316 
def _report_to_status_(report_msg, deadzone=0.05)


ds4_driver
Author(s): Naoki Mizuno
autogenerated on Fri May 1 2020 03:25:46