test_rospy_transport.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039 
class TestRospyTransport(unittest.TestCase):
    """Unit tests for the base classes in ``rospy.impl.transport``.

    The "should raise" checks use ``try``/``except Exception``/``else`` so
    that the ``self.fail(...)`` call sits outside the guarded region.  Placing
    ``self.fail`` inside a ``try`` with a bare ``except`` would swallow the
    resulting ``AssertionError`` and make the check impossible to fail.
    """

    def test_Transport(self):
        """Check Transport defaults, the cleanup callback, and abstract I/O methods."""
        from rospy.impl.transport import Transport, INBOUND, OUTBOUND, BIDIRECTIONAL
        ids = []
        for d in [INBOUND, OUTBOUND, BIDIRECTIONAL]:
            # default construction: only the direction is supplied
            t = Transport(d)
            self.assertEqual(d, t.direction)
            self.assertEqual("UNKNOWN", t.transport_type)
            self.assertFalse(t.done)
            self.assertEqual(None, t.cleanup_cb)
            self.assertEqual('', t.endpoint_id)
            self.assertEqual('unnamed', t.name)
            self.assertEqual(0, t.stat_bytes)
            self.assertEqual(0, t.stat_num_msg)
            # ids of the default-constructed transports must not repeat
            self.assertFalse(t.id in ids)
            ids.append(t.id)

            # construction with an explicit name
            t = Transport(d, 'a name')
            self.assertEqual(d, t.direction)
            self.assertEqual('a name', t.name)

        # test cleanup with and without a callback
        t = Transport(INBOUND)
        t.close()
        self.assertTrue(t.done)
        t = Transport(INBOUND)

        self.cleanup_obj = None

        def cleanup(obj):
            # record the argument so the test can verify what close() passed
            self.cleanup_obj = obj

        t.set_cleanup_callback(cleanup)
        self.assertEqual(t.cleanup_cb, cleanup)
        t.close()
        self.assertTrue(t.done)
        self.assertEqual(self.cleanup_obj, t)

        # the I/O methods of the base class are expected to raise
        t = Transport(OUTBOUND)
        try:
            t.send_message('msg', 1)
        except Exception:
            pass
        else:
            self.fail("send_message() should be abstract")
        try:
            t.write_data('data')
        except Exception:
            pass
        else:
            self.fail("write_data() should be abstract")
        t = Transport(INBOUND)
        try:
            t.receive_once()
        except Exception:
            pass
        else:
            self.fail("receive_once() should be abstract")
        t = Transport(INBOUND)
        try:
            t.receive_loop()
        except Exception:
            pass
        else:
            self.fail("receive_loop() should be abstract")

    def test_DeadTransport(self):
        """Check that DeadTransport mirrors the wrapped Transport and is done."""
        from rospy.impl.transport import Transport, DeadTransport, INBOUND, OUTBOUND, BIDIRECTIONAL
        # wrap a transport that has not been closed
        t = Transport(INBOUND, 'foo')
        t.stat_bytes = 1234
        t.stat_num_msg = 5678
        dead = DeadTransport(t)
        self.assertEqual(INBOUND, dead.direction)
        self.assertEqual('foo', dead.name)
        self.assertEqual(1234, dead.stat_bytes)
        self.assertEqual(5678, dead.stat_num_msg)
        self.assertEqual(True, dead.done)
        self.assertEqual('', dead.endpoint_id)

        # wrap a transport that was closed and has an endpoint id set
        t = Transport(OUTBOUND, 'bar')
        t.endpoint_id = 'blah blah'
        t.close()
        dead = DeadTransport(t)
        self.assertEqual(OUTBOUND, dead.direction)
        self.assertEqual('bar', dead.name)
        self.assertEqual(True, dead.done)
        self.assertEqual(t.endpoint_id, dead.endpoint_id)

    def test_ProtocolHandler(self):
        """Tripwire checks on the base ProtocolHandler."""
        # tripwire tests
        from rospy.impl.transport import ProtocolHandler
        h = ProtocolHandler()
        self.assertFalse(h.supports('TCPROS'))
        self.assertEqual([], h.get_supported())
        try:
            h.create_connection("/topic", 'http://localhost:1234', ['TCPROS'])
        except Exception:
            pass
        else:
            self.fail("create_connection should raise an exception")
        try:
            h.init_publisher("/topic", 'TCPROS')
        except Exception:
            pass
        else:
            self.fail("init_publisher raise an exception")
        h.shutdown()


test_rospy
Author(s): Ken Conley
autogenerated on Thu Jun 6 2019 21:10:57