5 from requests
import ConnectionError
7 from switchbot_ros.msg
import SwitchBotCommandAction
8 from switchbot_ros.msg
import SwitchBotCommandFeedback
9 from switchbot_ros.msg
import SwitchBotCommandResult
10 from switchbot_ros.msg
import Device
11 from switchbot_ros.msg
import DeviceArray
18 Control your switchbot with ROS and SwitchBot API 23 token = rospy.get_param(
'~token')
24 if os.path.exists(token):
25 with open(token)
as f:
26 self.
token = f.read().replace(
'\n',
'')
34 '~switch', SwitchBotCommandAction,
38 self.
pub = rospy.Publisher(
'~devices', DeviceArray, queue_size=1, latch=
True)
44 rospy.loginfo(
'Switchbot API Client initialized.')
46 except ConnectionError:
47 rospy.logwarn_once(
'Failed to connect to the switchbot server. The client would try connecting to it when subscribes the ActionGoal topic.')
52 while not rospy.is_shutdown():
63 device_list_str =
'Switchbot device list:\n' 65 self.
bots.device_list,
66 key=
lambda device: str(device[
'deviceName']))
67 for device
in device_list:
68 device_list_str +=
'Name: ' + str(device[
'deviceName'])
69 device_list_str +=
', Type: ' + str(device[
'deviceType'])
70 device_list_str +=
'\n' 71 rospy.loginfo(device_list_str)
78 self.
bots.device_list,
79 key=
lambda device: str(device[
'deviceName']))
80 for device
in device_list:
82 msg_device.name = str(device[
'deviceName'])
83 msg_device.type = str(device[
'deviceType'])
84 msg.devices.append(msg_device)
88 feedback = SwitchBotCommandFeedback()
89 result = SwitchBotCommandResult()
93 parameter, command_type = goal.parameter, goal.command_type
97 command_type =
'command' 101 feedback.status = str(
102 self.
bots.control_device(
103 command=goal.command,
105 command_type=command_type,
106 device_name=goal.device_name
108 except (DeviceError, SwitchBotAPIError, KeyError)
as e:
110 feedback.status = str(e)
113 self.
_as.publish_feedback(feedback)
115 result.done = success
116 self.
_as.set_succeeded(result)
119 if __name__ ==
'__main__':
120 rospy.init_node(
'switchbot')
def get_switchbot_client(self)
def publish_devices(self)
def execute_cb(self, goal)