test_rospy_tcpros_base.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import os
35 import sys
36 import struct
37 import unittest
38 import time
39 try:
40  from cStringIO import StringIO
41 except ImportError:
42  from io import StringIO
43 
44 import rospy
45 
46 g_fileno = 1
47 
48 class MockSock:
49  def __init__(self, buff):
50  global g_fileno
51  g_fileno += 1
52  self.buff = buff
53  self._fileno = g_fileno
54  def fileno(self):
55  return self._fileno
56  def recv(self, buff_size):
57  return self.buff[:buff_size]
58  def close(self):
59  self.buff = None
60  def getsockname(self):
61  return (None, None)
63  def __init__(self):
64  global g_fileno
65  g_fileno += 1
66  self._fileno = g_fileno
67  def fileno(self):
68  return self._fileno
69  def recv(self, buff_size):
70  return ''
71  def close(self):
72  self.buff = None
73 
74 class TestRospyTcprosBase(unittest.TestCase):
75 
76  def test_constants(self):
77  self.assertEquals("TCPROS", rospy.impl.tcpros_base.TCPROS)
78  self.assert_(type(rospy.impl.tcpros_base.DEFAULT_BUFF_SIZE), int)
79 
80  def test_recv_buff(self):
81  from rospy.impl.tcpros_base import recv_buff
82 
83 
84  buff = StringIO()
85  try:
86  recv_buff(MockEmptySock(), buff, 1)
87  self.fail("recv_buff should have raised TransportTerminated")
88  except rospy.impl.tcpros_base.TransportTerminated:
89  self.assertEquals('', buff.getvalue())
90 
91  self.assertEquals(5, recv_buff(MockSock('1234567890'), buff, 5))
92  self.assertEquals('12345', buff.getvalue())
93  buff = StringIO()
94 
95  self.assertEquals(10, recv_buff(MockSock('1234567890'), buff, 100))
96  self.assertEquals('1234567890', buff.getvalue())
97 
98  def test_TCPServer(self):
99  from rospy.impl.tcpros_base import TCPServer
100  def handler(sock, addr):
101  pass
102  s = None
103  try:
104  s = TCPServer(handler)
105  self.assert_(s.port > 0)
106  addr, port = s.get_full_addr()
107  self.assert_(type(addr) == str)
108  self.assertEquals(handler, s.inbound_handler)
109  self.failIf(s.is_shutdown)
110  finally:
111  if s is not None:
112  s.shutdown()
113  self.assert_(s.is_shutdown)
114 
116  import rospy
117  import random
118 
119  from rospy.impl.tcpros_base import TCPROSTransportProtocol
120  from rospy.impl.transport import BIDIRECTIONAL
121 
122  p = TCPROSTransportProtocol('Bob', rospy.AnyMsg)
123  self.assertEquals('Bob', p.resolved_name)
124  self.assertEquals(rospy.AnyMsg, p.recv_data_class)
125  self.assertEquals(BIDIRECTIONAL, p.direction)
126  self.assertEquals({}, p.get_header_fields())
127  self.assertEquals(rospy.impl.tcpros_base.DEFAULT_BUFF_SIZE, p.buff_size)
128 
129  v = random.randint(1, 100)
130  p = TCPROSTransportProtocol('Bob', rospy.AnyMsg, queue_size=v)
131  self.assertEquals(v, p.queue_size)
132 
133  v = random.randint(1, 100)
134  p = TCPROSTransportProtocol('Bob', rospy.AnyMsg, buff_size=v)
135  self.assertEquals(v, p.buff_size)
136 
138  import rospy.impl.tcpros_base
139  from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol
140  from rospy.impl.transport import OUTBOUND
141  p = TCPROSTransportProtocol('Bob', rospy.AnyMsg)
142  p.direction = OUTBOUND
143 
144  try:
145  TCPROSTransport(p, '')
146  self.fail("TCPROSTransport should not accept bad name")
147  except rospy.impl.tcpros_base.TransportInitError: pass
148 
149  t = TCPROSTransport(p, 'transport-name')
150  self.assert_(t.socket is None)
151  self.assert_(t.md5sum is None)
152  self.assert_(t.type is None)
153  self.assertEquals(p, t.protocol)
154  self.assertEquals('TCPROS', t.transport_type)
155  self.assertEquals(OUTBOUND, t.direction)
156  self.assertEquals('unknown', t.endpoint_id)
157  self.assertEquals(b'', t.read_buff.getvalue())
158  self.assertEquals(b'', t.write_buff.getvalue())
159 
160  s = MockSock('12345')
161  t.set_socket(s, 'new_endpoint_id')
162  self.assertEquals('new_endpoint_id', t.endpoint_id)
163  self.assertEquals(s, t.socket)
164 
165  t.close()
166  self.assert_(t.socket is None)
167  self.assert_(t.read_buff is None)
168  self.assert_(t.write_buff is None)
169  self.assert_(t.protocol is None)


test_rospy
Author(s): Ken Conley, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:56