test_InPort.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- Python -*-
3 
4 
5 # \file test_InPort.py
6 # \brief test for InPort template class
7 # \date $Date: 2007/09/20 $
8 # \author Shinji Kurihara
9 #
10 # Copyright (C) 2003-2005
11 # Task-intelligence Research Group,
12 # Intelligent Systems Research Institute,
13 # National Institute of
14 # Advanced Industrial Science and Technology (AIST), Japan
15 # All rights reserved.
16 
17 
18 import sys
19 sys.path.insert(1,"../")
20 
21 import unittest
22 
23 from InPort import *
24 
25 import RTC, RTC__POA
26 import OpenRTM_aist
27 
28 ShareCount = 0
29 
30 class OnRWTest:
31  def __init__(self):
32  pass
33 
34  def echo(self, value=None):
35  print "OnRW Called"
36 
37 
39  def __init__(self):
40  pass
41 
42  def echo(self, value=None):
43  print "OnRWConvert Called"
44  return value
45 
46 
47 class BufferMock:
48  def __init__(self):
49  self._ShareCount = 0
50  self._data = None
51  return
52 
53  def write(self, data):
54  global ShareCount
55  ShareCount += 1
56  self._ShareCount = ShareCount
57  self._data = data
58  return OpenRTM_aist.BufferStatus.BUFFER_OK
59 
60  def read(self, value):
61  global ShareCount
62  ShareCount -= 1
63  if ShareCount < 0:
64  ShareCount = 0
65  self._ShareCount = ShareCount
66  if len(value) > 0:
67  value[0] = self._data
68  else:
69  value.append(self._data)
70  return OpenRTM_aist.BufferStatus.BUFFER_OK
71 
72  def readable(self):
73  return self._ShareCount
74 
75 
77  def __init__(self, buffer):
78  self._buffer = buffer
79  return
80 
81  def write(self, data):
82  self._buffer.write(data)
83  return OpenRTM_aist.BufferStatus.BUFFER_OK
84 
85  def read(self, data):
86  self._buffer.read(data)
87  return OpenRTM_aist.DataPortStatus.PORT_OK
88 
89  def getBuffer(self):
90  ret = self._buffer.readable()
91  return self._buffer
92 
93 
94 class TestInPort(unittest.TestCase):
95  def setUp(self):
96  OpenRTM_aist.Manager.instance()
98  self._ipn = InPort("in", RTC.TimedLong(RTC.Time(0,0), 0), self._buffer)
100  self._ipn._connectors = [self._connector]
101 
102  def tearDown(self):
103  OpenRTM_aist.Manager.instance().shutdownManager()
104  return
105 
106  def test_name(self):
107  self.assertEqual(self._ipn.name(), "in")
108 
109  def test_read(self):
110  self.assertEqual(self._ipn.isEmpty(), True)
111  self.assertEqual(self._ipn.isNew(), False)
112  self._connector.write(RTC.TimedLong(RTC.Time(0,0), 123))
113  self.assertEqual(self._ipn.isEmpty(), False)
114  self.assertEqual(self._ipn.isNew(), True)
115  read_data = self._ipn.read()
116  self.assertEqual(self._ipn.isEmpty(), True)
117  self.assertEqual(self._ipn.isNew(), False)
118  self.assertEqual(read_data.data, 123)
119  self._ipn.update()
120  return
121 
122  def test_OnRead(self):
123  self._connector.write(RTC.TimedLong(RTC.Time(0,0), 456))
124  self._ipn.setOnRead(OnRWTest().echo)
125  self._ipn.setOnReadConvert(OnRWConvertTest().echo)
126  read_data = self._ipn.read()
127  self.assertEqual(read_data.data, 456)
128  return
129 
130 
131 
132 if __name__ == '__main__':
133  unittest.main()
134 
def read(self, value)
Definition: test_InPort.py:60
name
Definition: setup.py:543
def write(self, data)
Definition: test_InPort.py:53
def __init__(self, buffer)
Definition: test_InPort.py:77
def write(self, data)
Definition: test_InPort.py:81
def read(self, data)
Definition: test_InPort.py:85
def echo(self, value=None)
Definition: test_InPort.py:42
def echo(self, value=None)
Definition: test_InPort.py:34


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:07