test_InPortCorbaConsumer.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- Python -*-
3 
4 #
5 # \file test_InPortCorbaConsumer.py
6 # \brief test for InPortCorbaConsumer class
7 # \date $Date: 2007/09/21 $
8 # \author Shinji Kurihara
9 #
10 # Copyright (C) 2006
11 # Noriaki Ando
12 # Task-intelligence Research Group,
13 # Intelligent Systems Research Institute,
14 # National Institute of
15 # Advanced Industrial Science and Technology (AIST), Japan
16 # All rights reserved.
17 
18 
19 import OpenRTM_aist
20 
21 import sys
22 sys.path.insert(1,"../")
23 
24 import unittest
25 
26 from InPortCorbaConsumer import *
27 from NVUtil import *
28 
29 from omniORB import CORBA
30 import RTC, RTC__POA
31 import SDOPackage,SDOPackage__POA
32 from omniORB import any
33 
34 
35 class InPortTest(RTC__POA.InPortAny):
36  def __init__(self):
37  self.orb = CORBA.ORB_init()
38  self.poa = self.orb.resolve_initial_references("RootPOA")
39  poaManager = self.poa._get_the_POAManager()
40  poaManager.activate()
41 
42 
43  def put(self, data):
44  print "put data: ", data
45 
46 
48  def __init__(self):
49  self.orb = CORBA.ORB_init()
50  self.poa = self.orb.resolve_initial_references("RootPOA")
51  OpenRTM_aist.RTObject_impl.__init__(self, orb=self.orb, poa=self.poa)
52  poaManager = self.poa._get_the_POAManager()
53  poaManager.activate()
54 
55 
56 class TestInPortCorbaConsumer(unittest.TestCase):
57  def setUp(self):
58  ringbuf = OpenRTM_aist.RingBuffer(8)
59  ringbuf.init(RTC.TimedLong(RTC.Time(0,0), 0))
61  RTC.TimedLong(RTC.Time(0,0), 0),
62  ringbuf))
63 
65  self.assertEqual(self._ipcc.equal_operator(self._ipcc), self._ipcc)
66 
67  def test_put(self):
68  self._ipcc.setObject(InPortTest()._this())
69  self._ipcc.put(RTC.TimedLong(RTC.Time(0,0), 123))
70 
71  def test_push(self):
72  self._ipcc.setObject(InPortTest()._this())
73  self._ipcc.push()
74 
75  def test_clone(self):
76  self._ipcc.clone()
77 
79  port = any.to_any(InPortTest()._this())
80 # prop = [SDOPackage.NameValue("dataport.dataflow_type","Push"),
81 # SDOPackage.NameValue("dataport.corba_any.inport_ref",port)]
82  prop = [newNV("dataport.dataflow_type","Push"),
83  newNV("dataport.corba_any.inport_ref",port)]
84  self.assertEqual(self._ipcc.subscribeInterface(prop), True)
85 
86 
87  def unsubscribeInterface(self, properties):
88  pass
89 
90 
91 ############### test #################
92 if __name__ == '__main__':
93  unittest.main()
def newNV(name, value)
Create NameVale.
Definition: NVUtil.py:50


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Jun 6 2019 19:11:34