1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 import errno
29
30 import rospy
31
32 import baxter_dataflow
33
34 from baxter_core_msgs.msg import (
35 DigitalIOState,
36 DigitalOutputCommand,
37 )
41 """
42 Interface class for a simple Digital Input and/or Output on the
43 Baxter robot
44
45 Input
46 - read input state
47 Output
48 - turn output On/Off
49 - read current output state
50 """
52 """
53 Constructor.
54
55 @param component_id: unique id of the digital component
56 """
57 self._id = component_id
58 self._component_type = 'digital_io'
59 self._is_output = False
60 self._state = None
61
62 self.state_changed = baxter_dataflow.Signal()
63
64 type_ns = '/robot/' + self._component_type
65 topic_base = type_ns + '/' + self._id
66
67 self._sub_state = rospy.Subscriber(
68 topic_base + '/state',
69 DigitalIOState,
70 self._on_io_state)
71
72 baxter_dataflow.wait_for(
73 lambda: self._state != None,
74 timeout=2.0,
75 timeout_msg="Failed to get current digital_io state from %s" \
76 % (topic_base,),
77 )
78
79
80 if self._is_output:
81 self._pub_output = rospy.Publisher(
82 type_ns + '/command',
83 DigitalOutputCommand)
84
86 """
87 Updates the internally stored state of the Digital Input/Output.
88 """
89 new_state = (msg.state == DigitalIOState.PRESSED)
90 if self._state is None:
91 self._is_output = not msg.isInputOnly
92 old_state = self._state
93 self._state = new_state
94
95
96 if old_state is not None and old_state != new_state:
97 self.state_changed(new_state)
98
99 @property
101 """
102 Accessor to check if IO is capable of output.
103 """
104 return self._is_output
105
106 @property
108 """
109 Current state of the Digital Input/Output.
110 """
111 return self._state
112
113 @state.setter
115 """
116 Control the state of the Digital Output. (is_output must be True)
117
118 @type value: bool
119 @param value: new state to output {True, False}
120 """
121 self.set_output(value)
122
124 """
125 Control the state of the Digital Output.
126
127 Use this function for finer control over the wait_for timeout.
128
129 @type value: bool
130 @param value: new state {True, False} of the Output.
131 @type timeout: float
132 @param timeout: Seconds to wait for the io to reflect command.
133 If 0, just command once and return. [0]
134 """
135 if not self._is_output:
136 raise IOError(errno.EACCES, "Component is not an output [%s: %s]" %
137 (self._component_type, self._id))
138 cmd = DigitalOutputCommand()
139 cmd.name = self._id
140 cmd.value = value
141 self._pub_output.publish(cmd)
142
143 if not timeout == 0:
144 baxter_dataflow.wait_for(
145 test=lambda: self.state == value,
146 timeout=timeout,
147 rate=100,
148 timeout_msg=("Failed to command digital io to: %r" % (value,)),
149 body=lambda: self._pub_output.publish(cmd)
150 )
151