34 from threading
import Event
49 def __init__(self, robot_name=None, init_node=True):
53 self.
ns =
'/' + robot_name +
'/' 62 return self.digital_states.get(addr,
False)
68 return self.analog_states.get(addr, 0)
73 if isinstance(val, list):
79 addr_list = range(addr, addr + len(val_list))
81 if all(map(
lambda a, v: v
not in [0, 1]
or self.
get_digital_val(a) == v, addr_list, val_list)):
84 self.digital_val_hooks.append(checker)
85 result = ev.wait(time_out)
86 self.digital_val_hooks.remove(checker)
92 self.
set_io(IOFunctionType.SET_DIGITAL_OUT, addr, list(val))
93 except rospy.ServiceException, e:
94 print "Service call failed: %s" % e
98 self.
set_io(IOFunctionType.SET_ADC_MODE, ch, [mode])
99 except rospy.ServiceException, e:
100 print "Service call failed: %s" % e
103 for x
in data.digital_in_states:
106 for x
in data.digital_out_states:
109 for x
in data.analog_in_states:
118 rospy.init_node(
'io_interface', anonymous =
True)
119 rospy.Subscriber(self.
ns +
'io_states', IOStates, self.
callback)
120 rospy.wait_for_message(self.
ns +
'io_states', IOStates, 5)
123 rospy.loginfo(self.
ns)
124 rospy.wait_for_service(self.
ns +
'set_io')
125 self.
set_io = rospy.ServiceProxy(self.
ns +
'set_io', SetIO)
127 if __name__ ==
'__main__':
129 io.set_analog_mode(0,1)
130 io.set_digital_val(16,1)
131 io.set_digital_val(17,0)
132 print(io.wait_digital_val(16, [1, 0], time_out=5))
133 print io.digital_states
134 print io.get_digital_val(0)
135 print io.get_digital_val(16)
136 print io.get_digital_val(17)
def set_analog_mode(self, ch, mode)
def wait_digital_val(self, addr, val, time_out=None)
dictionary digital_states
def __init__(self, robot_name=None, init_node=True)
def get_digital_vals(self)
def get_analog_val(self, addr)
def set_digital_val(self, addr, val)
def get_digital_val(self, addr)