test_OutPortBase.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- Python -*-
3 
4 #
5 # \file test_OutPortBase.py
6 # \brief test for OutPortBase base class
7 # \date $Date: 2007/09/19 $
8 # \author Shinji Kurihara
9 #
10 # Copyright (C) 2003-2006
11 # Task-intelligence Research Group,
12 # Intelligent Systems Research Institute,
13 # National Institute of
14 # Advanced Industrial Science and Technology (AIST), Japan
15 # All rights reserved.
16 
17 import sys
18 sys.path.insert(1,"../")
19 
20 #from omniORB import any
21 
22 import unittest
23 from OutPortBase import *
24 
25 import OpenRTM_aist
26 import RTC
27 
# Module-level instance counter: the counter-tracking mock's __init__ copies
# the current value onto the new instance and then increments it.
counter = 0
29 
31  def __init__(self):
32  global counter
33  self.counter = counter
34  counter+=1
35 
36  def update(self):
37  print "update",self.counter
38 
40  def __init__(self, name="name", id="id", ports=None, properties=None):
41  self._name = name
42  self._id = id
43  self._ports = ports
44  self._properties = properties
45  return
46 
47  def profile(self):
48  return OpenRTM_aist.ConnectorInfo(self._name,self._id,self._ports,self._properties)
49 
50  def id(self):
51  return self._id
52 
53  def name(self):
54  return self._name
55 
56  def activate(self):
57  pass
58 
59  def deactivate(self):
60  pass
61 
62 
class ConsumerMock:
    """Stand-in consumer whose ``init`` accepts properties and ignores them."""

    def init(self, prop):
        """Accept *prop* without using it.

        Args:
            prop: Ignored.

        Returns:
            None.
        """
        return
