1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 import errno
29
30 import rospy
31
32 import baxter_dataflow
33
34 from baxter_core_msgs.msg import (
35 AnalogIOState,
36 AnalogOutputCommand,
37 )
38
39
41 """
42 Interface class for a simple Analog Input and/or Output on the
43 Baxter robot.
44
45 Input
46 - read input state
47 Output
48 - set new output state
49 - read current output state
50 """
52 """
53 Constructor.
54
55 @param component_id: unique id of analog component
56 """
57 self._id = component_id
58 self._component_type = 'analog_io'
59 self._is_output = False
60
61 self._state = dict()
62
63 type_ns = '/robot/' + self._component_type
64 topic_base = type_ns + '/' + self._id
65
66 self._sub_state = rospy.Subscriber(
67 topic_base + '/state',
68 AnalogIOState,
69 self._on_io_state)
70
71 baxter_dataflow.wait_for(
72 lambda: len(self._state.keys()) != 0,
73 timeout=2.0,
74 timeout_msg="Failed to get current analog_io state from %s" \
75 % (topic_base,),
76 )
77
78
79 if self._is_output:
80 self._pub_output = rospy.Publisher(
81 type_ns + '/command',
82 AnalogOutputCommand,
83 queue_size=10)
84
86 """
87 Updates the internally stored state of the Analog Input/Output.
88 """
89 self._is_output = not msg.isInputOnly
90 self._state['value'] = msg.value
91
93 """
94 Return the latest value of the Analog Input/Output.
95 """
96 return self._state['value']
97
99 """
100 Accessor to check if IO is capable of output.
101 """
102 return self._is_output
103
105 """
106 Control the state of the Analog Output.
107
108 @type value: uint16
109 @param value: new state of the Output.
110 @type timeout: float
111 @param timeout: Seconds to wait for the io to reflect command.
112 If 0, just command once and return. [0]
113 """
114 if not self._is_output:
115 raise IOError(errno.EACCES, "Component is not an output [%s: %s]" %
116 (self._component_type, self._id))
117 cmd = AnalogOutputCommand()
118 cmd.name = self._id
119 cmd.value = value
120 self._pub_output.publish(cmd)
121
122 if not timeout == 0:
123 baxter_dataflow.wait_for(
124 test=lambda: self.state() == value,
125 timeout=timeout,
126 rate=100,
127 timeout_msg=("Failed to command analog io to: %d" % (value,)),
128 body=lambda: self._pub_output.publish(cmd)
129 )
130