1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 import errno
29
30 import rospy
31
32 import baxter_dataflow
33
34 from baxter_core_msgs.msg import (
35 DigitalIOState,
36 DigitalOutputCommand,
37 )
41 """
42 Interface class for a simple Digital Input and/or Output on the
43 Baxter robot
44
45 Input
46 - read input state
47 Output
48 - turn output On/Off
49 - read current output state
50 """
52 """
53 Constructor.
54
55 @param component_id: unique id of the digital component
56 """
57 self._id = component_id
58 self._component_type = 'digital_io'
59 self._is_output = False
60 self._state = None
61
62 self.state_changed = baxter_dataflow.Signal()
63
64 type_ns = '/robot/' + self._component_type
65 topic_base = type_ns + '/' + self._id
66
67 self._sub_state = rospy.Subscriber(
68 topic_base + '/state',
69 DigitalIOState,
70 self._on_io_state)
71
72 baxter_dataflow.wait_for(
73 lambda: self._state != None,
74 timeout=2.0,
75 timeout_msg="Failed to get current digital_io state from %s" \
76 % (topic_base,),
77 )
78
79
80 if self._is_output:
81 self._pub_output = rospy.Publisher(
82 type_ns + '/command',
83 DigitalOutputCommand,
84 queue_size=10)
85
87 """
88 Updates the internally stored state of the Digital Input/Output.
89 """
90 new_state = (msg.state == DigitalIOState.PRESSED)
91 if self._state is None:
92 self._is_output = not msg.isInputOnly
93 old_state = self._state
94 self._state = new_state
95
96
97 if old_state is not None and old_state != new_state:
98 self.state_changed(new_state)
99
100 @property
102 """
103 Accessor to check if IO is capable of output.
104 """
105 return self._is_output
106
107 @property
109 """
110 Current state of the Digital Input/Output.
111 """
112 return self._state
113
114 @state.setter
116 """
117 Control the state of the Digital Output. (is_output must be True)
118
119 @type value: bool
120 @param value: new state to output {True, False}
121 """
122 self.set_output(value)
123
125 """
126 Control the state of the Digital Output.
127
128 Use this function for finer control over the wait_for timeout.
129
130 @type value: bool
131 @param value: new state {True, False} of the Output.
132 @type timeout: float
133 @param timeout: Seconds to wait for the io to reflect command.
134 If 0, just command once and return. [0]
135 """
136 if not self._is_output:
137 raise IOError(errno.EACCES, "Component is not an output [%s: %s]" %
138 (self._component_type, self._id))
139 cmd = DigitalOutputCommand()
140 cmd.name = self._id
141 cmd.value = value
142 self._pub_output.publish(cmd)
143
144 if not timeout == 0:
145 baxter_dataflow.wait_for(
146 test=lambda: self.state == value,
147 timeout=timeout,
148 rate=100,
149 timeout_msg=("Failed to command digital io to: %r" % (value,)),
150 body=lambda: self._pub_output.publish(cmd)
151 )
152