test_InPortBase.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: euc-jp -*-
3 
4 
5 # \file test_InPortBase.py
6 # \brief test for InPortBase class
7 # \date $Date: 2007/09/20 $
8 # \author Shinji Kurihara
9 #
10 # Copyright (C) 2003-2005
11 # Task-intelligence Research Group,
12 # Intelligent Systems Research Institute,
13 # National Institute of
14 # Advanced Industrial Science and Technology (AIST), Japan
15 # All rights reserved.
16 
17 
18 import sys
19 sys.path.insert(1,"../")
20 
21 from omniORB import CORBA, PortableServer
22 from omniORB import any
23 
24 import unittest
25 
26 from InPortBase import *
27 
28 import RTC, RTC__POA
29 import OpenRTM_aist
30 
31 
class InPortMock(InPortBase):
    """Test double for InPortBase.

    Adds read accessors for attributes the tests inspect
    (``_thebuffer``, ``_providerTypes``, ``_consumerTypes``) and a public
    wrapper around ``publishInterfaces``.
    """

    def __init__(self, name, data_type):
        """Forward ``name`` and ``data_type`` unchanged to InPortBase."""
        InPortBase.__init__(self, name, data_type)

    def getThebuffer(self):
        """Return the ``_thebuffer`` attribute."""
        return self._thebuffer

    def getProviderTypes(self):
        """Return the ``_providerTypes`` attribute."""
        return self._providerTypes

    def getConsumerTypes(self):
        """Return the ``_consumerTypes`` attribute."""
        return self._consumerTypes

    def publishInterfaces_public(self, connector_profile):
        """Call ``publishInterfaces(connector_profile)`` and return its result."""
        return self.publishInterfaces(connector_profile)
