00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 import rospy
00020 import unittest
00021 import rostest
00022 import dynamic_reconfigure.client
00023
00024
00025 from threading import Lock
00026 from sr_ronex_msgs.msg import GeneralIOState, PWM
00027 from std_msgs.msg import Bool
00028 from controller_manager_msgs.srv import ListControllers
00029
00030
00031 class TestRonexWithHardware(unittest.TestCase):
00032 '''
00033 A class used to test the Shadow Robot ethercat board HW.
00034
00035 For the test to succeed 2 ronex modules are required
00036 each with a stacker on it
00037 The digital I/O of one module should be connected to the I/O of the other
00038 The analogue I/O of one of the modules should be connected to a constant voltage source
00039
00040 On each module
00041 12 digital inputs/outputs
00042 6 PWM outputs
00043 12 analogue inputs
00044
00045 The digital inputs will read back the value we set for the outputs.
00046 Then all the I/O should switch roles from Input to Output and vice versa
00047 The PWM outputs are set to a frequency of 0.5Hz, and the digital inputs used to check the on-off period
00048 Six of the analogue inputs should be wired to an appropriate constant voltage source
00049 '''
00050
00051 def setUp(self):
00052 self.find_ronexes()
00053 self.init_clients_publishers_subscribers()
00054 self.state_lock = Lock()
00055
00056 self.state = [None, None]
00057 self.result = False
00058
00059 self.params_i = {'input_mode_' + str(i): True for i in xrange(12)}
00060 self.params_o = {'input_mode_' + str(i): False for i in xrange(12)}
00061
00062 self.expected_analogue_values_a = [2928, 1432, 849, 478, 211, 91] + 6 * [0]
00063 self.expected_analogue_values_b = 6 * [0] + [2928, 1432, 849, 478, 211, 91]
00064
00065 self.controllers_list = ["ronex_" + ronex_id + "_passthrough" for ronex_id in self.ronex_ids]
00066
00067 pwm = PWM()
00068 pwm.pwm_period = 25600
00069 pwm.pwm_on_time_0 = 0
00070 pwm.pwm_on_time_1 = 0
00071 for p in xrange(6):
00072 self.pwm_publishers[0][p].publish(pwm)
00073
00074 rospy.sleep(0.4)
00075
00076 def tearDown(self):
00077 pwm = PWM()
00078 pwm.pwm_period = 25600
00079 pwm.pwm_on_time_0 = 0
00080 pwm.pwm_on_time_1 = 0
00081 for p in xrange(6):
00082 self.pwm_publishers[0][p].publish(pwm)
00083
00084 def find_ronexes(self):
00085 attempts = 50
00086 rospy.loginfo('Waiting for the ronex to be loaded properly.')
00087 while attempts and not rospy.has_param('/ronex/devices/0/ronex_id'):
00088 rospy.sleep(0.1)
00089 attempts -= 1
00090
00091 self.assertNotEqual(attempts, 0, 'Failed to get ronex devices from parameter server')
00092
00093 self.ronex_devs = rospy.get_param('/ronex/devices')
00094 self.assertEqual(len(self.ronex_devs), 2, 'Error. Connect a ronex bridge with 2 ronex devices')
00095
00096 def init_clients_publishers_subscribers(self):
00097 basic = '/ronex/general_io/'
00098 self.ronex_ids = [self.ronex_devs[str(i)]['ronex_id'] for i in xrange(2)]
00099 ron = [basic + str(id) for id in self.ronex_ids]
00100
00101 com_dig = '/command/digital/'
00102 com_pwm = '/command/pwm/'
00103 sta = '/state'
00104
00105 topics_list = rospy.get_published_topics()
00106 self.assertTrue([ron[0] + sta, 'sr_ronex_msgs/GeneralIOState'] in topics_list)
00107 self.assertTrue([ron[1] + sta, 'sr_ronex_msgs/GeneralIOState'] in topics_list)
00108
00109 self.clients = [dynamic_reconfigure.client.Client(r) for r in ron]
00110 self.digital_publishers = [[rospy.Publisher(r + com_dig + str(i), Bool, latch=True)
00111 for i in xrange(12)] for r in ron]
00112 self.pwm_publishers = [[rospy.Publisher(r + com_pwm + str(i), PWM, latch=True)
00113 for i in xrange(6)] for r in ron]
00114
00115 self.subscriber_0 = rospy.Subscriber(ron[0] + sta, GeneralIOState, self.state_callback_0)
00116 self.subscriber_1 = rospy.Subscriber(ron[1] + sta, GeneralIOState, self.state_callback_1)
00117
00118 def state_callback_0(self, msg):
00119 with self.state_lock:
00120 self.state[0] = msg
00121
00122 def state_callback_1(self, msg):
00123 with self.state_lock:
00124 self.state[1] = msg
00125
00126 def digital_test_case(self, outr, inr, message):
00127 self.result = False
00128 self.set_ronex_io_state(outr, inr)
00129 for i, bool in enumerate(message):
00130 self.digital_publishers[outr][i].publish(bool)
00131 rospy.sleep(0.3)
00132
00133 with self.state_lock:
00134 self.result = (self.state[inr].digital == message)
00135 self.assertTrue(self.result, 'digital i/o test failure')
00136
00137 def set_ronex_io_state(self, outr, inr):
00138 self.clients[outr].update_configuration(self.params_o)
00139 self.clients[inr].update_configuration(self.params_i)
00140
00141 rospy.sleep(0.4)
00142
00143
00144
00145
00146
00147
00148 def test_if_controllers_are_loaded(self):
00149 list_controllers = rospy.ServiceProxy('controller_manager/list_controllers', ListControllers)
00150 available_controllers = list_controllers()
00151 for ctrl in self.controllers_list:
00152 self.assertTrue(ctrl in available_controllers.controllers)
00153
00154 def test_change_state_one(self):
00155 self.set_ronex_io_state(0, 1)
00156
00157 config_0 = self.clients[0].get_configuration()
00158 config_1 = self.clients[1].get_configuration()
00159
00160 for param, value in self.params_o.iteritems():
00161 self.assertEqual(config_0[param], value, 'Failed in first attempt to set digital I/O state')
00162 for param, value in self.params_i.iteritems():
00163 self.assertEqual(config_1[param], value, 'Failed in first attempt to set digital I/O state')
00164
00165 def test_change_state_two(self):
00166 self.set_ronex_io_state(1, 0)
00167
00168 config_0 = self.clients[0].get_configuration()
00169 config_1 = self.clients[1].get_configuration()
00170
00171 for param, value in self.params_i.iteritems():
00172 self.assertEqual(config_0[param], value, 'Failed in second attempt to set digital I/O state')
00173 for param, value in self.params_o.iteritems():
00174 self.assertEqual(config_1[param], value, 'Failed in second attempt to set digital I/O state')
00175
00176 def test_digital_all_true(self):
00177 message = 12 * [True]
00178 self.digital_test_case(0, 1, message)
00179
00180 def test_digital_all_false(self):
00181 message = 12 * [False]
00182 self.digital_test_case(1, 0, message)
00183
00184
00185 def test_digital_odd_true(self):
00186 message = 6 * [True, False]
00187 self.digital_test_case(0, 1, message)
00188
00189 def test_digital_even_true(self):
00190 message = 6 * [False, True]
00191 self.digital_test_case(1, 0, message)
00192
00193 def test_analogue(self):
00194 with self.state_lock:
00195 result_0, result_1 = True, True
00196
00197 analogue, expected = [0], [50]
00198
00199 if self.state[0].analogue[0] > 0:
00200 analogue = self.state[0].analogue
00201 expected = self.expected_analogue_values_a
00202 elif self.state[0].analogue[6] > 0:
00203 analogue = self.state[0].analogue
00204 expected = self.expected_analogue_values_b
00205 elif self.state[1].analogue[0] > 0:
00206 analogue = self.state[1].analogue
00207 expected = self.expected_analogue_values_a
00208 elif self.state[1].analogue[6] > 0:
00209 analogue = self.state[1].analogue
00210 expected = self.expected_analogue_values_b
00211
00212 for ind, value in enumerate(expected):
00213 self.assertAlmostEqual(value, analogue[ind], delta=30)
00214
00215 def test_pwm_outputs(self):
00216
00217 message = 12 * [False]
00218 self.digital_test_case(0, 1, message)
00219
00220 params = {'pwm_clock_divider': 6400}
00221
00222 self.clients[0].update_configuration(params)
00223 self.clients[1].update_configuration(params)
00224
00225 rospy.sleep(0.5)
00226
00227 pwm = PWM()
00228 pwm.pwm_period = 249
00229 pwm.pwm_on_time_0 = pwm.pwm_period / 2
00230 pwm.pwm_on_time_1 = pwm.pwm_period / 2
00231
00232
00233
00234 num_of_samples = 120
00235 samples = 4 * [num_of_samples * [False]]
00236
00237 self.pwm_publishers[0][0].publish(pwm)
00238 self.pwm_publishers[0][5].publish(pwm)
00239
00240 for i in xrange(num_of_samples):
00241 samples[0][i] = self.state[1].digital[0]
00242 samples[1][i] = self.state[1].digital[1]
00243 samples[2][i] = self.state[1].digital[10]
00244 samples[3][i] = self.state[1].digital[11]
00245 rospy.sleep(0.2)
00246
00247 for sample in samples:
00248 self.assertAlmostEqual(sample.count(True), num_of_samples / 2, delta=num_of_samples / 3)
00249
00250
00251
00252
00253
00254
00255 if __name__ == '__main__':
00256 rospy.init_node('sr_ronex_test')
00257 rostest.rosrun('sr_ronex_test', 'sr_ronex_test', TestRonexWithHardware)