66 
67 class TestOutPortBase(unittest.TestCase):
68  def setUp(self):
69  OpenRTM_aist.Manager.init(sys.argv)
70  self._opb = OutPortBase("test","TimedLong")
71  self._opb.init(OpenRTM_aist.Properties())
72  return
73 
74  def tearDown(self):
75  OpenRTM_aist.Manager.instance().shutdownManager()
76  return
77 
78  def test_getName(self):
79  self.assertEqual(self._opb.getName(),"unknown.test")
80  return
81 
82  def test_connectors(self):
83  self.assertEqual(self._opb.connectors(),[])
84  return
85 
87  self.assertEqual(self._opb.getConnectorProfiles(),[])
88  self._opb._connectors = [ConnectorMock()]
89  cprof = self._opb.getConnectorProfiles()[0]
90  self.assertEqual(cprof.name,"name")
91  return
92 
94  self.assertEqual(self._opb.getConnectorIds(),[])
95  self._opb._connectors = [ConnectorMock()]
96  cprof = self._opb.getConnectorIds()[0]
97  self.assertEqual(cprof,"id")
98  return
99 
101  self.assertEqual(self._opb.getConnectorProfileById("id",[]),False)
102  self._opb._connectors = [ConnectorMock()]
103  prof = [None]
104  self.assertEqual(self._opb.getConnectorProfileById("id",prof),True)
105  self.assertEqual(prof[0].id,"id")
106  return
107 
109  self.assertEqual(self._opb.getConnectorProfileByName("name",[]),False)
110  self._opb._connectors = [ConnectorMock()]
111  prof = [None]
112  self.assertEqual(self._opb.getConnectorProfileByName("name",prof),True)
113  self.assertEqual(prof[0].name,"name")
114  return
115 
117  self._opb._connectors = [ConnectorMock()]
118  self._opb.activateInterfaces()
119  self._opb.deactivateInterfaces()
120  return
121 
123  cprof = RTC.ConnectorProfile("","",[],[])
124  self.assertEqual(self._opb.publishInterfaces(cprof),RTC.BAD_PARAMETER)
125  cprof = RTC.ConnectorProfile("connecto0","",[],[])
126  OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
127  OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
128  "corba_cdr"))
129 
130  OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
131  OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
132  "push"))
133 
134  OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
135  OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
136  "flush"))
137 
138  self.assertEqual(self._opb.publishInterfaces(cprof),RTC.RTC_OK)
139  return
140 
142  cprof = RTC.ConnectorProfile("","",[],[])
143  self.assertEqual(self._opb.subscribeInterfaces(cprof),RTC.BAD_PARAMETER)
144  cprof = RTC.ConnectorProfile("connecto0","",[],[])
145  OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
146  OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
147  "corba_cdr"))
148 
149  OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
150  OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
151  "push"))
152 
153  OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
154  OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
155  "flush"))
156 
157  #self.assertEqual(self._opb.subscribeInterfaces(cprof),RTC.RTC_OK)
158  return
159 
160 
162  self._opb._connectors = [ConnectorMock()]
163  cprof = RTC.ConnectorProfile("connecto0","",[],[])
164  OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
165  OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
166  "corba_cdr"))
167 
168  OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
169  OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
170  "push"))
171 
172  OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
173  OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
174  "flush"))
175  self._opb.unsubscribeInterfaces(cprof)
176  return
177 
179  self._opb.initProviders()
180  return
181 
183  self._opb.initConsumers()
184  return
185 
187  self._opb._connectors = [ConnectorMock()]
188  cprof = RTC.ConnectorProfile("connecto0","",[],[])
189  OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
190  OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
191  "corba_cdr"))
192 
193  OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
194  OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
195  "push"))
196 
197  OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
198  OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
199  "flush"))
200  prop = OpenRTM_aist.Properties()
201  prop.setProperty("interface_type","corba_cdr")
202  prop.setProperty("dataflow_type","pull")
203  prop.setProperty("subscription_type","flush")
204  self.assertNotEqual(self._opb.createProvider(cprof,prop),0)
205 
206  return
207 
209  self._opb._connectors = [ConnectorMock()]
210  cprof = RTC.ConnectorProfile("connecto0","",[],[])
211  OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
212  OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
213  "corba_cdr"))
214 
215  OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
216  OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
217  "push"))
218 
219  OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
220  OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
221  "flush"))
222  prop = OpenRTM_aist.Properties()
223  prop.setProperty("interface_type","corba_cdr")
224  prop.setProperty("dataflow_type","push")
225  prop.setProperty("subscription_type","flush")
226  self.assertEqual(self._opb.createConsumer(cprof,prop),0)
227 
228  return
229 
231  self._opb._connectors = [ConnectorMock()]
232  cprof = RTC.ConnectorProfile("connecto0","",[],[])
233  OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
234  OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
235  "corba_cdr"))
236 
237  OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
238  OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
239  "push"))
240 
241  OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
242  OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
243  "flush"))
244  prop = OpenRTM_aist.Properties()
245  prop.setProperty("interface_type","corba_cdr")
246  prop.setProperty("dataflow_type","push")
247  prop.setProperty("subscription_type","flush")
248  self.assertNotEqual(self._opb.createConnector(cprof,prop,consumer_=ConsumerMock()),0)
249 
250  return
251 
253  self.assertEqual(self._opb.getConnectorById("test"),0)
254  self._opb._connectors = [ConnectorMock()]
255  prof = [None]
256  conn = self._opb.getConnectorById("id")
257  self.assertEqual(conn.id(),"id")
258  return
259 
260 
262  self.assertEqual(self._opb.getConnectorByName("test"),0)
263  self._opb._connectors = [ConnectorMock()]
264  prof = [None]
265  conn = self._opb.getConnectorByName("name")
266  self.assertEqual(conn.name(),"name")
267  return
268 
269 
270 
# Run the test cases in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
def push_back(seq, elem)
Push the new element back to the CORBA sequence.
The Properties class represents a persistent set of properties.
Definition: Properties.py:83
def __init__(self, name="name", id="id", ports=None, properties=None)
def newNV(name, value)
Create NameValue.
Definition: NVUtil.py:50


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:07