48 
49 
50 
class TestInPortBase(unittest.TestCase):
    """Unit tests for InPortBase, exercised through the InPortMock subclass.

    Each test gets a fresh "in" port (InPortMock) and an "out" port
    (OpenRTM_aist.OutPort); most tests connect the two and then query the
    in-port for the resulting connector.
    """

    def setUp(self):
        """Activate the root POA and build the ports and connector settings."""
        self._orb = CORBA.ORB_init(sys.argv)
        self._poa = self._orb.resolve_initial_references("RootPOA")
        self._poa._get_the_POAManager().activate()

        self._inport = InPortMock(
            "in", OpenRTM_aist.toTypename(RTC.TimedLong(RTC.Time(0, 0), 0)))
        self._outport = OpenRTM_aist.OutPort(
            "out", RTC.TimedLong(RTC.Time(0, 0), 0))
        out_profile = self._outport.getPortProfile()
        self._outport.init(
            OpenRTM_aist.NVUtil.toProperties(out_profile.properties))

        # Connector settings shared by every ConnectorProfile built below.
        self._nvlist = [
            OpenRTM_aist.NVUtil.newNV("dataport.interface_type", "corba_cdr"),
            OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type", "push"),
            OpenRTM_aist.NVUtil.newNV("dataport.subscription_type", "flush"),
        ]

        # NOTE(review): short pause kept from the original; presumably gives
        # the ORB/POA time to settle -- confirm whether it is still needed.
        import time
        time.sleep(0.05)

    def tearDown(self):
        """Shut down the OpenRTM manager after each test."""
        OpenRTM_aist.Manager.instance().shutdownManager()

    # ------------------------------------------------------------------
    # helpers (names do not start with "test", so unittest skips them)
    # ------------------------------------------------------------------
    def _init_inport(self):
        """Initialise the in-port from the properties of its own profile."""
        profile = self._inport.getPortProfile()
        prop = OpenRTM_aist.NVUtil.toProperties(profile.properties)
        self._inport.init(prop)

    def _new_profile(self, ports):
        """Build the ConnectorProfile "InPortBaseTest0"/"id0" for ``ports``."""
        return RTC.ConnectorProfile("InPortBaseTest0", "id0",
                                    ports, self._nvlist)

    def _connect_ports(self):
        """Initialise the in-port, connect out-port -> in-port, return the profile."""
        self._init_inport()
        prof = self._new_profile(
            [self._inport.get_port_profile().port_ref,
             self._outport.get_port_profile().port_ref])
        self._outport.connect(prof)
        return prof

    # ------------------------------------------------------------------
    # tests
    # ------------------------------------------------------------------
    def test_properties(self):
        """properties() returns an OpenRTM_aist.Properties instance."""
        self.assertEqual(
            isinstance(self._inport.properties(), OpenRTM_aist.Properties),
            True)

    def test_init(self):
        """init() creates the buffer and publishes the dataport properties."""
        # Before init(): no buffer, empty properties, no provider/consumer.
        self.assertEqual(self._inport.getThebuffer(), None)
        profile = self._inport.getPortProfile()
        prop = OpenRTM_aist.NVUtil.toProperties(profile.properties)

        self.assertEqual(prop.getProperty("dataport.dataflow_type"), "")
        self.assertEqual(prop.getProperty("dataport.interface_type"), "")

        self.assertEqual(len(self._inport.getProviderTypes()), 0)
        self.assertEqual(len(self._inport.getConsumerTypes()), 0)

        self._inport.init(prop)

        # When self._singlebuffer is True, self._thebuffer is obtained.
        self.assertNotEqual(self._inport.getThebuffer(), None)
        profile = self._inport.getPortProfile()
        prop = OpenRTM_aist.NVUtil.toProperties(profile.properties)

        # The following entries are added to the getPortProfile properties.
        val = any.from_any(prop.getProperty("dataport.dataflow_type"),
                           keep_structs=True)
        self.assertEqual(val, "push, pull")
        val = any.from_any(prop.getProperty("dataport.interface_type"),
                           keep_structs=True)
        self.assertEqual(val, "corba_cdr")

        # ProviderTypes and ConsumerTypes are obtained.
        pstr = self._inport.getProviderTypes()
        self.assertEqual(pstr[0], "corba_cdr")
        cstr = self._inport.getConsumerTypes()
        self.assertEqual(cstr[0], "corba_cdr")

    def test_activate_deactivateInterfaces(self):
        """activate/deactivateInterfaces run after publishing two connectors."""
        self._init_inport()
        prof = self._new_profile([self._inport.get_port_profile().port_ref])

        self._inport.publishInterfaces_public(prof)
        # Publish a second time under a different id/name.
        prof.connector_id = "id1"
        prof.name = "InPortBaseTest1"
        self._inport.publishInterfaces_public(prof)

        self._inport.activateInterfaces()
        self._inport.deactivateInterfaces()

    def test_subscribe_unsubscribeInterfaces(self):
        """subscribeInterfaces returns RTC_OK for a connected profile."""
        prof = self._connect_ports()
        try:
            self.assertEqual(self._inport.subscribeInterfaces(prof),
                             RTC.RTC_OK)
            self._inport.unsubscribeInterfaces(prof)
        finally:
            # Disconnect even if the assertion above fails.
            self._outport.disconnect(prof.connector_id)

    def test_getConnectorProfiles(self):
        """getConnectorProfiles lists the single connected profile."""
        prof = self._connect_ports()
        try:
            cprofs = self._inport.getConnectorProfiles()
            self.assertEqual(len(cprofs), 1)
            self.assertEqual(cprofs[0].name, "InPortBaseTest0")
            self.assertEqual(cprofs[0].id, "id0")
        finally:
            self._outport.disconnect(prof.connector_id)

    def test_getConnectorIds(self):
        """getConnectorIds lists the id of the single connector."""
        prof = self._connect_ports()
        try:
            cids = self._inport.getConnectorIds()
            self.assertEqual(len(cids), 1)
            self.assertEqual(cids[0], "id0")
        finally:
            self._outport.disconnect(prof.connector_id)

    def test_getConnectorNames(self):
        """getConnectorNames lists the name of the single connector."""
        prof = self._connect_ports()
        try:
            cnames = self._inport.getConnectorNames()
            self.assertEqual(len(cnames), 1)
            self.assertEqual(cnames[0], "InPortBaseTest0")
        finally:
            self._outport.disconnect(prof.connector_id)

    def test_getConnectorById(self):
        """getConnectorById returns None for an unknown id, else the connector."""
        prof = self._connect_ports()
        try:
            self.assertEqual(self._inport.getConnectorById("id"), None)

            con = self._inport.getConnectorById("id0")
            self.assertEqual(con.profile().name, "InPortBaseTest0")
            self.assertEqual(con.profile().id, "id0")
        finally:
            self._outport.disconnect(prof.connector_id)

    def test_getConnectorByName(self):
        """getConnectorByName returns None for an unknown name, else the connector."""
        prof = self._connect_ports()
        try:
            self.assertEqual(self._inport.getConnectorByName("test"), None)

            con = self._inport.getConnectorByName("InPortBaseTest0")
            self.assertEqual(con.profile().name, "InPortBaseTest0")
            self.assertEqual(con.profile().id, "id0")
        finally:
            self._outport.disconnect(prof.connector_id)

    def test_getConnectorProfileById(self):
        """getConnectorProfileById fills the out-list and reports success."""
        prof = self._connect_ports()
        try:
            cprof = [None]  # one-element list used as an output argument
            ret = self._inport.getConnectorProfileById("test", cprof)
            self.assertEqual(ret, False)

            ret = self._inport.getConnectorProfileById("id0", cprof)
            self.assertEqual(ret, True)

            self.assertEqual(cprof[0].name, "InPortBaseTest0")
            self.assertEqual(cprof[0].id, "id0")
        finally:
            self._outport.disconnect(prof.connector_id)

    def test_getConnectorProfileByName(self):
        """getConnectorProfileByName fills the out-list and reports success."""
        prof = self._connect_ports()
        try:
            cprof = [None]  # one-element list used as an output argument
            ret = self._inport.getConnectorProfileByName("test", cprof)
            self.assertEqual(ret, False)

            ret = self._inport.getConnectorProfileByName("InPortBaseTest0",
                                                         cprof)
            self.assertEqual(ret, True)

            self.assertEqual(cprof[0].name, "InPortBaseTest0")
            self.assertEqual(cprof[0].id, "id0")
        finally:
            self._outport.disconnect(prof.connector_id)
304 
305 
306 
307 
# Script entry point: run unittest's command-line runner when executed directly.
308 if __name__ == '__main__':
309  unittest.main()
310 
def __init__(self, name, data_type)
The Properties class represents a persistent set of properties.
Definition: Properties.py:83
def toProperties(nv)
coil::Properties toProperties(const SDOPackage::NVList& nv);
Definition: NVUtil.py:158
def newNV(name, value)
Create NameValue.
Definition: NVUtil.py:50
def publishInterfaces_public(self, connector_profile)


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:07