switchbot_ros_server.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import actionlib
4 import os.path
5 from requests import ConnectionError
6 import rospy
7 from switchbot_ros.msg import SwitchBotCommandAction
8 from switchbot_ros.msg import SwitchBotCommandFeedback
9 from switchbot_ros.msg import SwitchBotCommandResult
10 from switchbot_ros.msg import Device
11 from switchbot_ros.msg import DeviceArray
12 from switchbot_ros.switchbot import SwitchBotAPIClient
13 from switchbot_ros.switchbot import DeviceError, SwitchBotAPIError
14 
15 
17  """
18  Control your switchbot with ROS and SwitchBot API
19  """
20  def __init__(self):
21  # SwitchBot configs
22  # '~token' can be file path or raw characters
23  token = rospy.get_param('~token')
24  if os.path.exists(token):
25  with open(token) as f:
26  self.token = f.read().replace('\n', '')
27  else:
28  self.token = token
29  # Initialize switchbot client
30  self.bots = self.get_switchbot_client()
31  self.print_devices()
32  # Actionlib
34  '~switch', SwitchBotCommandAction,
35  execute_cb=self.execute_cb, auto_start=False)
36  self._as.start()
37  # Topic
38  self.pub = rospy.Publisher('~devices', DeviceArray, queue_size=1, latch=True)
39  self.published = False
40 
42  try:
43  client = SwitchBotAPIClient(token=self.token)
44  rospy.loginfo('Switchbot API Client initialized.')
45  return client
46  except ConnectionError: # If the machine is not connected to the internet
47  rospy.logwarn_once('Failed to connect to the switchbot server. The client would try connecting to it when subscribes the ActionGoal topic.')
48  return None
49 
50  def spin(self):
51  rate = rospy.Rate(1)
52  while not rospy.is_shutdown():
53  rate.sleep()
54  if self.bots is None:
55  self.bots = self.get_switchbot_client()
56  elif not self.published:
57  self.publish_devices()
58  self.published = True
59 
60  def print_devices(self):
61  if self.bots is None:
62  return
63  device_list_str = 'Switchbot device list:\n'
64  device_list = sorted(
65  self.bots.device_list,
66  key=lambda device: str(device['deviceName']))
67  for device in device_list:
68  device_list_str += 'Name: ' + str(device['deviceName'])
69  device_list_str += ', Type: ' + str(device['deviceType'])
70  device_list_str += '\n'
71  rospy.loginfo(device_list_str)
72 
73  def publish_devices(self):
74  if self.bots is None:
75  return
76  msg = DeviceArray()
77  device_list = sorted(
78  self.bots.device_list,
79  key=lambda device: str(device['deviceName']))
80  for device in device_list:
81  msg_device = Device()
82  msg_device.name = str(device['deviceName'])
83  msg_device.type = str(device['deviceType'])
84  msg.devices.append(msg_device)
85  self.pub.publish(msg)
86 
87  def execute_cb(self, goal):
88  feedback = SwitchBotCommandFeedback()
89  result = SwitchBotCommandResult()
90  r = rospy.Rate(1)
91  success = True
92  # start executing the action
93  parameter, command_type = goal.parameter, goal.command_type
94  if not parameter:
95  parameter = 'default'
96  if not command_type:
97  command_type = 'command'
98  try:
99  if not self.bots:
100  self.bots = SwitchBotAPIClient(token=self.token)
101  feedback.status = str(
102  self.bots.control_device(
103  command=goal.command,
104  parameter=parameter,
105  command_type=command_type,
106  device_name=goal.device_name
107  ))
108  except (DeviceError, SwitchBotAPIError, KeyError) as e:
109  rospy.logerr(str(e))
110  feedback.status = str(e)
111  success = False
112  finally:
113  self._as.publish_feedback(feedback)
114  r.sleep()
115  result.done = success
116  self._as.set_succeeded(result)
117 
118 
119 if __name__ == '__main__':
120  rospy.init_node('switchbot')
121  server = SwitchBotAction()
122  server.spin()


switchbot_ros
Author(s): Yoshiki Obinata
autogenerated on Sat Jun 24 2023 02:40:43