test_InPort.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- Python -*-
3 
4 
5 # \file test_InPort.py
6 # \brief test for InPort template class
7 # \date $Date: 2007/09/20 $
8 # \author Shinji Kurihara
9 #
10 # Copyright (C) 2003-2005
11 # Task-intelligence Research Group,
12 # Intelligent Systems Research Institute,
13 # National Institute of
14 # Advanced Industrial Science and Technology (AIST), Japan
15 # All rights reserved.
16 
17 
18 import sys
19 sys.path.insert(1,"../")
20 
21 import unittest
22 
23 from InPort import *
24 
25 import RTC, RTC__POA
26 import OpenRTM_aist
27 
28 ShareCount = 0
29 
30 class OnRWTest:
31  def __init__(self):
32  pass
33 
34  def echo(self, value=None):
35  print("OnRW Called")
36 
37 
39  def __init__(self):
40  pass
41 
42  def echo(self, value=None):
43  print("OnRWConvert Called")
44  return value
45 
46 
47 class BufferMock:
48  def __init__(self):
49  self._ShareCount = 0
50  self._data = None
51  return
52 
53  def write(self, data):
54  global ShareCount
55  ShareCount += 1
56  self._ShareCount = ShareCount
57  self._data = data
58  return OpenRTM_aist.BufferStatus.BUFFER_OK
59 
60  def read(self, value):
61  global ShareCount
62  ShareCount -= 1
63  if ShareCount < 0:
64  ShareCount = 0
65  self._ShareCount = ShareCount
66  if len(value) > 0:
67  value[0] = self._data
68  else:
69  value.append(self._data)
70  return OpenRTM_aist.BufferStatus.BUFFER_OK
71 
72  def readable(self):
73  return self._ShareCount
74 
75 
77  def __init__(self, buffer):
78  self._buffer = buffer
79  return
80 
81  def write(self, data):
82  self._buffer.write(data)
83  return OpenRTM_aist.BufferStatus.BUFFER_OK
84 
85  def read(self, data):
86  self._buffer.read(data)
87  return OpenRTM_aist.DataPortStatus.PORT_OK
88 
89  def getBuffer(self):
90  ret = self._buffer.readable()
91  return self._buffer
92 
93 
94 class TestInPort(unittest.TestCase):
95  def setUp(self):
96  OpenRTM_aist.Manager.instance()
98  self._ipn = InPort("in", RTC.TimedLong(RTC.Time(0,0), 0), self._buffer)
100  self._ipn._connectors = [self._connector]
101 
102  def tearDown(self):
103  OpenRTM_aist.Manager.instance().shutdownManager()
104  return
105 
106  def test_name(self):
107  self.assertEqual(self._ipn.name(), "in")
108 
109  def test_read(self):
110  self.assertEqual(self._ipn.isEmpty(), True)
111  self.assertEqual(self._ipn.isNew(), False)
112  self._connector.write(RTC.TimedLong(RTC.Time(0,0), 123))
113  self.assertEqual(self._ipn.isEmpty(), False)
114  self.assertEqual(self._ipn.isNew(), True)
115  read_data = self._ipn.read()
116  self.assertEqual(self._ipn.isEmpty(), True)
117  self.assertEqual(self._ipn.isNew(), False)
118  self.assertEqual(read_data.data, 123)
119  self._ipn.update()
120  return
121 
122  def test_OnRead(self):
123  self._connector.write(RTC.TimedLong(RTC.Time(0,0), 456))
124  self._ipn.setOnRead(OnRWTest().echo)
125  self._ipn.setOnReadConvert(OnRWConvertTest().echo)
126  read_data = self._ipn.read()
127  self.assertEqual(read_data.data, 456)
128  return
129 
130 
131 
132 if __name__ == '__main__':
133  unittest.main()
134 
test_InPort.TestInPort.test_name
def test_name(self)
Definition: test_InPort.py:106
test_InPort.ConnectorMock.getBuffer
def getBuffer(self)
Definition: test_InPort.py:89
test_InPort.BufferMock
Definition: test_InPort.py:47
test_InPort.BufferMock._ShareCount
_ShareCount
Definition: test_InPort.py:49
test_InPort.TestInPort._connector
_connector
Definition: test_InPort.py:99
InPort
test_InPort.ConnectorMock.write
def write(self, data)
Definition: test_InPort.py:81
test_InPort.BufferMock.write
def write(self, data)
Definition: test_InPort.py:53
test_InPort.ConnectorMock._buffer
_buffer
Definition: test_InPort.py:78
test_InPort.OnRWConvertTest.echo
def echo(self, value=None)
Definition: test_InPort.py:42
test_InPort.OnRWConvertTest
Definition: test_InPort.py:38
test_InPort.TestInPort.setUp
def setUp(self)
Definition: test_InPort.py:95
test_InPort.TestInPort.tearDown
def tearDown(self)
Definition: test_InPort.py:102
test_InPort.OnRWTest.__init__
def __init__(self)
Definition: test_InPort.py:31
test_InPort.ConnectorMock.__init__
def __init__(self, buffer)
Definition: test_InPort.py:77
test_InPort.OnRWTest
Definition: test_InPort.py:30
test_InPort.TestInPort._ipn
_ipn
Definition: test_InPort.py:98
setup.name
name
Definition: setup.py:543
test_InPort.ConnectorMock
Definition: test_InPort.py:76
test_InPort.OnRWConvertTest.__init__
def __init__(self)
Definition: test_InPort.py:39
test_InPort.TestInPort.test_OnRead
def test_OnRead(self)
Definition: test_InPort.py:122
test_InPort.BufferMock.read
def read(self, value)
Definition: test_InPort.py:60
test_InPort.TestInPort._buffer
_buffer
Definition: test_InPort.py:97
test_InPort.ConnectorMock.read
def read(self, data)
Definition: test_InPort.py:85
test_InPort.BufferMock.readable
def readable(self)
Definition: test_InPort.py:72
test_InPort.OnRWTest.echo
def echo(self, value=None)
Definition: test_InPort.py:34
test_InPort.BufferMock._data
_data
Definition: test_InPort.py:50
test_InPort.BufferMock.__init__
def __init__(self)
Definition: test_InPort.py:48
test_InPort.TestInPort
Definition: test_InPort.py:94
test_InPort.TestInPort.test_read
def test_read(self)
Definition: test_InPort.py:109


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Apr 21 2025 02:45:06