test_rospy_transport.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import os
35 import sys
36 import struct
37 import unittest
38 import time
39 
40 class TestRospyTransport(unittest.TestCase):
41 
42  def test_Transport(self):
43  from rospy.impl.transport import Transport, INBOUND, OUTBOUND, BIDIRECTIONAL
44  ids = []
45  for d in [INBOUND, OUTBOUND, BIDIRECTIONAL]:
46  t = Transport(d)
47  self.assertEquals(d, t.direction)
48  self.assertEquals("UNKNOWN", t.transport_type)
49  self.failIf(t.done)
50  self.assertEquals(None, t.cleanup_cb)
51  self.assertEquals('', t.endpoint_id)
52  self.assertEquals('unnamed', t.name)
53  self.assertEquals(0, t.stat_bytes)
54  self.assertEquals(0, t.stat_num_msg)
55  self.failIf(t.id in ids)
56  ids.append(t.id)
57 
58  t = Transport(d, 'a name')
59  self.assertEquals(d, t.direction)
60  self.assertEquals('a name', t.name)
61 
62  # test cleanup with and without a callback
63  t = Transport(INBOUND)
64  t.close()
65  self.assert_(t.done)
66  t = Transport(INBOUND)
67 
68  self.cleanup_obj = None
69  def cleanup(obj):
70  self.cleanup_obj = obj
71 
72  t.set_cleanup_callback(cleanup)
73  self.assertEquals(t.cleanup_cb, cleanup)
74  t.close()
75  self.assert_(t.done)
76  self.assertEquals(self.cleanup_obj, t)
77 
78  t = Transport(OUTBOUND)
79  import traceback
80  try:
81  t.send_message('msg', 1)
82  self.fail("send_message() should be abstract")
83  except: pass
84  try:
85  t.write_data('data')
86  self.fail("write_data() should be abstract")
87  except: pass
88  t = Transport(INBOUND)
89  try:
90  t.receive_once()
91  self.fail("receive_once() should be abstract")
92  except: pass
93  t = Transport(INBOUND)
94  try:
95  t.receive_loop()
96  self.fail("receive_loop() should be abstract")
97  except: pass
98 
99  def test_DeadTransport(self):
100  from rospy.impl.transport import Transport, DeadTransport, INBOUND, OUTBOUND, BIDIRECTIONAL
101  t = Transport(INBOUND, 'foo')
102  t.stat_bytes = 1234
103  t.stat_num_msg = 5678
104  dead = DeadTransport(t)
105  self.assertEquals(INBOUND, dead.direction)
106  self.assertEquals('foo', dead.name)
107  self.assertEquals(1234, dead.stat_bytes)
108  self.assertEquals(5678, dead.stat_num_msg)
109  self.assertEquals(True, dead.done)
110  self.assertEquals('', dead.endpoint_id)
111 
112  t = Transport(OUTBOUND, 'bar')
113  t.endpoint_id = 'blah blah'
114  t.close()
115  dead = DeadTransport(t)
116  self.assertEquals(OUTBOUND, dead.direction)
117  self.assertEquals('bar', dead.name)
118  self.assertEquals(True, dead.done)
119  self.assertEquals(t.endpoint_id, dead.endpoint_id)
120 
122  # tripwire tests
123  from rospy.impl.transport import ProtocolHandler
124  h = ProtocolHandler()
125  self.failIf(h.supports('TCPROS'))
126  self.assertEquals([], h.get_supported())
127  try:
128  h.create_connection("/topic", 'http://localhost:1234', ['TCPROS'])
129  self.fail("create_connection should raise an exception")
130  except: pass
131  try:
132  h.init_publisher("/topic", 'TCPROS')
133  self.fail("init_publisher raise an exception")
134  except: pass
135  h.shutdown()


test_rospy
Author(s): Ken Conley
autogenerated on Sun Feb 3 2019 03:30:22