Package baxter_interface :: Module analog_io
[hide private]
[frames] | no frames]

Source Code for Module baxter_interface.analog_io

  1  # Copyright (c) 2013-2015, Rethink Robotics 
  2  # All rights reserved. 
  3  # 
  4  # Redistribution and use in source and binary forms, with or without 
  5  # modification, are permitted provided that the following conditions are met: 
  6  # 
  7  # 1. Redistributions of source code must retain the above copyright notice, 
  8  #    this list of conditions and the following disclaimer. 
  9  # 2. Redistributions in binary form must reproduce the above copyright 
 10  #    notice, this list of conditions and the following disclaimer in the 
 11  #    documentation and/or other materials provided with the distribution. 
 12  # 3. Neither the name of the Rethink Robotics nor the names of its 
 13  #    contributors may be used to endorse or promote products derived from 
 14  #    this software without specific prior written permission. 
 15  # 
 16  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
 17  # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 18  # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 19  # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 20  # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 21  # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 22  # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 23  # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 24  # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
 25  # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 26  # POSSIBILITY OF SUCH DAMAGE. 
 27   
 28  import errno 
 29   
 30  import rospy 
 31   
 32  import baxter_dataflow 
 33   
 34  from baxter_core_msgs.msg import ( 
 35      AnalogIOState, 
 36      AnalogOutputCommand, 
 37  ) 
 38   
 39   
40 -class AnalogIO(object):
41 """ 42 Interface class for a simple Analog Input and/or Output on the 43 Baxter robot. 44 45 Input 46 - read input state 47 Output 48 - set new output state 49 - read current output state 50 """
51 - def __init__(self, component_id):
52 """ 53 Constructor. 54 55 @param component_id: unique id of analog component 56 """ 57 self._id = component_id 58 self._component_type = 'analog_io' 59 self._is_output = False 60 61 self._state = dict() 62 63 type_ns = '/robot/' + self._component_type 64 topic_base = type_ns + '/' + self._id 65 66 self._sub_state = rospy.Subscriber( 67 topic_base + '/state', 68 AnalogIOState, 69 self._on_io_state) 70 71 baxter_dataflow.wait_for( 72 lambda: len(self._state.keys()) != 0, 73 timeout=2.0, 74 timeout_msg="Failed to get current analog_io state from %s" \ 75 % (topic_base,), 76 ) 77 78 # check if output-capable before creating publisher 79 if self._is_output: 80 self._pub_output = rospy.Publisher( 81 type_ns + '/command', 82 AnalogOutputCommand, 83 queue_size=10)
84
85 - def _on_io_state(self, msg):
86 """ 87 Updates the internally stored state of the Analog Input/Output. 88 """ 89 self._is_output = not msg.isInputOnly 90 self._state['value'] = msg.value
91
92 - def state(self):
93 """ 94 Return the latest value of the Analog Input/Output. 95 """ 96 return self._state['value']
97
98 - def is_output(self):
99 """ 100 Accessor to check if IO is capable of output. 101 """ 102 return self._is_output
103
104 - def set_output(self, value, timeout=2.0):
105 """ 106 Control the state of the Analog Output. 107 108 @type value: uint16 109 @param value: new state of the Output. 110 @type timeout: float 111 @param timeout: Seconds to wait for the io to reflect command. 112 If 0, just command once and return. [0] 113 """ 114 if not self._is_output: 115 raise IOError(errno.EACCES, "Component is not an output [%s: %s]" % 116 (self._component_type, self._id)) 117 cmd = AnalogOutputCommand() 118 cmd.name = self._id 119 cmd.value = value 120 self._pub_output.publish(cmd) 121 122 if not timeout == 0: 123 baxter_dataflow.wait_for( 124 test=lambda: self.state() == value, 125 timeout=timeout, 126 rate=100, 127 timeout_msg=("Failed to command analog io to: %d" % (value,)), 128 body=lambda: self._pub_output.publish(cmd) 129 )
130