Go to the documentation of this file.00001
00002 """
00003 test_xbee.py
00004
00005 By Paul Malmsten, 2010
00006 pmalmsten@gmail.com
00007
00008 Tests the XBeeBase superclass module for XBee API conformance.
00009 """
00010 import unittest
00011 from xbee.base import XBeeBase
00012 from xbee.tests.Fake import FakeDevice, FakeReadDevice
00013
class TestWriteToDevice(unittest.TestCase):
    """
    XBeeBase._write should wrap a payload in an API frame and hand the
    resulting bytes to the given serial device.
    """

    def _check_written(self, payload, wire_bytes, **options):
        """
        Write payload through a fresh XBeeBase (constructed with the
        given keyword options) attached to a FakeDevice, then compare
        what the device captured against wire_bytes.
        """
        port = FakeDevice()
        radio = XBeeBase(port, **options)
        radio._write(payload)
        self.assertEqual(port.data, wire_bytes)

    def test_write(self):
        """
        A one-byte payload should reach the device as a complete frame
        """
        # Expected bytes: 0x7E, two length bytes, the payload, one
        # trailing byte (0xFF minus the payload sum here).
        self._check_written('\x00', '\x7E\x00\x01\x00\xFF')

    def test_write_again(self):
        """
        A three-byte payload should reach the device as a complete
        frame
        """
        self._check_written(
            '\x00\x01\x02',
            '\x7E\x00\x03\x00\x01\x02\xFC')

    def test_write_escaped(self):
        """
        With escaped=True, the payload bytes 0x7E, 0x7D, 0x11 and 0x13
        should each be written as 0x7D followed by a substitute byte
        """
        self._check_written(
            '\x7E\x01\x7D\x11\x13',
            '\x7E\x00\x05\x7D\x5E\x01\x7D\x5D\x7D\x31\x7D\x33\xDF',
            escaped=True)
00061
class TestReadFromDevice(unittest.TestCase):
    """
    XBeeBase._wait_for_frame should read an API frame from the serial
    device and expose its payload as frame.data.
    """

    def _read_payload(self, raw, **options):
        """
        Feed raw to a FakeReadDevice, wait for one frame on an XBeeBase
        built with the given keyword options, and return the frame's
        data attribute.
        """
        port = FakeReadDevice(raw)
        radio = XBeeBase(port, **options)
        received = radio._wait_for_frame()
        return received.data

    def test_read(self):
        """
        _wait_for_frame should properly read a frame of data
        """
        payload = self._read_payload('\x7E\x00\x01\x00\xFF')
        self.assertEqual(payload, '\x00')

    def test_read_invalid_followed_by_valid(self):
        """
        _wait_for_frame should skip invalid data
        """
        # Same final byte in both frames; only the second is expected
        # to come back from _wait_for_frame.
        rejected = '\x7E\x00\x01\x00\xFA'
        accepted = '\x7E\x00\x01\x05\xFA'
        payload = self._read_payload(rejected + accepted)
        self.assertEqual(payload, '\x05')

    def test_read_escaped(self):
        """
        _wait_for_frame should properly read a frame of data.
        Verify that API mode 2 escaped bytes are read correctly
        """
        payload = self._read_payload(
            '\x7E\x00\x04\x7D\x5E\x7D\x5D\x7D\x31\x7D\x33\xE0',
            escaped=True)
        self.assertEqual(payload, '\x7E\x7D\x11\x13')
00099
class TestNotImplementedFeatures(unittest.TestCase):
    """
    In order to properly use the XBeeBase class for most situations,
    it must be subclassed with the proper attributes defined. If this
    is not the case, then a NotImplementedError should be raised as
    appropriate.
    """

    def setUp(self):
        """
        Create a bare XBeeBase (no serial device), which does not have
        api_commands or api_responses defined
        """
        self.xbee = XBeeBase(None)

    def test_build_command(self):
        """
        _build_command should raise NotImplementedError
        """
        build = self.xbee._build_command
        self.assertRaises(NotImplementedError, build, "at")

    def test_split_response(self):
        """
        _split_response should raise NotImplementedError
        """
        split = self.xbee._split_response
        self.assertRaises(NotImplementedError, split, "\00")

    def test_shorthand(self):
        """
        Shorthand calls should raise NotImplementedError
        """
        raised = False
        try:
            # Attribute access alone is expected to raise.
            self.xbee.at
        except NotImplementedError:
            raised = True

        if not raised:
            self.fail("Shorthand call on XBeeBase base class should raise NotImplementedError")
00137
00138 class TestAsyncCallback(unittest.TestCase):
00139 """
00140 XBeeBase constructor should accept an optional callback function
00141 argument. When provided, this will put the module into a threaded
00142 mode, in which it will call the provided function with any API
00143 frame data received.
00144
00145 As it would be very difficult to sanely test an asynchonous callback
00146 routine with a synchronous test process, proper callback behavior
00147 is not tested automatically at this time. Theoretically, the
00148 callback implementation logic is simple, but use it at your own risk.
00149 """
00150
00151 def setUp(self):
00152 self.xbee = None
00153 self.serial = FakeReadDevice([], silent_on_empty=True)
00154 self.callback = lambda data: None
00155
00156 def tearDown(self):
00157
00158 self.xbee.halt()
00159
00160 def test_provide_callback(self):
00161 """
00162 XBeeBase constructor should accept a callback function
00163 """
00164 self.xbee = XBeeBase(self.serial, callback=self.callback)
00165
class TestInitialization(unittest.TestCase):
    """
    Verifies that an XBeeBase object is properly constructed
    """

    def setUp(self):
        """
        Build an XBeeBase with no serial device and no callback
        """
        self.base = XBeeBase(None)

    def test_thread_always_initialized(self):
        """
        Even when a callback method is not supplied to the XBeeBase
        constructor, it must be properly initialized as a
        threading.Thread object
        """
        # is_alive() must be callable on the object and report that it
        # is not running.
        running = self.base.is_alive()
        self.assertFalse(running)
00181
# Run every TestCase in this module when the file is executed directly
# as a script (nothing runs on import).
if __name__ == '__main__':
    unittest.main()