test_OutPortCorbaConsumer.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- Python -*-
00003 
00004 #
00005 #  \file  test_OutPortCorbaConsumer.py
00006 #  \brief test for OutPortCorbaConsumer class
00007 #  \date  $Date: 2007/09/26 $
00008 #  \author Shinji Kurihara
00009 # 
00010 #  Copyright (C) 2006
00011 #      Noriaki Ando
00012 #      Task-intelligence Research Group,
00013 #      Intelligent Systems Research Institute,
00014 #      National Institute of
00015 #          Advanced Industrial Science and Technology (AIST), Japan
00016 #      All rights reserved.
00017  
00018 from omniORB import any
00019 from omniORB import CORBA
00020 
00021 import OpenRTM_aist
00022 import RTC, RTC__POA 
00023 import SDOPackage, SDOPackage__POA
00024 
00025 import sys
00026 sys.path.insert(1,"../")
00027 
00028 import unittest
00029 
00030 from OutPortCorbaConsumer import *
00031 
class OutPortTest(RTC__POA.OutPortAny):
    """OutPortAny servant used as the remote object in these tests.

    Its ``get`` operation always returns the same TimedLong sample, so a
    test can check the value that comes back through the consumer.
    """

    def __init__(self):
        """Initialise the ORB, look up the root POA and activate its manager."""
        self.orb = CORBA.ORB_init()
        self.poa = self.orb.resolve_initial_references("RootPOA")
        self.poa._get_the_POAManager().activate()

    def get(self):
        """Print a trace line and return an Any wrapping a TimedLong of 123."""
        # The parenthesised single-argument form prints the same text under
        # Python 2 and is also valid syntax under Python 3.
        print("Called get operation.")
        return any.to_any(RTC.TimedLong(RTC.Time(0, 0), 123))
00043 
00044 
class TestOutPortCorbaConsumer(unittest.TestCase):
    """Test cases exercising OutPortCorbaConsumer."""

    def setUp(self):
        """Build the consumer under test around an InPort named "in"."""
        buf = OpenRTM_aist.RingBuffer(8)
        buf.init(RTC.TimedLong(RTC.Time(0, 0), 0))
        # A second, independent TimedLong is handed to the InPort.
        inport = OpenRTM_aist.InPort("in", RTC.TimedLong(RTC.Time(0, 0), 0), buf)
        self._opcc = OutPortCorbaConsumer(inport)

    def test_get(self):
        """get() reports True and stores the servant's sample in slot 0."""
        servant_ref = OutPortTest()._this()
        self._opcc.setObject(servant_ref)
        received = [None]
        succeeded = self._opcc.get(received)
        self.assertEqual(succeeded, True)
        self.assertEqual(received[0].data, 123)

    def test_pull(self):
        """pull() is invoked once; no result is checked."""
        self._opcc.pull()

    def test_subscribeInterface(self):
        """subscribeInterface() returns True, then the same list is unsubscribed."""
        outport_ref = any.to_any(OutPortTest()._this())
        properties = [
            SDOPackage.NameValue("dataport.dataflow_type", "Push"),
            SDOPackage.NameValue("dataport.corba_any.outport_ref", outport_ref),
        ]
        subscribed = self._opcc.subscribeInterface(properties)
        self.assertEqual(subscribed, True)
        self._opcc.unsubscribeInterface(properties)
00073 
00074 
00075 ############### test #################
if __name__ == "__main__":
    # Executed as a script: hand control to unittest's command-line runner.
    unittest.main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28