sr_spi_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # ####################################################################
00003 # Copyright (c) 2013, Shadow Robot Company, All rights reserved.
00004 #
00005 # This library is free software; you can redistribute it and/or
00006 # modify it under the terms of the GNU Lesser General Public
00007 # License as published by the Free Software Foundation; either
00008 # version 3.0 of the License, or (at your option) any later version.
00009 #
00010 # This library is distributed in the hope that it will be useful,
00011 # but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
00013 # Lesser General Public License for more details.
00014 #
00015 # You should have received a copy of the GNU Lesser General Public
00016 # License along with this library.
00017 # ####################################################################
00018 
00019 from controller_manager_msgs.srv import ListControllers
00020 import dynamic_reconfigure.client
00021 import rospy
00022 import sys
00023 import getopt
00024 import unittest
00025 import rostest
00026 import os
00027 from sr_ronex_msgs.srv import SPI
00028 from sr_ronex_msgs.srv import SPIRequest
00029 from sr_ronex_msgs.msg import SPIState
00030 from std_msgs.msg import Bool
00031 from threading import Lock
00032 from numpy import uint8
00033 
00034 from time import sleep
00035 
00036 
00037 class TestSPIWithHardware(unittest.TestCase):
00038     '''
00039     A class used to test the Shadow Robot SPI HW.
00040     '''
00041 
00042     def setUp(self):
00043         self.find_spi_ronexes()
00044 
00045         self.controllers_list = ["ronex_" + ronex_id + "_passthrough" for ronex_id in self.ronex_ids]
00046 
00047         self.init_service_servers()
00048         self.state_lock = Lock()
00049 
00050         rospy.sleep(0.4)  # wait for self.state to be updated for the first time
00051 
00052     def tearDown(self):
00053         pass
00054 
00055     def find_spi_ronexes(self):
00056         attempts = 50
00057         rospy.loginfo('Waiting for the ronex to be loaded properly.')
00058         while attempts and not rospy.has_param('/ronex/devices/0/ronex_id'):
00059             rospy.sleep(0.1)
00060             attempts -= 1
00061 
00062         self.assertNotEqual(attempts, 0, 'Failed to get ronex devices from parameter server')
00063 
00064         all_ronexes = rospy.get_param('/ronex/devices')
00065         self.ronex_devs = []
00066         for i in all_ronexes:
00067             if "spi" == all_ronexes[i]["product_name"]:
00068                 self.ronex_devs.append(all_ronexes[i])
00069 
00070         self.assertEqual(len(self.ronex_devs), 1, 'Error. Connect a ronex bridge with at least 1 spi module.')
00071 
00072         self.ronex_ids = [self.ronex_devs[0]['ronex_id']]
00073 
00074     def generalIOState_callback(self, data):
00075         # Note that the type of data.analogue is tuple.
00076         self.analogue_in = data.analogue_in
00077 
00078     def init_service_servers(self):
00079         # testing first spi ronex connected
00080         ronex = "/ronex/spi/" + str(self.ronex_devs[0]["ronex_id"])
00081         self.spi_srv = [rospy.ServiceProxy(ronex + "/command/passthrough/" + str(i), SPI) for i in xrange(4)]
00082         self.dyn_rcf_client = dynamic_reconfigure.client.Client(ronex)
00083         # subscribing to analogue input topic
00084         topic = ronex + "/state"
00085         rospy.Subscriber(topic, SPIState, self.generalIOState_callback)
00086 
00087 
00088 ############################################
00089 # TEST START
00090 ############################################
00091 
00092     def test_if_controllers_are_loaded(self):
00093         list_controllers = rospy.ServiceProxy('controller_manager/list_controllers', ListControllers)
00094         available_controllers = list_controllers()
00095 
00096         for ctrl in available_controllers.controller:
00097             self.assertTrue(ctrl.name in self.controllers_list,
00098                             msg="Available controllers: " + str(available_controllers.controller) +
00099                                 " / expected controller: " + str(self.controllers_list))
00100 
00101     def test_all_cases(self):
00102 
00103         # Test 1, turn on all the digital outputs
00104         self.run_test_case([True, True, True, True, True, True],
00105                            [[3947, 3942, 3180, 3941], [3951, 3940, 3180, 3940]], [3563, 3542, 3540, 3535, 3544, 3761])
00106         # Test 2, turn off DIO_0
00107         self.run_test_case([False, True, True, True, True, True],
00108                            [[3950, 3830, 2415, 3941], [3951, 327, 3180, 3831]], [3563, 3542, 3540, 3438, 3445, 671])
00109         # Test 3, turn off DIO_1
00110         self.run_test_case([True, False, True, True, True, True],
00111                            [[3949, 326, 2415, 3941], [3951, 3833, 3188, 3831]], [3544, 3546, 3545, 3439, 310, 3666])
00112         # Test 4, turn off DIO_2
00113         self.run_test_case([True, True, False, True, True, True],
00114                            [[3950, 3832, 2415, 3943], [3949, 3833, 3187, 325]], [3547, 3546, 3545, 310, 3445, 3668])
00115         # Test 5, turn off DIO_3
00116         self.run_test_case([True, True, True, False, True, True],
00117                            [[3842, 3942, 3180, 327], [3842, 3945, 2423, 3942]], [3449, 3449, 312, 3534, 3542, 3762])
00118         # Test 6, turn off DIO_4
00119         self.run_test_case([True, True, True, True, False, True],
00120                            [[3842, 3944, 3181, 3831], [322, 3943, 2420, 3941]], [3450, 306, 3442, 3534, 3544, 3758])
00121         # Test 7, turn off DIO_5
00122         self.run_test_case([True, True, True, True, True, False],
00123                            [[323, 3943, 3180, 3831], [3843, 3943, 2419, 3942]], [308, 3446, 3445, 3536, 3542, 3761])
00124 
00125     def run_test_case(self, digital_states, expected_as, expected_analogue):
00126         success = True
00127         self.set_DIO_states(digital_states)
00128         # check channel 0 of all the spi modules
00129         results = []
00130         for adc_number in range(len(self.spi_srv)):
00131             results.append(self.read_adc(adc_number, 0))
00132             self.assertAlmostEquals(results[adc_number], expected_as[0][adc_number],
00133                                     msg="Testing channel 0 of " + str(adc_number) + "failed (delta = " +
00134                                         str(results[adc_number] - expected_as[0][adc_number]) + " / received = " +
00135                                         str(results[adc_number]) + ").", delta=400)
00136 
00137         # check channel 1 of all the spi modules
00138         results = []
00139         for adc_number in range(len(self.spi_srv)):
00140             results.append(self.read_adc(adc_number, 1))
00141             self.assertAlmostEquals(results[adc_number], expected_as[1][adc_number],
00142                                     msg="Testing channel 1 of " + str(adc_number) + "failed (delta = " +
00143                                         str(results[adc_number] - expected_as[1][adc_number]) + " / received = " +
00144                                         str(results[adc_number]) + ").", delta=400)
00145 
00146         # check all the analogue inputs
00147         for analogue_id in range(0, 6):
00148             self.assertAlmostEquals(self.analogue_in[analogue_id], expected_analogue[analogue_id],
00149                                     msg="Testing analogue input" + str(analogue_id) + "failed (delta = " +
00150                                         str(self.analogue_in[analogue_id] - expected_analogue[analogue_id]) +
00151                                         " / received = " + str(self.analogue_in[analogue_id]) + ").", delta=400)
00152 
00153     def set_DIO_states(self, digital_states):
00154         # set pre / post states
00155         if len(digital_states) == 6:
00156             cfg = {"pin_output_state_pre_DIO_0": digital_states[0], "pin_output_state_post_DIO_0": digital_states[0],
00157                    "pin_output_state_pre_DIO_1": digital_states[1], "pin_output_state_post_DIO_1": digital_states[1],
00158                    "pin_output_state_pre_DIO_2": digital_states[2], "pin_output_state_post_DIO_2": digital_states[2],
00159                    "pin_output_state_pre_DIO_3": digital_states[3], "pin_output_state_post_DIO_3": digital_states[3],
00160                    "pin_output_state_pre_DIO_4": digital_states[4], "pin_output_state_post_DIO_4": digital_states[4],
00161                    "pin_output_state_pre_DIO_5": digital_states[5], "pin_output_state_post_DIO_5": digital_states[5]}
00162         else:
00163             cfg = {"pin_output_state_pre_DIO_0": True, "pin_output_state_post_DIO_0": True,
00164                    "pin_output_state_pre_DIO_1": True, "pin_output_state_post_DIO_1": True,
00165                    "pin_output_state_pre_DIO_2": True, "pin_output_state_post_DIO_2": True,
00166                    "pin_output_state_pre_DIO_3": True, "pin_output_state_post_DIO_3": True,
00167                    "pin_output_state_pre_DIO_4": True, "pin_output_state_post_DIO_4": True,
00168                    "pin_output_state_pre_DIO_5": True, "pin_output_state_post_DIO_5": True}
00169         self.dyn_rcf_client.update_configuration(cfg)
00170 
00171     def read_adc(self, adc_number, channel_number):
00172         req = SPIRequest()
00173         # set data
00174         req.data = [0x0]*3
00175         req.data[0] = 0x01
00176         if channel_number == 0:
00177             req.data[1] = 0xA0
00178         else:
00179             req.data[1] = 0xE0
00180         req.data[2] = 0x01
00181         spi_res = None
00182         # The SPI response is generated only after the first packet is
00183         # sent so we need to send a message twice.
00184         for i in range(2):
00185             try:
00186                 spi_res = self.spi_srv[adc_number](req)
00187             except rospy.ServiceException as exc:
00188                 self.assertTrue(False, "Failed to send data to "+str(adc_number) +
00189                                 " channel "+str(channel_number)+" -> " + str(exc))
00190         result = int(spi_res.data[1])
00191         result <<= 8
00192         result |= int(spi_res.data[2])
00193         result &= 0x0FFF
00194         return result
00195 
00196 ############################################
00197 # TEST END
00198 ############################################
00199 
00200 if __name__ == '__main__':
00201     rospy.init_node('sr_spi_test')
00202     rostest.rosrun('sr_spi_test', 'sr_spi_test', TestSPIWithHardware)


sr_ronex_test
Author(s): Manos Nikolaidis
autogenerated on Thu Jun 6 2019 21:21:53