test_rospy_transport.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import os
35 import sys
36 import struct
37 import unittest
38 import time
39 
40 class TestRospyTransport(unittest.TestCase):
41 
42  def test_Transport(self):
43  from rospy.impl.transport import Transport, INBOUND, OUTBOUND, BIDIRECTIONAL
44  ids = []
45  for d in [INBOUND, OUTBOUND, BIDIRECTIONAL]:
46  t = Transport(d)
47  self.assertEqual(d, t.direction)
48  self.assertEqual("UNKNOWN", t.transport_type)
49  self.assertFalse(t.done)
50  self.assertEqual(None, t.cleanup_cb)
51  self.assertEqual('', t.endpoint_id)
52  self.assertEqual('unnamed', t.name)
53  self.assertEqual(0, t.stat_bytes)
54  self.assertEqual(0, t.stat_num_msg)
55  self.assertFalse(t.id in ids)
56  ids.append(t.id)
57 
58  t = Transport(d, 'a name')
59  self.assertEqual(d, t.direction)
60  self.assertEqual('a name', t.name)
61 
62  # test cleanup with and without a callback
63  t = Transport(INBOUND)
64  t.close()
65  self.assertTrue(t.done)
66  t = Transport(INBOUND)
67 
68  self.cleanup_obj = None
69  def cleanup(obj):
70  self.cleanup_obj = obj
71 
72  t.set_cleanup_callback(cleanup)
73  self.assertEqual(t.cleanup_cb, cleanup)
74  t.close()
75  self.assertTrue(t.done)
76  self.assertEqual(self.cleanup_obj, t)
77 
78  t = Transport(OUTBOUND)
79  import traceback
80  try:
81  t.send_message('msg', 1)
82  self.fail("send_message() should be abstract")
83  except: pass
84  try:
85  t.write_data('data')
86  self.fail("write_data() should be abstract")
87  except: pass
88  t = Transport(INBOUND)
89  try:
90  t.receive_once()
91  self.fail("receive_once() should be abstract")
92  except: pass
93  t = Transport(INBOUND)
94  try:
95  t.receive_loop()
96  self.fail("receive_loop() should be abstract")
97  except: pass
98 
99  def test_DeadTransport(self):
100  from rospy.impl.transport import Transport, DeadTransport, INBOUND, OUTBOUND, BIDIRECTIONAL
101  t = Transport(INBOUND, 'foo')
102  t.stat_bytes = 1234
103  t.stat_num_msg = 5678
104  dead = DeadTransport(t)
105  self.assertEqual(INBOUND, dead.direction)
106  self.assertEqual('foo', dead.name)
107  self.assertEqual(1234, dead.stat_bytes)
108  self.assertEqual(5678, dead.stat_num_msg)
109  self.assertEqual(True, dead.done)
110  self.assertEqual('', dead.endpoint_id)
111 
112  t = Transport(OUTBOUND, 'bar')
113  t.endpoint_id = 'blah blah'
114  t.close()
115  dead = DeadTransport(t)
116  self.assertEqual(OUTBOUND, dead.direction)
117  self.assertEqual('bar', dead.name)
118  self.assertEqual(True, dead.done)
119  self.assertEqual(t.endpoint_id, dead.endpoint_id)
120 
122  # tripwire tests
123  from rospy.impl.transport import ProtocolHandler
124  h = ProtocolHandler()
125  self.assertFalse(h.supports('TCPROS'))
126  self.assertEqual([], h.get_supported())
127  try:
128  h.create_connection("/topic", 'http://localhost:1234', ['TCPROS'])
129  self.fail("create_connection should raise an exception")
130  except: pass
131  try:
132  h.init_publisher("/topic", 'TCPROS')
133  self.fail("init_publisher raise an exception")
134  except: pass
135  h.shutdown()
unit.test_rospy_transport.TestRospyTransport.test_ProtocolHandler
def test_ProtocolHandler(self)
Definition: test_rospy_transport.py:121
unit.test_rospy_transport.TestRospyTransport
Definition: test_rospy_transport.py:40
unit.test_rospy_transport.TestRospyTransport.cleanup_obj
cleanup_obj
Definition: test_rospy_transport.py:68
unit.test_rospy_transport.TestRospyTransport.test_Transport
def test_Transport(self)
Definition: test_rospy_transport.py:42
unit.test_rospy_transport.TestRospyTransport.test_DeadTransport
def test_DeadTransport(self)
Definition: test_rospy_transport.py:99


test_rospy
Author(s): Ken Conley, Dirk Thomas , Jacob Perron
autogenerated on Tue May 20 2025 03:00:44