test_InPortCorbaCdrProvider.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- Python -*-
3 
4 
5 # \file test_InPortCorbaCdrProvider.py
6 # \brief test for InPortCorbaCdrProvider class
7 # \date $Date: 2007/09/20 $
8 # \author Shinji Kurihara
9 #
10 # Copyright (C) 2003-2005
11 # Task-intelligence Research Group,
12 # Intelligent Systems Research Institute,
13 # National Institute of
14 # Advanced Industrial Science and Technology (AIST), Japan
15 # All rights reserved.
16 
17 
18 from omniORB import *
19 from omniORB import any
20 
21 import sys
22 sys.path.insert(1,"../")
23 
24 import unittest
25 
26 from InPortCorbaCdrProvider import *
27 
28 import RTC, RTC__POA
29 import OpenRTM
30 import OpenRTM_aist
31 
32 
33 class BufferMock:
34  def __init__(self):
35  self._data = None
36  return
37 
38  def write(self, data):
39  self._data = data
40  return OpenRTM_aist.BufferStatus.BUFFER_OK
41 
42  def read(self, value):
43  if len(value) > 0:
44  value[0] = self._data
45  else:
46  value.append(self._data)
47  return OpenRTM_aist.BufferStatus.BUFFER_OK
48 
49  def full(self):
50  return False
51 
52 
54  def __init__(self, buffer):
55  self._buffer = buffer
56  return
57 
58  def write(self, data):
59  self._buffer.write(data)
60  return OpenRTM_aist.BufferStatus.BUFFER_OK
61 
62 
63 
64 class TestInPortCorbaCdrProvider(unittest.TestCase):
65  def setUp(self):
67  OpenRTM_aist.CdrRingBufferInit()
68  self._prov = OpenRTM_aist.InPortProviderFactory.instance().createObject("corba_cdr")
69  self._inp = OpenRTM_aist.InPort("in",RTC.TimedLong(RTC.Time(0,0),0))
70  self._orb = OpenRTM_aist.Manager.instance().getORB()
72  return
73 
74  def test_init(self):
75  self._prov.init(OpenRTM_aist.Properties())
76  return
77 
78  def test_setBuffer(self):
79  self._prov.setBuffer(self._buffer)
80  return
81 
82  def test_put(self):
83  self._con = ConnectorMock(self._buffer)
84  self._prov._connector = self._con
85  self._prov.setBuffer(self._buffer)
86  data = RTC.TimedLong(RTC.Time(0,0),123)
87  cdr = cdrMarshal(any.to_any(data).typecode(), data, 1)
88  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
89  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
90  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
91  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
92  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
93  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
94  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
95  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
96  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
97  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
98  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
99  val = []
100  self.assertEqual(self._buffer.read(val), OpenRTM_aist.BufferStatus.BUFFER_OK)
101  get_data = cdrUnmarshal(any.to_any(data).typecode(), val[0], 1)
102  self.assertEqual(get_data.data, 123)
103  return
104 
105 
106 
107 
108 if __name__ == '__main__':
109  unittest.main()
110 
test_InPortCorbaCdrProvider.TestInPortCorbaCdrProvider._prov
_prov
Definition: test_InPortCorbaCdrProvider.py:68
OpenRTM_aist.InPortCorbaCdrProvider.InPortCorbaCdrProviderInit
def InPortCorbaCdrProviderInit()
Definition: InPortCorbaCdrProvider.py:272
test_InPortCorbaCdrProvider.BufferMock.read
def read(self, value)
Definition: test_InPortCorbaCdrProvider.py:42
test_InPortCorbaCdrProvider.BufferMock.__init__
def __init__(self)
Definition: test_InPortCorbaCdrProvider.py:34
test_InPortCorbaCdrProvider.BufferMock._data
_data
Definition: test_InPortCorbaCdrProvider.py:35
test_InPortCorbaCdrProvider.ConnectorMock
Definition: test_InPortCorbaCdrProvider.py:53
test_InPortCorbaCdrProvider.TestInPortCorbaCdrProvider.test_put
def test_put(self)
Definition: test_InPortCorbaCdrProvider.py:82
test_InPortCorbaCdrProvider.TestInPortCorbaCdrProvider._buffer
_buffer
Definition: test_InPortCorbaCdrProvider.py:71
test_InPortCorbaCdrProvider.ConnectorMock._buffer
_buffer
Definition: test_InPortCorbaCdrProvider.py:55
test_InPortCorbaCdrProvider.TestInPortCorbaCdrProvider
Definition: test_InPortCorbaCdrProvider.py:64
test_InPortCorbaCdrProvider.BufferMock
Definition: test_InPortCorbaCdrProvider.py:33
test_InPortCorbaCdrProvider.TestInPortCorbaCdrProvider._orb
_orb
Definition: test_InPortCorbaCdrProvider.py:70
test_InPortCorbaCdrProvider.TestInPortCorbaCdrProvider.setUp
def setUp(self)
Definition: test_InPortCorbaCdrProvider.py:65
test_InPortCorbaCdrProvider.TestInPortCorbaCdrProvider.test_setBuffer
def test_setBuffer(self)
Definition: test_InPortCorbaCdrProvider.py:78
OpenRTM_aist.InPort.InPort
InPort template class.
Definition: InPort.py:58
test_InPortCorbaCdrProvider.ConnectorMock.__init__
def __init__(self, buffer)
Definition: test_InPortCorbaCdrProvider.py:54
OpenRTM_aist.Properties.Properties
Definition: Properties.py:83
test_InPortCorbaCdrProvider.TestInPortCorbaCdrProvider._con
_con
Definition: test_InPortCorbaCdrProvider.py:83
test_InPortCorbaCdrProvider.BufferMock.write
def write(self, data)
Definition: test_InPortCorbaCdrProvider.py:38
test_InPortCorbaCdrProvider.TestInPortCorbaCdrProvider.test_init
def test_init(self)
Definition: test_InPortCorbaCdrProvider.py:74
test_InPortCorbaCdrProvider.ConnectorMock.write
def write(self, data)
Definition: test_InPortCorbaCdrProvider.py:58
test_InPortCorbaCdrProvider.BufferMock.full
def full(self)
Definition: test_InPortCorbaCdrProvider.py:49
test_InPortCorbaCdrProvider.TestInPortCorbaCdrProvider._inp
_inp
Definition: test_InPortCorbaCdrProvider.py:69


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Apr 21 2025 02:45:07