Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 import sys
00020 sys.path.insert(1,"../")
00021
00022 from omniORB import *
00023 from omniORB import any
00024
00025 import unittest
00026 from OutPortPushConnector import *
00027
00028 import OpenRTM
00029 import RTC
00030 import OpenRTM_aist
00031
class MyBuffer:
    """Minimal buffer test double that remembers a single value.

    ``write`` stores whatever it is handed and ``read`` returns that same
    object, which lets a test inspect what was delivered to the buffer.
    """

    def __init__(self):
        # Nothing has been written yet.
        self._data = None

    def write(self, data, sec=0, usec=0):
        """Store ``data``, replacing any previously stored value.

        ``sec`` and ``usec`` are accepted but not used by this double.
        Returns ``None``.
        """
        self._data = data

    def read(self):
        """Return the last value written, or ``None`` if nothing was."""
        return self._data

    def init(self, info):
        """Accept ``info`` and ignore it; returns ``None``."""
        return
00046
class ConsumerMock:
    """Consumer test double that forwards ``put`` data into a buffer.

    ``buff`` is any object exposing ``write(data)``; it is kept as given.
    """

    def __init__(self, buff):
        # Last value handed to write(); stays None until write() is called.
        self._data = None
        self._buff = buff

    def init(self, prop):
        """Accept ``prop`` and ignore it; returns ``None``."""
        return

    def write(self, data):
        """Remember ``data`` on this object and return ``OpenRTM.PORT_OK``."""
        self._data = data
        return OpenRTM.PORT_OK

    def put(self, data):
        """Write ``data`` to the wrapped buffer and return ``OpenRTM.PORT_OK``."""
        self._buff.write(data)
        return OpenRTM.PORT_OK
00063
class TestOutPortPushConnector(unittest.TestCase):
    """Exercise ``OutPortPushConnector`` wired to the in-file test doubles."""

    def setUp(self):
        """Build a fresh connector that shares one buffer with its consumer."""
        self._buffer = MyBuffer()
        self._consumer = ConsumerMock(self._buffer)
        self._profile = OpenRTM_aist.ConnectorInfo(
            "test",
            "id",
            ["in", "out"],
            OpenRTM_aist.Properties(),
        )
        self._oc = OutPortPushConnector(
            self._profile,
            self._consumer,
            OpenRTM_aist.ConnectorListeners(),
            self._buffer,
        )

    def test_write(self):
        """Data written to the connector is readable from the buffer as 123."""
        wdata = RTC.TimedLong(RTC.Time(0, 0), 123)
        self._oc.write(wdata)
        val = self._buffer.read()
        # A zero-valued TimedLong is built only to obtain the typecode
        # needed to unmarshal what the buffer received.
        rdata = RTC.TimedLong(RTC.Time(0, 0), 0)
        get_data = cdrUnmarshal(any.to_any(rdata).typecode(), val, 1)
        self.assertEqual(get_data.data, 123)

    def test_disconnect(self):
        """``disconnect`` reports ``DataPortStatus.PORT_OK``."""
        self.assertEqual(
            self._oc.disconnect(),
            OpenRTM_aist.DataPortStatus.PORT_OK,
        )

    def test_activate(self):
        """``activate`` can be called without raising."""
        self._oc.activate()

    def test_deactivate(self):
        """``deactivate`` can be called without raising."""
        self._oc.deactivate()

    def test_getBuffer(self):
        """``getBuffer`` hands back the buffer given to the constructor."""
        self.assertEqual(self._oc.getBuffer(), self._buffer)

    def test_createPublisher(self):
        """``createPublisher`` accepts the connector profile without raising."""
        self._oc.createPublisher(self._profile)

    def test_createBuffer(self):
        """``createBuffer`` accepts the connector profile without raising."""
        self._oc.createBuffer(self._profile)
00110
00111
# Run the test cases when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()