test_rospy_transport.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039 
class TestRospyTransport(unittest.TestCase):
    """Unit tests for the base classes exported by ``rospy.impl.transport``.

    Covers the default attribute values and close/cleanup handling of
    ``Transport``, the state a ``DeadTransport`` takes over from the
    transport it wraps, and tripwire checks on ``ProtocolHandler``.
    """

    def _expect_exception(self, failure_message, call):
        """Fail the test unless invoking ``call`` raises an exception.

        ``self.fail()`` raises ``AssertionError``; invoking it inside a
        ``try`` that has a catch-all handler swallows the failure, so the
        check could never fail.  Here the failure is reported only after
        the ``try`` statement has completed.

        failure_message -- message reported when ``call`` returns normally
        call            -- zero-argument callable that is expected to raise
        """
        try:
            call()
        except Exception:
            # Any exception raised while evaluating the call satisfies the
            # check; the concrete exception type is deliberately not pinned.
            return
        self.fail(failure_message)

    def test_Transport(self):
        """Check Transport defaults, id uniqueness, cleanup and abstract API."""
        from rospy.impl.transport import Transport, INBOUND, OUTBOUND, BIDIRECTIONAL

        ids = []
        for d in [INBOUND, OUTBOUND, BIDIRECTIONAL]:
            t = Transport(d)
            self.assertEqual(d, t.direction)
            self.assertEqual("UNKNOWN", t.transport_type)
            self.assertFalse(t.done)
            self.assertEqual(None, t.cleanup_cb)
            self.assertEqual('', t.endpoint_id)
            self.assertEqual('unnamed', t.name)
            self.assertEqual(0, t.stat_bytes)
            self.assertEqual(0, t.stat_num_msg)
            # ids must not repeat across transport instances
            self.assertFalse(t.id in ids)
            ids.append(t.id)

            t = Transport(d, 'a name')
            self.assertEqual(d, t.direction)
            self.assertEqual('a name', t.name)

        # test cleanup with and without a callback
        t = Transport(INBOUND)
        t.close()
        self.assertTrue(t.done)
        t = Transport(INBOUND)

        self.cleanup_obj = None

        def cleanup(obj):
            # record the argument so the test can check what close() passed
            self.cleanup_obj = obj

        t.set_cleanup_callback(cleanup)
        self.assertEqual(t.cleanup_cb, cleanup)
        t.close()
        self.assertTrue(t.done)
        self.assertEqual(self.cleanup_obj, t)

        # The I/O methods of the base class are expected to raise.  The
        # calls are wrapped in lambdas so that everything the original
        # statements evaluated stays inside the guarded region.
        t = Transport(OUTBOUND)
        self._expect_exception("send_message() should be abstract",
                               lambda: t.send_message('msg', 1))
        self._expect_exception("write_data() should be abstract",
                               lambda: t.write_data('data'))
        t = Transport(INBOUND)
        self._expect_exception("receive_once() should be abstract",
                               lambda: t.receive_once())
        t = Transport(INBOUND)
        self._expect_exception("receive_loop() should be abstract",
                               lambda: t.receive_loop())

    def test_DeadTransport(self):
        """Check the state DeadTransport exposes for the transport it wraps."""
        from rospy.impl.transport import Transport, DeadTransport, INBOUND, OUTBOUND

        # wrapped transport that was never closed and has no endpoint id
        t = Transport(INBOUND, 'foo')
        t.stat_bytes = 1234
        t.stat_num_msg = 5678
        dead = DeadTransport(t)
        self.assertEqual(INBOUND, dead.direction)
        self.assertEqual('foo', dead.name)
        self.assertEqual(1234, dead.stat_bytes)
        self.assertEqual(5678, dead.stat_num_msg)
        self.assertEqual(True, dead.done)
        self.assertEqual('', dead.endpoint_id)

        # wrapped transport that was closed and carries an endpoint id
        t = Transport(OUTBOUND, 'bar')
        t.endpoint_id = 'blah blah'
        t.close()
        dead = DeadTransport(t)
        self.assertEqual(OUTBOUND, dead.direction)
        self.assertEqual('bar', dead.name)
        self.assertEqual(True, dead.done)
        self.assertEqual(t.endpoint_id, dead.endpoint_id)

    def test_ProtocolHandler(self):
        """Tripwire checks on the ProtocolHandler base class."""
        # tripwire tests
        from rospy.impl.transport import ProtocolHandler

        h = ProtocolHandler()
        self.assertFalse(h.supports('TCPROS'))
        self.assertEqual([], h.get_supported())
        # NOTE(review): any exception satisfies these two checks, including
        # an AttributeError if the method does not exist on ProtocolHandler
        # -- confirm the method names against the rospy version under test.
        self._expect_exception(
            "create_connection should raise an exception",
            lambda: h.create_connection("/topic", 'http://localhost:1234', ['TCPROS']))
        self._expect_exception(
            "init_publisher should raise an exception",
            lambda: h.init_publisher("/topic", 'TCPROS'))
        h.shutdown()


test_rospy
Author(s): Ken Conley
autogenerated on Mon Oct 6 2014 11:47:19