test_PortBase.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- Python -*-
3 
4 #
5 # \file test_PortBase.py
6 # \brief test for RTC's Port base class
7 # \date $Date: 2007/09/18 $
8 # \author Shinji Kurihara
9 #
10 # Copyright (C) 2006
11 # Task-intelligence Research Group,
12 # Intelligent Systems Research Institute,
13 # National Institute of
14 # Advanced Industrial Science and Technology (AIST), Japan
15 # All rights reserved.
16 #
17 
18 import sys
19 sys.path.insert(1,"../")
20 
21 import unittest
22 from PortBase import *
23 
24 import CORBA
25 import OpenRTM_aist
26 import RTC, RTC__POA
27 
28 
30  def __init_(self, manager):
31  OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
32 
34  def __init__(self):
35  OpenRTM_aist.DataFlowComponentBase.__init__(self, OpenRTM_aist.Manager.instance())
36  self.addOutPort("out",OpenRTM_aist.OutPort("out", RTC.TimedLong(RTC.Time(0,0),0)))
37 
38 
40  def __init__(self):
41  OpenRTM_aist.DataFlowComponentBase.__init__(self, OpenRTM_aist.Manager.instance())
42  self.addInPort("in",OpenRTM_aist.InPort("in", RTC.TimedLong(RTC.Time(0,0),0)))
43 
44 
45 class TestPortBase(unittest.TestCase):
46  def setUp(self):
47  OpenRTM_aist.Manager.init(sys.argv)
48  OpenRTM_aist.Manager.instance().activateManager()
49  self._pb = PortBase()
50  return
51 
52  def tearDown(self):
53  OpenRTM_aist.Manager.instance().shutdownManager()
54  return
55 
57  self._pb.setName("test_connect")
58  prof = self._pb.get_port_profile()
59  self.assertEqual(prof.name,"test_connect")
60  return
61 
63  self._pb.setName("test_connect")
64  prof = self._pb.getPortProfile()
65  self.assertEqual(prof.name,"test_connect")
66 
67 
69  OpenRTM_aist.Manager.init(sys.argv)
70  nvlist = [OpenRTM_aist.NVUtil.newNV("dataport.interface_type","corba_cdr"),
71  OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type","push"),
72  OpenRTM_aist.NVUtil.newNV("dataport.subscription_type","flush")]
73 
74  outp = OutPortObj()
75  outp = outp.get_ports()
76 
77  inp = InPortObj()
78  inp = inp.get_ports()
79  prof = RTC.ConnectorProfile("connector0",
80  "connectorid_0",
81  [outp[0]._narrow(RTC.PortService),inp[0]._narrow(RTC.PortService)],
82  nvlist)
83  ret, prof = self._pb.connect(prof)
84  cntlist = outp[0].get_connector_profiles()
85  self.assertEqual(cntlist[0].name,"connector0")
86  cntprofile = outp[0].get_connector_profile("connectorid_0")
87  self.assertEqual(cntprofile.connector_id,"connectorid_0")
88  #ret,prof = self._pb.notify_connect(prof)
89  return
90 
91 
92 
94  nvlist = [OpenRTM_aist.NVUtil.newNV("dataport.interface_type","corba_cdr"),
95  OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type","push"),
96  OpenRTM_aist.NVUtil.newNV("dataport.subscription_type","flush")]
97 
98  outp = OutPortObj().get_ports()
99  inp = InPortObj().get_ports()
100  prof = RTC.ConnectorProfile("connector0","connector_id1",[inp[0],outp[0]],nvlist)
101  self._pb.connect(prof)
102  cntprofile = outp[0].get_connector_profile("connector_id1")
103  self.assertEqual(cntprofile.connector_id,"connector_id1")
104  outp[0].disconnect("connector_id1")
105  cntprofile = outp[0].get_connector_profile("connector_id1")
106  self.assertNotEqual(cntprofile.connector_id,"connector_id1")
107  self._pb.connect(prof)
108  outp[0].disconnect_all()
109  self.assertNotEqual(cntprofile.connector_id,"connector_id1")
110  return
111 
112 
113 
114  def test_getProfile(self):
115  outp = OutPortObj().get_ports()
116  self._pb.setName("test")
117  self._pb.setPortRef(outp[0])
118  rtobj = RTObjMock(OpenRTM_aist.Manager.instance())
119  self._pb.setOwner(rtobj)
120  prof = self._pb.getProfile()
121  self.assertEqual(prof.name,".test")
122  self.assertEqual(prof.port_ref,outp[0])
123  self.assertEqual(prof.port_ref,outp[0])
124  self.assertEqual(prof.port_ref,self._pb.getPortRef())
125  return
126 
127 
128  def test_getUUID(self):
129  self._pb.getUUID()
130  return
131 
132 
134  nvlist = [OpenRTM_aist.NVUtil.newNV("dataport.interface_type","corba_cdr"),
135  OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type","push"),
136  OpenRTM_aist.NVUtil.newNV("dataport.subscription_type","flush")]
137 
138  outp = OutPortObj().get_ports()
139  inp = InPortObj().get_ports()
140  prof = RTC.ConnectorProfile("connector0","connector_id1",[inp[0],outp[0]],nvlist)
141  self._pb.updateConnectorProfile(prof)
142  cntlist = self._pb.get_connector_profiles()
143  self.assertEqual(cntlist[0].name,"connector0")
144  return
145 
146 
147 
149  nvlist = [OpenRTM_aist.NVUtil.newNV("dataport.interface_type","corba_cdr"),
150  OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type","push"),
151  OpenRTM_aist.NVUtil.newNV("dataport.subscription_type","flush")]
152 
153  outp = OutPortObj().get_ports()
154  inp = InPortObj().get_ports()
155  prof = RTC.ConnectorProfile("connector0","connector_id1",[inp[0],outp[0]],nvlist)
156  self._pb.connect(prof)
157  #self.assertEqual(self._pb.eraseConnectorProfile("connector_id1"),True)
158  return
159 
160 
161 
163  self.assertEqual(self._pb.appendInterface("name", "type", RTC.PROVIDED), True)
164  piprof = RTC.PortInterfaceProfile("name","type",RTC.PROVIDED)
165  self._pb._profile=RTC.PortProfile("name",[piprof],RTC.PortService._nil,[],RTC.RTObject._nil,[])
166  self.assertEqual(self._pb.appendInterface("name", "type", RTC.PROVIDED), False)
167  self.assertEqual(self._pb.deleteInterface("name", RTC.PROVIDED), True)
168  self.assertEqual(self._pb.deleteInterface("name", RTC.PROVIDED), False)
169  return
170 
171 
173  self._pb.addProperty("name1","val1")
174  prof = self._pb.getPortProfile()
175  self.assertEqual(prof.properties[0].name, "name1")
176  self._pb.appendProperty("name2","val2")
177  prof = self._pb.getPortProfile()
178  self.assertEqual(prof.properties[1].name, "name2")
179 
180 
181 
# Run the test cases in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
def __init_(self, manager)
def test_get_connector_profiles(self)
def addOutPort(self, name, outport)
Definition: RTObject.py:2765
def addInPort(self, name, inport)
Definition: RTObject.py:2721
InPort template class.
Definition: InPort.py:58
def newNV(name, value)
Create NameValue.
Definition: NVUtil.py:50


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:07