test_InPortCorbaConsumer.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- Python -*-
00003 
00004 #
00005 #  \file  test_InPortCorbaConsumer.py
00006 #  \brief test for InPortCorbaConsumer class
00007 #  \date  $Date: 2007/09/21 $
00008 #  \author Shinji Kurihara
00009 # 
00010 #  Copyright (C) 2006
00011 #      Noriaki Ando
00012 #      Task-intelligence Research Group,
00013 #      Intelligent Systems Research Institute,
00014 #      National Institute of
00015 #          Advanced Industrial Science and Technology (AIST), Japan
00016 #      All rights reserved.
00017  
00018 
00019 import OpenRTM_aist
00020 
00021 import sys
00022 sys.path.insert(1,"../")
00023 
00024 import unittest
00025 
00026 from InPortCorbaConsumer import *
00027 from NVUtil import *
00028  
00029 from omniORB import CORBA
00030 import RTC, RTC__POA
00031 import SDOPackage,SDOPackage__POA
00032 from omniORB import any
00033 
00034 
00035 class InPortTest(RTC__POA.InPortAny):
00036         def __init__(self):
00037                 self.orb = CORBA.ORB_init()
00038                 self.poa = self.orb.resolve_initial_references("RootPOA")
00039                 poaManager = self.poa._get_the_POAManager()
00040                 poaManager.activate()
00041                 
00042                 
00043         def put(self, data):
00044                 print "put data: ", data
00045 
00046                 
00047 class test(OpenRTM_aist.RTObject_impl):
00048         def __init__(self):
00049                 self.orb = CORBA.ORB_init()
00050                 self.poa = self.orb.resolve_initial_references("RootPOA")
00051                 OpenRTM_aist.RTObject_impl.__init__(self, orb=self.orb, poa=self.poa)
00052                 poaManager = self.poa._get_the_POAManager()
00053                 poaManager.activate()
00054 
00055 
00056 class TestInPortCorbaConsumer(unittest.TestCase):
00057         def setUp(self):
00058                 ringbuf = OpenRTM_aist.RingBuffer(8)
00059                 ringbuf.init(RTC.TimedLong(RTC.Time(0,0), 0))
00060                 self._ipcc = InPortCorbaConsumer(OpenRTM_aist.OutPort("out",
00061                                                                                                                  RTC.TimedLong(RTC.Time(0,0), 0),
00062                                                                                                                  ringbuf))
00063                 
00064         def test_equal_operator(self):
00065                 self.assertEqual(self._ipcc.equal_operator(self._ipcc), self._ipcc)
00066 
00067         def test_put(self):
00068                 self._ipcc.setObject(InPortTest()._this())
00069                 self._ipcc.put(RTC.TimedLong(RTC.Time(0,0), 123))
00070 
00071         def test_push(self):
00072                 self._ipcc.setObject(InPortTest()._this())
00073                 self._ipcc.push()
00074 
00075         def test_clone(self):
00076                 self._ipcc.clone()
00077 
00078         def test_subscribeInterface(self):
00079                 port = any.to_any(InPortTest()._this())
00080 #               prop = [SDOPackage.NameValue("dataport.dataflow_type","Push"),
00081 #                               SDOPackage.NameValue("dataport.corba_any.inport_ref",port)]
00082                 prop = [newNV("dataport.dataflow_type","Push"),
00083                                                         newNV("dataport.corba_any.inport_ref",port)]
00084                 self.assertEqual(self._ipcc.subscribeInterface(prop), True)
00085 
00086                 
00087         def unsubscribeInterface(self, properties):
00088                 pass
00089 
00090 
00091 ############### test #################
00092 if __name__ == '__main__':
00093         unittest.main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28