test_OutPortPushConnector.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- Python -*-
3 
4 #
5 # \file test_OutPortPushConnector.py
6 # \brief test for OutPortPushConnector class
7 # \date $Date: 2007/09/19$
8 # \author Shinji Kurihara
9 #
10 # Copyright (C) 2006
11 # Noriaki Ando
12 # Task-intelligence Research Group,
13 # Intelligent Systems Research Institute,
14 # National Institute of
15 # Advanced Industrial Science and Technology (AIST), Japan
16 # All rights reserved.
17 #
18 
19 import sys
20 sys.path.insert(1,"../")
21 
22 from omniORB import *
23 from omniORB import any
24 
25 import unittest
26 from OutPortPushConnector import *
27 
28 import OpenRTM
29 import RTC
30 import OpenRTM_aist
31 
32 class MyBuffer:
33  def __init__(self):
34  self._data = None
35  return
36 
37  def write(self, data, sec=0, usec=0):
38  self._data = data
39  return
40 
41  def read(self):
42  return self._data
43 
44  def init(self,info):
45  return
46 
47 class ConsumerMock:
48  def __init__(self,buff):
49  self._data = None
50  self._buff = buff
51  return
52 
53  def init(self,prop):
54  return
55 
56  def write(self, data):
57  self._data = data
58  return OpenRTM.PORT_OK
59 
60  def put(self, data):
61  self._buff.write(data)
62  return OpenRTM.PORT_OK
63 
64 class TestOutPortPushConnector(unittest.TestCase):
65  def setUp(self):
66  self._buffer = MyBuffer()
69  "id",
70  ["in","out"],
72 
74  return
75 
76  def test_write(self):
77  wdata = RTC.TimedLong(RTC.Time(0,0), 123)
78  self._oc.write(wdata)
79  val = self._buffer.read()
80  rdata = RTC.TimedLong(RTC.Time(0,0), 0)
81  get_data = cdrUnmarshal(any.to_any(rdata).typecode(),val,1)
82  self.assertEqual(get_data.data, 123)
83  return
84 
85 
86  def test_disconnect(self):
87  self.assertEqual(self._oc.disconnect(), OpenRTM_aist.DataPortStatus.PORT_OK)
88  return
89 
90 
91  def test_activate(self):
92  self._oc.activate()
93  return
94 
95  def test_deactivate(self):
96  self._oc.deactivate()
97  return
98 
99  def test_getBuffer(self):
100  self.assertEqual(self._oc.getBuffer(),self._buffer)
101  return
102 
104  self._oc.createPublisher(self._profile)
105  return
106 
107  def test_createBuffer(self):
108  self._oc.createBuffer(self._profile)
109  return
110 
111 
112 if __name__ == '__main__':
113  unittest.main()
The Properties class represents a persistent set of properties.
Definition: Properties.py:83
def write(self, data, sec=0, usec=0)


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:07