Package baxter_interface :: Module digital_io
[hide private]
[frames] | no frames]

Source Code for Module baxter_interface.digital_io

  1  # Copyright (c) 2013-2015, Rethink Robotics 
  2  # All rights reserved. 
  3  # 
  4  # Redistribution and use in source and binary forms, with or without 
  5  # modification, are permitted provided that the following conditions are met: 
  6  # 
  7  # 1. Redistributions of source code must retain the above copyright notice, 
  8  #    this list of conditions and the following disclaimer. 
  9  # 2. Redistributions in binary form must reproduce the above copyright 
 10  #    notice, this list of conditions and the following disclaimer in the 
 11  #    documentation and/or other materials provided with the distribution. 
 12  # 3. Neither the name of the Rethink Robotics nor the names of its 
 13  #    contributors may be used to endorse or promote products derived from 
 14  #    this software without specific prior written permission. 
 15  # 
 16  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
 17  # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 18  # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 19  # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 20  # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 21  # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 22  # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 23  # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 24  # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
 25  # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 26  # POSSIBILITY OF SUCH DAMAGE. 
 27   
 28  import errno 
 29   
 30  import rospy 
 31   
 32  import baxter_dataflow 
 33   
 34  from baxter_core_msgs.msg import ( 
 35      DigitalIOState, 
 36      DigitalOutputCommand, 
 37  ) 
38 39 40 -class DigitalIO(object):
41 """ 42 Interface class for a simple Digital Input and/or Output on the 43 Baxter robot 44 45 Input 46 - read input state 47 Output 48 - turn output On/Off 49 - read current output state 50 """
51 - def __init__(self, component_id):
52 """ 53 Constructor. 54 55 @param component_id: unique id of the digital component 56 """ 57 self._id = component_id 58 self._component_type = 'digital_io' 59 self._is_output = False 60 self._state = None 61 62 self.state_changed = baxter_dataflow.Signal() 63 64 type_ns = '/robot/' + self._component_type 65 topic_base = type_ns + '/' + self._id 66 67 self._sub_state = rospy.Subscriber( 68 topic_base + '/state', 69 DigitalIOState, 70 self._on_io_state) 71 72 baxter_dataflow.wait_for( 73 lambda: self._state != None, 74 timeout=2.0, 75 timeout_msg="Failed to get current digital_io state from %s" \ 76 % (topic_base,), 77 ) 78 79 # check if output-capable before creating publisher 80 if self._is_output: 81 self._pub_output = rospy.Publisher( 82 type_ns + '/command', 83 DigitalOutputCommand, 84 queue_size=10)
85
86 - def _on_io_state(self, msg):
87 """ 88 Updates the internally stored state of the Digital Input/Output. 89 """ 90 new_state = (msg.state == DigitalIOState.PRESSED) 91 if self._state is None: 92 self._is_output = not msg.isInputOnly 93 old_state = self._state 94 self._state = new_state 95 96 # trigger signal if changed 97 if old_state is not None and old_state != new_state: 98 self.state_changed(new_state)
99 100 @property
101 - def is_output(self):
102 """ 103 Accessor to check if IO is capable of output. 104 """ 105 return self._is_output
106 107 @property
108 - def state(self):
109 """ 110 Current state of the Digital Input/Output. 111 """ 112 return self._state
113 114 @state.setter
115 - def state(self, value):
116 """ 117 Control the state of the Digital Output. (is_output must be True) 118 119 @type value: bool 120 @param value: new state to output {True, False} 121 """ 122 self.set_output(value)
123
124 - def set_output(self, value, timeout=2.0):
125 """ 126 Control the state of the Digital Output. 127 128 Use this function for finer control over the wait_for timeout. 129 130 @type value: bool 131 @param value: new state {True, False} of the Output. 132 @type timeout: float 133 @param timeout: Seconds to wait for the io to reflect command. 134 If 0, just command once and return. [0] 135 """ 136 if not self._is_output: 137 raise IOError(errno.EACCES, "Component is not an output [%s: %s]" % 138 (self._component_type, self._id)) 139 cmd = DigitalOutputCommand() 140 cmd.name = self._id 141 cmd.value = value 142 self._pub_output.publish(cmd) 143 144 if not timeout == 0: 145 baxter_dataflow.wait_for( 146 test=lambda: self.state == value, 147 timeout=timeout, 148 rate=100, 149 timeout_msg=("Failed to command digital io to: %r" % (value,)), 150 body=lambda: self._pub_output.publish(cmd) 151 )
152