00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 import sys
00018 sys.path.insert(1,"../")
00019
00020
00021
00022 import unittest
00023 from OutPortBase import *
00024
00025 import OpenRTM_aist
00026 import RTC
00027
00028 counter = 0
00029
00030 class PublisherTest:
00031 def __init__(self):
00032 global counter
00033 self.counter = counter
00034 counter+=1
00035
00036 def update(self):
00037 print "update",self.counter
00038
00039 class ConnectorMock:
00040 def __init__(self, name="name", id="id", ports=None, properties=None):
00041 self._name = name
00042 self._id = id
00043 self._ports = ports
00044 self._properties = properties
00045 return
00046
00047 def profile(self):
00048 return OpenRTM_aist.ConnectorInfo(self._name,self._id,self._ports,self._properties)
00049
00050 def id(self):
00051 return self._id
00052
00053 def name(self):
00054 return self._name
00055
00056 def activate(self):
00057 pass
00058
00059 def deactivate(self):
00060 pass
00061
00062
00063 class ConsumerMock:
00064 def init(self,prop):
00065 return
00066
00067 class TestOutPortBase(unittest.TestCase):
00068 def setUp(self):
00069 OpenRTM_aist.Manager.init(sys.argv)
00070 self._opb = OutPortBase("test","TimedLong")
00071 self._opb.init(OpenRTM_aist.Properties())
00072 return
00073
00074 def tearDown(self):
00075 OpenRTM_aist.Manager.instance().shutdownManager()
00076 return
00077
00078 def test_getName(self):
00079 self.assertEqual(self._opb.getName(),"unknown.test")
00080 return
00081
00082 def test_connectors(self):
00083 self.assertEqual(self._opb.connectors(),[])
00084 return
00085
00086 def test_getConnectorProfiles(self):
00087 self.assertEqual(self._opb.getConnectorProfiles(),[])
00088 self._opb._connectors = [ConnectorMock()]
00089 cprof = self._opb.getConnectorProfiles()[0]
00090 self.assertEqual(cprof.name,"name")
00091 return
00092
00093 def test_getConnectorIds(self):
00094 self.assertEqual(self._opb.getConnectorIds(),[])
00095 self._opb._connectors = [ConnectorMock()]
00096 cprof = self._opb.getConnectorIds()[0]
00097 self.assertEqual(cprof,"id")
00098 return
00099
00100 def test_getConnectorProfileById(self):
00101 self.assertEqual(self._opb.getConnectorProfileById("id",[]),False)
00102 self._opb._connectors = [ConnectorMock()]
00103 prof = [None]
00104 self.assertEqual(self._opb.getConnectorProfileById("id",prof),True)
00105 self.assertEqual(prof[0].id,"id")
00106 return
00107
00108 def test_getConnectorProfileByName(self):
00109 self.assertEqual(self._opb.getConnectorProfileByName("name",[]),False)
00110 self._opb._connectors = [ConnectorMock()]
00111 prof = [None]
00112 self.assertEqual(self._opb.getConnectorProfileByName("name",prof),True)
00113 self.assertEqual(prof[0].name,"name")
00114 return
00115
00116 def test_activateInterfaces(self):
00117 self._opb._connectors = [ConnectorMock()]
00118 self._opb.activateInterfaces()
00119 self._opb.deactivateInterfaces()
00120 return
00121
00122 def test_publishInterfaces(self):
00123 cprof = RTC.ConnectorProfile("","",[],[])
00124 self.assertEqual(self._opb.publishInterfaces(cprof),RTC.BAD_PARAMETER)
00125 cprof = RTC.ConnectorProfile("connecto0","",[],[])
00126 OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00127 OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
00128 "corba_cdr"))
00129
00130 OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00131 OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
00132 "push"))
00133
00134 OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00135 OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
00136 "flush"))
00137
00138 self.assertEqual(self._opb.publishInterfaces(cprof),RTC.RTC_OK)
00139 return
00140
00141 def test_subscribeInterfaces(self):
00142 cprof = RTC.ConnectorProfile("","",[],[])
00143 self.assertEqual(self._opb.subscribeInterfaces(cprof),RTC.BAD_PARAMETER)
00144 cprof = RTC.ConnectorProfile("connecto0","",[],[])
00145 OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00146 OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
00147 "corba_cdr"))
00148
00149 OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00150 OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
00151 "push"))
00152
00153 OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00154 OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
00155 "flush"))
00156
00157
00158 return
00159
00160
00161 def test_unsubscribeInterfaces(self):
00162 self._opb._connectors = [ConnectorMock()]
00163 cprof = RTC.ConnectorProfile("connecto0","",[],[])
00164 OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00165 OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
00166 "corba_cdr"))
00167
00168 OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00169 OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
00170 "push"))
00171
00172 OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00173 OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
00174 "flush"))
00175 self._opb.unsubscribeInterfaces(cprof)
00176 return
00177
00178 def test_initProviders(self):
00179 self._opb.initProviders()
00180 return
00181
00182 def test_initConsumers(self):
00183 self._opb.initConsumers()
00184 return
00185
00186 def test_createProvider(self):
00187 self._opb._connectors = [ConnectorMock()]
00188 cprof = RTC.ConnectorProfile("connecto0","",[],[])
00189 OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00190 OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
00191 "corba_cdr"))
00192
00193 OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00194 OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
00195 "push"))
00196
00197 OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00198 OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
00199 "flush"))
00200 prop = OpenRTM_aist.Properties()
00201 prop.setProperty("interface_type","corba_cdr")
00202 prop.setProperty("dataflow_type","pull")
00203 prop.setProperty("subscription_type","flush")
00204 self.assertNotEqual(self._opb.createProvider(cprof,prop),0)
00205
00206 return
00207
00208 def test_createConsumer(self):
00209 self._opb._connectors = [ConnectorMock()]
00210 cprof = RTC.ConnectorProfile("connecto0","",[],[])
00211 OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00212 OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
00213 "corba_cdr"))
00214
00215 OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00216 OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
00217 "push"))
00218
00219 OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00220 OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
00221 "flush"))
00222 prop = OpenRTM_aist.Properties()
00223 prop.setProperty("interface_type","corba_cdr")
00224 prop.setProperty("dataflow_type","push")
00225 prop.setProperty("subscription_type","flush")
00226 self.assertEqual(self._opb.createConsumer(cprof,prop),0)
00227
00228 return
00229
00230 def test_createConnector(self):
00231 self._opb._connectors = [ConnectorMock()]
00232 cprof = RTC.ConnectorProfile("connecto0","",[],[])
00233 OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00234 OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
00235 "corba_cdr"))
00236
00237 OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00238 OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
00239 "push"))
00240
00241 OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00242 OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
00243 "flush"))
00244 prop = OpenRTM_aist.Properties()
00245 prop.setProperty("interface_type","corba_cdr")
00246 prop.setProperty("dataflow_type","push")
00247 prop.setProperty("subscription_type","flush")
00248 self.assertNotEqual(self._opb.createConnector(cprof,prop,consumer_=ConsumerMock()),0)
00249
00250 return
00251
00252 def test_getConnectorById(self):
00253 self.assertEqual(self._opb.getConnectorById("test"),0)
00254 self._opb._connectors = [ConnectorMock()]
00255 prof = [None]
00256 conn = self._opb.getConnectorById("id")
00257 self.assertEqual(conn.id(),"id")
00258 return
00259
00260
00261 def test_getConnectorByName(self):
00262 self.assertEqual(self._opb.getConnectorByName("test"),0)
00263 self._opb._connectors = [ConnectorMock()]
00264 prof = [None]
00265 conn = self._opb.getConnectorByName("name")
00266 self.assertEqual(conn.name(),"name")
00267 return
00268
00269
00270
00271 if __name__ == '__main__':
00272 unittest.main()