$search
#! /usr/bin/python
"""
test_xbee.py

By Paul Malmsten, 2010
pmalmsten@gmail.com

Tests the XBeeBase superclass module for XBee API conformance.
"""
import unittest
from xbee.base import XBeeBase
from xbee.tests.Fake import FakeDevice, FakeReadDevice


class TestWriteToDevice(unittest.TestCase):
    """
    XBeeBase class should properly write binary data in a valid API
    frame to a given serial device.
    """

    def test_write(self):
        """
        _write method should write the expected data to the serial
        device
        """
        device = FakeDevice()

        xbee = XBeeBase(device)
        xbee._write('\x00')

        # Check resulting state of fake device
        expected_frame = '\x7E\x00\x01\x00\xFF'
        self.assertEqual(device.data, expected_frame)

    def test_write_again(self):
        """
        _write method should write the expected data to the serial
        device when given a multi-byte payload
        """
        device = FakeDevice()

        xbee = XBeeBase(device)
        xbee._write('\x00\x01\x02')

        # Check resulting state of fake device
        expected_frame = '\x7E\x00\x03\x00\x01\x02\xFC'
        self.assertEqual(device.data, expected_frame)

    def test_write_escaped(self):
        """
        _write method should write the expected, escaped data to the
        serial device when the escaped option is enabled
        """
        device = FakeDevice()

        xbee = XBeeBase(device, escaped=True)
        xbee._write('\x7E\x01\x7D\x11\x13')

        # Check resulting state of fake device. In the expected frame each
        # of 0x7E, 0x7D, 0x11 and 0x13 in the payload appears as 0x7D
        # followed by the original byte XOR 0x20.
        expected_frame = '\x7E\x00\x05\x7D\x5E\x01\x7D\x5D\x7D\x31\x7D\x33\xDF'
        self.assertEqual(device.data, expected_frame)


class TestReadFromDevice(unittest.TestCase):
    """
    XBeeBase class should properly read and extract data from a valid
    API frame
    """

    def test_read(self):
        """
        _wait_for_frame should properly read a frame of data
        """
        device = FakeReadDevice('\x7E\x00\x01\x00\xFF')
        xbee = XBeeBase(device)

        frame = xbee._wait_for_frame()
        self.assertEqual(frame.data, '\x00')

    def test_read_invalid_followed_by_valid(self):
        """
        _wait_for_frame should skip invalid data
        """
        # The first frame ends in 0xFA where test_read's otherwise identical
        # frame ends in 0xFF, so it is the invalid one; only the second
        # frame's payload should be returned.
        device = FakeReadDevice(
            '\x7E\x00\x01\x00\xFA' + '\x7E\x00\x01\x05\xFA')
        xbee = XBeeBase(device)

        frame = xbee._wait_for_frame()
        self.assertEqual(frame.data, '\x05')

    def test_read_escaped(self):
        """
        _wait_for_frame should properly read a frame of data
        Verify that API mode 2 escaped bytes are read correctly
        """
        device = FakeReadDevice(
            '\x7E\x00\x04\x7D\x5E\x7D\x5D\x7D\x31\x7D\x33\xE0')

        xbee = XBeeBase(device, escaped=True)

        frame = xbee._wait_for_frame()
        self.assertEqual(frame.data, '\x7E\x7D\x11\x13')


class TestNotImplementedFeatures(unittest.TestCase):
    """
    In order to properly use the XBeeBase class for most situations,
    it must be subclassed with the proper attributes defined. If
    this is not the case, then a NotImplementedError exception should
    be raised as appropriate.
    """

    def setUp(self):
        """
        Set up a base class XBeeBase object which does not have
        api_commands or api_responses defined
        """
        self.xbee = XBeeBase(None)

    def test_build_command(self):
        """
        _build_command should raise NotImplementedError
        """
        self.assertRaises(NotImplementedError, self.xbee._build_command, "at")

    def test_split_response(self):
        """
        _split_response should raise NotImplementedError
        """
        self.assertRaises(NotImplementedError, self.xbee._split_response, "\00")

    def test_shorthand(self):
        """
        Shorthand calls should raise NotImplementedError
        """
        try:
            # Attribute access alone is expected to raise; no call is made.
            self.xbee.at
        except NotImplementedError:
            pass
        else:
            self.fail("Shorthand call on XBeeBase base class should raise NotImplementedError")


class TestAsyncCallback(unittest.TestCase):
    """
    XBeeBase constructor should accept an optional callback function
    argument. When provided, this will put the module into a threaded
    mode, in which it will call the provided function with any API
    frame data received.

    As it would be very difficult to sanely test an asynchronous callback
    routine with a synchronous test process, proper callback behavior
    is not tested automatically at this time. Theoretically, the
    callback implementation logic is simple, but use it at your own risk.
    """

    def setUp(self):
        """
        Prepare an empty fake serial device and a no-op callback; the
        XBeeBase object itself is created by each test
        """
        self.xbee = None
        self.serial = FakeReadDevice([], silent_on_empty=True)
        self.callback = lambda data: None

    def tearDown(self):
        """
        Ensure proper thread shutdown before continuing
        """
        # self.xbee is still None when a test failed before (or while)
        # constructing the XBeeBase object; calling halt() on None would
        # raise AttributeError and obscure the original failure.
        if self.xbee is not None:
            self.xbee.halt()

    def test_provide_callback(self):
        """
        XBeeBase constructor should accept a callback function
        """
        self.xbee = XBeeBase(self.serial, callback=self.callback)


class TestInitialization(unittest.TestCase):
    """
    Ensures that XBeeBase objects are properly constructed
    """

    def setUp(self):
        """
        Construct an XBeeBase object without a callback
        """
        self.base = XBeeBase(None)

    def test_thread_always_initialized(self):
        """
        Even when a callback method is not supplied to the XBeeBase
        constructor, it must be properly initialized as a
        threading.Thread object
        """
        self.assertFalse(self.base.is_alive())


if __name__ == '__main__':
    unittest.main()