test_InPortCorbaCdrProvider.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- Python -*-
3 
4 
5 # \file test_InPortCorbaCdrProvider.py
6 # \brief test for InPortCorbaCdrProvider class
7 # \date $Date: 2007/09/20 $
8 # \author Shinji Kurihara
9 #
10 # Copyright (C) 2003-2005
11 # Task-intelligence Research Group,
12 # Intelligent Systems Research Institute,
13 # National Institute of
14 # Advanced Industrial Science and Technology (AIST), Japan
15 # All rights reserved.
16 
17 
18 from omniORB import *
19 from omniORB import any
20 
21 import sys
22 sys.path.insert(1,"../")
23 
24 import unittest
25 
26 from InPortCorbaCdrProvider import *
27 
28 import RTC, RTC__POA
29 import OpenRTM
30 import OpenRTM_aist
31 
32 
33 class BufferMock:
34  def __init__(self):
35  self._data = None
36  return
37 
38  def write(self, data):
39  self._data = data
40  return OpenRTM_aist.BufferStatus.BUFFER_OK
41 
42  def read(self, value):
43  if len(value) > 0:
44  value[0] = self._data
45  else:
46  value.append(self._data)
47  return OpenRTM_aist.BufferStatus.BUFFER_OK
48 
49  def full(self):
50  return False
51 
52 
54  def __init__(self, buffer):
55  self._buffer = buffer
56  return
57 
58  def write(self, data):
59  self._buffer.write(data)
60  return OpenRTM_aist.BufferStatus.BUFFER_OK
61 
62 
63 
64 class TestInPortCorbaCdrProvider(unittest.TestCase):
65  def setUp(self):
67  OpenRTM_aist.CdrRingBufferInit()
68  self._prov = OpenRTM_aist.InPortProviderFactory.instance().createObject("corba_cdr")
69  self._inp = OpenRTM_aist.InPort("in",RTC.TimedLong(RTC.Time(0,0),0))
70  self._orb = OpenRTM_aist.Manager.instance().getORB()
72  return
73 
74  def test_init(self):
75  self._prov.init(OpenRTM_aist.Properties())
76  return
77 
78  def test_setBuffer(self):
79  self._prov.setBuffer(self._buffer)
80  return
81 
82  def test_put(self):
83  self._con = ConnectorMock(self._buffer)
84  self._prov._connector = self._con
85  self._prov.setBuffer(self._buffer)
86  data = RTC.TimedLong(RTC.Time(0,0),123)
87  cdr = cdrMarshal(any.to_any(data).typecode(), data, 1)
88  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
89  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
90  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
91  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
92  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
93  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
94  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
95  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
96  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
97  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
98  self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
99  val = []
100  self.assertEqual(self._buffer.read(val), OpenRTM_aist.BufferStatus.BUFFER_OK)
101  get_data = cdrUnmarshal(any.to_any(data).typecode(), val[0], 1)
102  self.assertEqual(get_data.data, 123)
103  return
104 
105 
106 
107 ############### test #################
108 if __name__ == '__main__':
109  unittest.main()
110 
The Properties class represents a persistent set of properties.
Definition: Properties.py:83
InPort template class.
Definition: InPort.py:58


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Jun 6 2019 19:11:34