00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 from controller_manager_msgs.srv import ListControllers
00020 import dynamic_reconfigure.client
00021 import rospy
00022 import sys
00023 import getopt
00024 import unittest
00025 import rostest
00026 import os
00027 from sr_ronex_msgs.srv import SPI
00028 from sr_ronex_msgs.srv import SPIRequest
00029 from sr_ronex_msgs.msg import SPIState
00030 from std_msgs.msg import Bool
00031 from threading import Lock
00032 from numpy import uint8
00033
00034 from time import sleep
00035
00036
00037 class TestSPIWithHardware(unittest.TestCase):
00038 '''
00039 A class used to test the Shadow Robot SPI HW.
00040 '''
00041
00042 def setUp(self):
00043 self.find_spi_ronexes()
00044
00045 self.controllers_list = ["ronex_" + ronex_id + "_passthrough" for ronex_id in self.ronex_ids]
00046
00047 self.init_service_servers()
00048 self.state_lock = Lock()
00049
00050 rospy.sleep(0.4)
00051
00052 def tearDown(self):
00053 pass
00054
00055 def find_spi_ronexes(self):
00056 attempts = 50
00057 rospy.loginfo('Waiting for the ronex to be loaded properly.')
00058 while attempts and not rospy.has_param('/ronex/devices/0/ronex_id'):
00059 rospy.sleep(0.1)
00060 attempts -= 1
00061
00062 self.assertNotEqual(attempts, 0, 'Failed to get ronex devices from parameter server')
00063
00064 all_ronexes = rospy.get_param('/ronex/devices')
00065 self.ronex_devs = []
00066 for i in all_ronexes:
00067 if "spi" == all_ronexes[i]["product_name"]:
00068 self.ronex_devs.append(all_ronexes[i])
00069
00070 self.assertEqual(len(self.ronex_devs), 1, 'Error. Connect a ronex bridge with at least 1 spi module.')
00071
00072 self.ronex_ids = [self.ronex_devs[0]['ronex_id']]
00073
00074 def generalIOState_callback(self, data):
00075
00076 self.analogue_in = data.analogue_in
00077
00078 def init_service_servers(self):
00079
00080 ronex = "/ronex/spi/" + str(self.ronex_devs[0]["ronex_id"])
00081 self.spi_srv = [rospy.ServiceProxy(ronex + "/command/passthrough/" + str(i), SPI) for i in xrange(4)]
00082 self.dyn_rcf_client = dynamic_reconfigure.client.Client(ronex)
00083
00084 topic = ronex + "/state"
00085 rospy.Subscriber(topic, SPIState, self.generalIOState_callback)
00086
00087
00088
00089
00090
00091
00092 def test_if_controllers_are_loaded(self):
00093 list_controllers = rospy.ServiceProxy('controller_manager/list_controllers', ListControllers)
00094 available_controllers = list_controllers()
00095
00096 for ctrl in available_controllers.controller:
00097 self.assertTrue(ctrl.name in self.controllers_list,
00098 msg="Available controllers: " + str(available_controllers.controller) +
00099 " / expected controller: " + str(self.controllers_list))
00100
00101 def test_all_cases(self):
00102
00103
00104 self.run_test_case([True, True, True, True, True, True],
00105 [[3947, 3942, 3180, 3941], [3951, 3940, 3180, 3940]], [3563, 3542, 3540, 3535, 3544, 3761])
00106
00107 self.run_test_case([False, True, True, True, True, True],
00108 [[3950, 3830, 2415, 3941], [3951, 327, 3180, 3831]], [3563, 3542, 3540, 3438, 3445, 671])
00109
00110 self.run_test_case([True, False, True, True, True, True],
00111 [[3949, 326, 2415, 3941], [3951, 3833, 3188, 3831]], [3544, 3546, 3545, 3439, 310, 3666])
00112
00113 self.run_test_case([True, True, False, True, True, True],
00114 [[3950, 3832, 2415, 3943], [3949, 3833, 3187, 325]], [3547, 3546, 3545, 310, 3445, 3668])
00115
00116 self.run_test_case([True, True, True, False, True, True],
00117 [[3842, 3942, 3180, 327], [3842, 3945, 2423, 3942]], [3449, 3449, 312, 3534, 3542, 3762])
00118
00119 self.run_test_case([True, True, True, True, False, True],
00120 [[3842, 3944, 3181, 3831], [322, 3943, 2420, 3941]], [3450, 306, 3442, 3534, 3544, 3758])
00121
00122 self.run_test_case([True, True, True, True, True, False],
00123 [[323, 3943, 3180, 3831], [3843, 3943, 2419, 3942]], [308, 3446, 3445, 3536, 3542, 3761])
00124
00125 def run_test_case(self, digital_states, expected_as, expected_analogue):
00126 success = True
00127 self.set_DIO_states(digital_states)
00128
00129 results = []
00130 for adc_number in range(len(self.spi_srv)):
00131 results.append(self.read_adc(adc_number, 0))
00132 self.assertAlmostEquals(results[adc_number], expected_as[0][adc_number],
00133 msg="Testing channel 0 of " + str(adc_number) + "failed (delta = " +
00134 str(results[adc_number] - expected_as[0][adc_number]) + " / received = " +
00135 str(results[adc_number]) + ").", delta=400)
00136
00137
00138 results = []
00139 for adc_number in range(len(self.spi_srv)):
00140 results.append(self.read_adc(adc_number, 1))
00141 self.assertAlmostEquals(results[adc_number], expected_as[1][adc_number],
00142 msg="Testing channel 1 of " + str(adc_number) + "failed (delta = " +
00143 str(results[adc_number] - expected_as[1][adc_number]) + " / received = " +
00144 str(results[adc_number]) + ").", delta=400)
00145
00146
00147 for analogue_id in range(0, 6):
00148 self.assertAlmostEquals(self.analogue_in[analogue_id], expected_analogue[analogue_id],
00149 msg="Testing analogue input" + str(analogue_id) + "failed (delta = " +
00150 str(self.analogue_in[analogue_id] - expected_analogue[analogue_id]) +
00151 " / received = " + str(self.analogue_in[analogue_id]) + ").", delta=400)
00152
00153 def set_DIO_states(self, digital_states):
00154
00155 if len(digital_states) == 6:
00156 cfg = {"pin_output_state_pre_DIO_0": digital_states[0], "pin_output_state_post_DIO_0": digital_states[0],
00157 "pin_output_state_pre_DIO_1": digital_states[1], "pin_output_state_post_DIO_1": digital_states[1],
00158 "pin_output_state_pre_DIO_2": digital_states[2], "pin_output_state_post_DIO_2": digital_states[2],
00159 "pin_output_state_pre_DIO_3": digital_states[3], "pin_output_state_post_DIO_3": digital_states[3],
00160 "pin_output_state_pre_DIO_4": digital_states[4], "pin_output_state_post_DIO_4": digital_states[4],
00161 "pin_output_state_pre_DIO_5": digital_states[5], "pin_output_state_post_DIO_5": digital_states[5]}
00162 else:
00163 cfg = {"pin_output_state_pre_DIO_0": True, "pin_output_state_post_DIO_0": True,
00164 "pin_output_state_pre_DIO_1": True, "pin_output_state_post_DIO_1": True,
00165 "pin_output_state_pre_DIO_2": True, "pin_output_state_post_DIO_2": True,
00166 "pin_output_state_pre_DIO_3": True, "pin_output_state_post_DIO_3": True,
00167 "pin_output_state_pre_DIO_4": True, "pin_output_state_post_DIO_4": True,
00168 "pin_output_state_pre_DIO_5": True, "pin_output_state_post_DIO_5": True}
00169 self.dyn_rcf_client.update_configuration(cfg)
00170
00171 def read_adc(self, adc_number, channel_number):
00172 req = SPIRequest()
00173
00174 req.data = [0x0]*3
00175 req.data[0] = 0x01
00176 if channel_number == 0:
00177 req.data[1] = 0xA0
00178 else:
00179 req.data[1] = 0xE0
00180 req.data[2] = 0x01
00181 spi_res = None
00182
00183
00184 for i in range(2):
00185 try:
00186 spi_res = self.spi_srv[adc_number](req)
00187 except rospy.ServiceException as exc:
00188 self.assertTrue(False, "Failed to send data to "+str(adc_number) +
00189 " channel "+str(channel_number)+" -> " + str(exc))
00190 result = int(spi_res.data[1])
00191 result <<= 8
00192 result |= int(spi_res.data[2])
00193 result &= 0x0FFF
00194 return result
00195
00196
00197
00198
00199
00200 if __name__ == '__main__':
00201 rospy.init_node('sr_spi_test')
00202 rostest.rosrun('sr_spi_test', 'sr_spi_test', TestSPIWithHardware)