$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2008, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id$ 00035 00036 import roslib; roslib.load_manifest('test_rospy') 00037 00038 import os 00039 import sys 00040 import struct 00041 import unittest 00042 import time 00043 00044 class TestRospyTransport(unittest.TestCase): 00045 00046 def test_Transport(self): 00047 from rospy.impl.transport import Transport, INBOUND, OUTBOUND, BIDIRECTIONAL 00048 ids = [] 00049 for d in [INBOUND, OUTBOUND, BIDIRECTIONAL]: 00050 t = Transport(d) 00051 self.assertEquals(d, t.direction) 00052 self.assertEquals("UNKNOWN", t.transport_type) 00053 self.failIf(t.done) 00054 self.assertEquals(None, t.cleanup_cb) 00055 self.assertEquals('', t.endpoint_id) 00056 self.assertEquals('unnamed', t.name) 00057 self.assertEquals(0, t.stat_bytes) 00058 self.assertEquals(0, t.stat_num_msg) 00059 self.failIf(t.id in ids) 00060 ids.append(t.id) 00061 00062 t = Transport(d, 'a name') 00063 self.assertEquals(d, t.direction) 00064 self.assertEquals('a name', t.name) 00065 00066 # test cleanup with and without a callback 00067 t = Transport(INBOUND) 00068 t.close() 00069 self.assert_(t.done) 00070 t = Transport(INBOUND) 00071 00072 self.cleanup_obj = None 00073 def cleanup(obj): 00074 self.cleanup_obj = obj 00075 00076 t.set_cleanup_callback(cleanup) 00077 self.assertEquals(t.cleanup_cb, cleanup) 00078 t.close() 00079 self.assert_(t.done) 00080 self.assertEquals(self.cleanup_obj, t) 00081 00082 t = Transport(OUTBOUND) 00083 import traceback 00084 try: 00085 t.send_message('msg', 1) 00086 self.fail("send_message() should be abstract") 00087 except: 00088 traceback.print_exc() 00089 try: 00090 t.write_data('data') 00091 self.fail("write_data() should be abstract") 00092 except: 00093 traceback.print_exc() 00094 t = Transport(INBOUND) 00095 try: 00096 t.receive_once() 00097 self.fail("receive_once() should be abstract") 00098 except: pass 00099 t = Transport(INBOUND) 00100 try: 00101 t.receive_loop() 00102 self.fail("receive_loop() should be abstract") 00103 except: pass 00104 00105 def test_DeadTransport(self): 00106 from rospy.impl.transport import Transport, DeadTransport, INBOUND, OUTBOUND, BIDIRECTIONAL 00107 t = Transport(INBOUND, 'foo') 00108 t.stat_bytes = 1234 00109 t.stat_num_msg = 5678 00110 dead = DeadTransport(t) 00111 self.assertEquals(INBOUND, dead.direction) 00112 self.assertEquals('foo', dead.name) 00113 self.assertEquals(1234, dead.stat_bytes) 00114 self.assertEquals(5678, dead.stat_num_msg) 00115 self.assertEquals(True, dead.done) 00116 self.assertEquals('', dead.endpoint_id) 00117 00118 t = Transport(OUTBOUND, 'bar') 00119 t.endpoint_id = 'blah blah' 00120 t.close() 00121 dead = DeadTransport(t) 00122 self.assertEquals(OUTBOUND, dead.direction) 00123 self.assertEquals('bar', dead.name) 00124 self.assertEquals(True, dead.done) 00125 self.assertEquals(t.endpoint_id, dead.endpoint_id) 00126 00127 def test_ProtocolHandler(self): 00128 # tripwire tests 00129 from rospy.impl.transport import ProtocolHandler 00130 h = ProtocolHandler() 00131 self.failIf(h.supports('TCPROS')) 00132 self.assertEquals([], h.get_supported()) 00133 try: 00134 h.create_connection("/topic", 'http://localhost:1234', ['TCPROS']) 00135 self.fail("create_connection should raise an exception") 00136 except: pass 00137 try: 00138 h.init_publisher("/topic", 'TCPROS') 00139 self.fail("init_publisher raise an exception") 00140 except: pass 00141 h.shutdown() 00142 00143 if __name__ == '__main__': 00144 import rostest 00145 rostest.unitrun('test_rospy', sys.argv[0], TestRospyTransport, coverage_packages=['rospy.impl.transport'])