00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 import rospy, sys, getopt, unittest, rostest, os
00020 import dynamic_reconfigure.client
00021
00022
00023 from threading import Lock
00024 from sr_ronex_msgs.msg import GeneralIOState, PWM
00025 from std_msgs.msg import Bool
00026 from controller_manager_msgs.srv import ListControllers
00027
00028 class TestRonexWithHardware(unittest.TestCase):
00029 '''
00030 A class used to test the Shadow Robot ethercat board HW.
00031
00032 For the test to succeed 2 ronex modules are required
00033 each with a stacker on it
00034 The digital I/O of one module should be connected to the I/O of the other
00035 The analogue I/O of one of the modules should be connected to a constant voltage source
00036
00037 On each module
00038 12 digital inputs/outputs
00039 6 PWM outputs
00040 12 analogue inputs
00041
00042 The digital inputs will read back the value we set for the outputs.
00043 Then all the I/O should switch roles from Input to Output and vice versa
00044 The PWM outputs are set to a frequency of 0.5Hz, and the digital inputs used to check the on-off period
00045 Six of the analogue inputs should be wired to an appropriate constant voltage source
00046 '''
00047
00048 def setUp(self):
00049 self.find_ronexes()
00050 self.init_clients_publishers_subscribers()
00051 self.state_lock = Lock()
00052
00053 self.state = [ None, None ]
00054 self.result = False
00055
00056 self.params_i = { 'input_mode_' + str(i) : True for i in xrange(12) }
00057 self.params_o = { 'input_mode_' + str(i) : False for i in xrange(12) }
00058
00059 self.expected_analogue_values_a = [2928, 1432, 849, 478, 211, 91] + 6 * [0]
00060 self.expected_analogue_values_b = 6 * [0] + [2928, 1432, 849, 478, 211, 91]
00061
00062 self.controllers_list = [ "ronex_" + ronex_id + "_passthrough" for ronex_id in self.ronex_ids ]
00063
00064 pwm = PWM()
00065 pwm.pwm_period = 25600
00066 pwm.pwm_on_time_0 = 0
00067 pwm.pwm_on_time_1 = 0
00068 for p in xrange(6):
00069 self.pwm_publishers[0][p].publish(pwm)
00070
00071 rospy.sleep(0.4)
00072
00073 def tearDown(self):
00074 pwm = PWM()
00075 pwm.pwm_period = 25600
00076 pwm.pwm_on_time_0 = 0
00077 pwm.pwm_on_time_1 = 0
00078 for p in xrange(6):
00079 self.pwm_publishers[0][p].publish(pwm)
00080
00081 def find_ronexes(self):
00082 attempts = 50
00083 rospy.loginfo('Waiting for the ronex to be loaded properly.')
00084 while attempts and not rospy.has_param('/ronex/devices/0/ronex_id'):
00085 rospy.sleep(0.1)
00086 attempts -= 1
00087
00088 self.assertNotEqual(attempts, 0, 'Failed to get ronex devices from parameter server')
00089
00090 self.ronex_devs = rospy.get_param('/ronex/devices')
00091 self.assertEqual(len(self.ronex_devs), 2, 'Error. Connect a ronex bridge with 2 ronex devices')
00092
00093 def init_clients_publishers_subscribers(self):
00094 basic = '/ronex/general_io/'
00095 self.ronex_ids = [ self.ronex_devs[str(i)]['ronex_id'] for i in xrange(2) ]
00096 ron = [ basic + str(id) for id in self.ronex_ids ]
00097
00098 com_dig = '/command/digital/'
00099 com_pwm = '/command/pwm/'
00100 sta = '/state'
00101
00102 topics_list = rospy.get_published_topics()
00103 self.assertTrue([ ron[0] + sta, 'sr_ronex_msgs/GeneralIOState' ] in topics_list)
00104 self.assertTrue([ ron[1] + sta, 'sr_ronex_msgs/GeneralIOState' ] in topics_list)
00105
00106 self.clients = [ dynamic_reconfigure.client.Client(r) for r in ron ]
00107 self.digital_publishers = [ [ rospy.Publisher(r + com_dig + str(i), Bool, latch = True) for i in xrange(12) ] for r in ron ]
00108 self.pwm_publishers = [ [ rospy.Publisher(r + com_pwm + str(i), PWM, latch = True) for i in xrange(6) ] for r in ron ]
00109
00110 self.subscriber_0 = rospy.Subscriber(ron[0] + sta, GeneralIOState, self.state_callback_0)
00111 self.subscriber_1 = rospy.Subscriber(ron[1] + sta, GeneralIOState, self.state_callback_1)
00112
00113
00114 def state_callback_0(self, msg):
00115 with self.state_lock:
00116 self.state[0] = msg
00117
00118 def state_callback_1(self, msg):
00119 with self.state_lock:
00120 self.state[1] = msg
00121
00122 def digital_test_case(self, outr, inr, message):
00123 self.result = False
00124 self.set_ronex_io_state(outr, inr)
00125 for i, bool in enumerate(message):
00126 self.digital_publishers[outr][i].publish(bool)
00127 rospy.sleep(0.3)
00128
00129 with self.state_lock:
00130 self.result = (self.state[inr].digital == message)
00131 self.assertTrue(self.result, 'digital i/o test failure')
00132
00133 def set_ronex_io_state(self, outr, inr):
00134 self.clients[outr].update_configuration(self.params_o)
00135 self.clients[inr].update_configuration(self.params_i)
00136
00137 rospy.sleep(0.4)
00138
00139
00140
00141
00142
00143
00144
00145
00146 def test_if_controllers_are_loaded(self):
00147 list_controllers = rospy.ServiceProxy('controller_manager/list_controllers', ListControllers)
00148 available_controllers = list_controllers()
00149 for ctrl in self.controllers_list:
00150 self.assertTrue(ctrl in available_controllers.controllers)
00151
00152 def test_change_state_one(self):
00153 self.set_ronex_io_state(0, 1)
00154
00155 config_0 = self.clients[0].get_configuration()
00156 config_1 = self.clients[1].get_configuration()
00157
00158 for param, value in self.params_o.iteritems():
00159 self.assertEqual(config_0[param], value, 'Failed in first attempt to set digital I/O state')
00160 for param, value in self.params_i.iteritems():
00161 self.assertEqual(config_1[param], value, 'Failed in first attempt to set digital I/O state')
00162
00163 def test_change_state_two(self):
00164 self.set_ronex_io_state(1, 0)
00165
00166 config_0 = self.clients[0].get_configuration()
00167 config_1 = self.clients[1].get_configuration()
00168
00169 for param, value in self.params_i.iteritems():
00170 self.assertEqual(config_0[param], value, 'Failed in second attempt to set digital I/O state')
00171 for param, value in self.params_o.iteritems():
00172 self.assertEqual(config_1[param], value, 'Failed in second attempt to set digital I/O state')
00173
00174 def test_digital_all_true(self):
00175 message = 12 * [True]
00176 self.digital_test_case(0, 1, message)
00177
00178 def test_digital_all_false(self):
00179 message = 12 * [False]
00180 self.digital_test_case(1, 0, message)
00181
00182
00183 def test_digital_odd_true(self):
00184 message = 6 * [True, False]
00185 self.digital_test_case(0, 1, message)
00186
00187 def test_digital_even_true(self):
00188 message = 6 * [False, True]
00189 self.digital_test_case(1, 0, message)
00190
00191 def test_analogue(self):
00192 with self.state_lock:
00193 result_0, result_1 = True, True
00194
00195 analogue, expected = [0], [50]
00196
00197 if self.state[0].analogue[0] > 0:
00198 analogue = self.state[0].analogue
00199 expected = self.expected_analogue_values_a
00200 elif self.state[0].analogue[6] > 0:
00201 analogue = self.state[0].analogue
00202 expected = self.expected_analogue_values_b
00203 elif self.state[1].analogue[0] > 0:
00204 analogue = self.state[1].analogue
00205 expected = self.expected_analogue_values_a
00206 elif self.state[1].analogue[6] > 0:
00207 analogue = self.state[1].analogue
00208 expected = self.expected_analogue_values_b
00209
00210 for ind, value in enumerate(expected):
00211 self.assertAlmostEqual(value, analogue[ind], delta = 30)
00212
00213 def test_pwm_outputs(self):
00214
00215 message = 12 * [False]
00216 self.digital_test_case(0, 1, message)
00217
00218 params = { 'pwm_clock_divider' : 6400 }
00219
00220 self.clients[0].update_configuration(params)
00221 self.clients[1].update_configuration(params)
00222
00223 rospy.sleep(0.5)
00224
00225 pwm = PWM()
00226 pwm.pwm_period = 249
00227 pwm.pwm_on_time_0 = pwm.pwm_period / 2
00228 pwm.pwm_on_time_1 = pwm.pwm_period / 2
00229
00230
00231
00232 num_of_samples = 120
00233 samples = 4 * [num_of_samples * [False]]
00234
00235 self.pwm_publishers[0][0].publish(pwm)
00236 self.pwm_publishers[0][5].publish(pwm)
00237
00238 for i in xrange(num_of_samples):
00239 samples[0][i] = self.state[1].digital[0]
00240 samples[1][i] = self.state[1].digital[1]
00241 samples[2][i] = self.state[1].digital[10]
00242 samples[3][i] = self.state[1].digital[11]
00243 rospy.sleep(0.2)
00244
00245 for sample in samples:
00246 self.assertAlmostEqual(sample.count(True), num_of_samples / 2, delta = num_of_samples / 3)
00247
00248
00249
00250
00251
00252
00253 if __name__ == '__main__':
00254 rospy.init_node('sr_ronex_test')
00255 rostest.rosrun('sr_ronex_test', 'sr_ronex_test', TestRonexWithHardware)