Package baxter_interface :: Module digital_io
[hide private]
[frames] | no frames]

Source Code for Module baxter_interface.digital_io

  1  # Copyright (c) 2013-2014, Rethink Robotics 
  2  # All rights reserved. 
  3  # 
  4  # Redistribution and use in source and binary forms, with or without 
  5  # modification, are permitted provided that the following conditions are met: 
  6  # 
  7  # 1. Redistributions of source code must retain the above copyright notice, 
  8  #    this list of conditions and the following disclaimer. 
  9  # 2. Redistributions in binary form must reproduce the above copyright 
 10  #    notice, this list of conditions and the following disclaimer in the 
 11  #    documentation and/or other materials provided with the distribution. 
 12  # 3. Neither the name of the Rethink Robotics nor the names of its 
 13  #    contributors may be used to endorse or promote products derived from 
 14  #    this software without specific prior written permission. 
 15  # 
 16  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
 17  # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 18  # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 19  # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 20  # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 21  # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 22  # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 23  # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 24  # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
 25  # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 26  # POSSIBILITY OF SUCH DAMAGE. 
 27   
 28  import errno 
 29   
 30  import rospy 
 31   
 32  import baxter_dataflow 
 33   
 34  from baxter_core_msgs.msg import ( 
 35      DigitalIOState, 
 36      DigitalOutputCommand, 
 37  ) 
38 39 40 -class DigitalIO(object):
41 """ 42 Interface class for a simple Digital Input and/or Output on the 43 Baxter robot 44 45 Input 46 - read input state 47 Output 48 - turn output On/Off 49 - read current output state 50 """
51 - def __init__(self, component_id):
52 """ 53 Constructor. 54 55 @param component_id: unique id of the digital component 56 """ 57 self._id = component_id 58 self._component_type = 'digital_io' 59 self._is_output = False 60 self._state = None 61 62 self.state_changed = baxter_dataflow.Signal() 63 64 type_ns = '/robot/' + self._component_type 65 topic_base = type_ns + '/' + self._id 66 67 self._sub_state = rospy.Subscriber( 68 topic_base + '/state', 69 DigitalIOState, 70 self._on_io_state) 71 72 baxter_dataflow.wait_for( 73 lambda: self._state != None, 74 timeout=2.0, 75 timeout_msg="Failed to get current digital_io state from %s" \ 76 % (topic_base,), 77 ) 78 79 # check if output-capable before creating publisher 80 if self._is_output: 81 self._pub_output = rospy.Publisher( 82 type_ns + '/command', 83 DigitalOutputCommand)
84
85 - def _on_io_state(self, msg):
86 """ 87 Updates the internally stored state of the Digital Input/Output. 88 """ 89 new_state = (msg.state == DigitalIOState.PRESSED) 90 if self._state is None: 91 self._is_output = not msg.isInputOnly 92 old_state = self._state 93 self._state = new_state 94 95 # trigger signal if changed 96 if old_state is not None and old_state != new_state: 97 self.state_changed(new_state)
98 99 @property
100 - def is_output(self):
101 """ 102 Accessor to check if IO is capable of output. 103 """ 104 return self._is_output
105 106 @property
107 - def state(self):
108 """ 109 Current state of the Digital Input/Output. 110 """ 111 return self._state
112 113 @state.setter
114 - def state(self, value):
115 """ 116 Control the state of the Digital Output. (is_output must be True) 117 118 @type value: bool 119 @param value: new state to output {True, False} 120 """ 121 self.set_output(value)
122
123 - def set_output(self, value, timeout=2.0):
124 """ 125 Control the state of the Digital Output. 126 127 Use this function for finer control over the wait_for timeout. 128 129 @type value: bool 130 @param value: new state {True, False} of the Output. 131 @type timeout: float 132 @param timeout: Seconds to wait for the io to reflect command. 133 If 0, just command once and return. [0] 134 """ 135 if not self._is_output: 136 raise IOError(errno.EACCES, "Component is not an output [%s: %s]" % 137 (self._component_type, self._id)) 138 cmd = DigitalOutputCommand() 139 cmd.name = self._id 140 cmd.value = value 141 self._pub_output.publish(cmd) 142 143 if not timeout == 0: 144 baxter_dataflow.wait_for( 145 test=lambda: self.state == value, 146 timeout=timeout, 147 rate=100, 148 timeout_msg=("Failed to command digital io to: %r" % (value,)), 149 body=lambda: self._pub_output.publish(cmd) 150 )
151