test_OutPortBase.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- Python -*-
00003 
00004 #
00005 #  \file test_OutPortBase.py
00006 #  \brief test for OutPortBase base class
00007 #  \date $Date: 2007/09/19 $
00008 #  \author Shinji Kurihara
00009 # 
00010 #  Copyright (C) 2003-2006
00011 #      Task-intelligence Research Group,
00012 #      Intelligent Systems Research Institute,
00013 #      National Institute of
00014 #          Advanced Industrial Science and Technology (AIST), Japan
00015 #      All rights reserved.
00016 
00017 import sys
00018 sys.path.insert(1,"../")
00019 
00020 #from omniORB import any
00021 
00022 import unittest
00023 from OutPortBase import *
00024 
00025 import OpenRTM_aist
00026 import RTC
00027 
00028 counter = 0
00029 
00030 class PublisherTest:
00031   def __init__(self):
00032     global counter
00033     self.counter = counter
00034     counter+=1
00035 
00036   def update(self):
00037     print "update",self.counter
00038     
00039 class ConnectorMock:
00040   def __init__(self, name="name", id="id", ports=None, properties=None):
00041     self._name       = name
00042     self._id         = id
00043     self._ports      = ports
00044     self._properties = properties
00045     return
00046 
00047   def profile(self):
00048     return OpenRTM_aist.ConnectorInfo(self._name,self._id,self._ports,self._properties)
00049 
00050   def id(self):
00051     return self._id
00052 
00053   def name(self):
00054     return self._name
00055 
00056   def activate(self):
00057     pass
00058 
00059   def deactivate(self):
00060     pass
00061 
00062 
00063 class ConsumerMock:
00064   def init(self,prop):
00065     return
00066 
00067 class TestOutPortBase(unittest.TestCase):
00068   def setUp(self):
00069     OpenRTM_aist.Manager.init(sys.argv)
00070     self._opb = OutPortBase("test","TimedLong")
00071     self._opb.init(OpenRTM_aist.Properties())
00072     return
00073 
00074   def tearDown(self):
00075     OpenRTM_aist.Manager.instance().shutdownManager()
00076     return
00077 
00078   def test_getName(self):
00079     self.assertEqual(self._opb.getName(),"unknown.test")
00080     return
00081 
00082   def test_connectors(self):
00083     self.assertEqual(self._opb.connectors(),[])
00084     return
00085 
00086   def test_getConnectorProfiles(self):
00087     self.assertEqual(self._opb.getConnectorProfiles(),[])
00088     self._opb._connectors = [ConnectorMock()]
00089     cprof = self._opb.getConnectorProfiles()[0]
00090     self.assertEqual(cprof.name,"name")
00091     return
00092 
00093   def test_getConnectorIds(self):
00094     self.assertEqual(self._opb.getConnectorIds(),[])
00095     self._opb._connectors = [ConnectorMock()]
00096     cprof = self._opb.getConnectorIds()[0]
00097     self.assertEqual(cprof,"id")
00098     return
00099     
00100   def test_getConnectorProfileById(self):
00101     self.assertEqual(self._opb.getConnectorProfileById("id",[]),False)
00102     self._opb._connectors = [ConnectorMock()]
00103     prof = [None]
00104     self.assertEqual(self._opb.getConnectorProfileById("id",prof),True)
00105     self.assertEqual(prof[0].id,"id")
00106     return
00107 
00108   def test_getConnectorProfileByName(self):
00109     self.assertEqual(self._opb.getConnectorProfileByName("name",[]),False)
00110     self._opb._connectors = [ConnectorMock()]
00111     prof = [None]
00112     self.assertEqual(self._opb.getConnectorProfileByName("name",prof),True)
00113     self.assertEqual(prof[0].name,"name")
00114     return
00115 
00116   def test_activateInterfaces(self):
00117     self._opb._connectors = [ConnectorMock()]
00118     self._opb.activateInterfaces()
00119     self._opb.deactivateInterfaces()
00120     return
00121 
00122   def test_publishInterfaces(self):
00123     cprof = RTC.ConnectorProfile("","",[],[])
00124     self.assertEqual(self._opb.publishInterfaces(cprof),RTC.BAD_PARAMETER)
00125     cprof = RTC.ConnectorProfile("connecto0","",[],[])
00126     OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00127                  OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
00128                          "corba_cdr"))
00129 
00130     OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00131                  OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
00132                          "push"))
00133 
00134     OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00135                  OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
00136                          "flush"))
00137 
00138     self.assertEqual(self._opb.publishInterfaces(cprof),RTC.RTC_OK)
00139     return
00140 
00141   def test_subscribeInterfaces(self):
00142     cprof = RTC.ConnectorProfile("","",[],[])
00143     self.assertEqual(self._opb.subscribeInterfaces(cprof),RTC.BAD_PARAMETER)
00144     cprof = RTC.ConnectorProfile("connecto0","",[],[])
00145     OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00146                  OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
00147                          "corba_cdr"))
00148 
00149     OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00150                  OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
00151                          "push"))
00152 
00153     OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00154                  OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
00155                          "flush"))
00156 
00157     #self.assertEqual(self._opb.subscribeInterfaces(cprof),RTC.RTC_OK)
00158     return
00159     
00160 
00161   def test_unsubscribeInterfaces(self):
00162     self._opb._connectors = [ConnectorMock()]
00163     cprof = RTC.ConnectorProfile("connecto0","",[],[])
00164     OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00165                  OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
00166                          "corba_cdr"))
00167 
00168     OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00169                  OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
00170                          "push"))
00171 
00172     OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00173                  OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
00174                          "flush"))
00175     self._opb.unsubscribeInterfaces(cprof)
00176     return
00177 
00178   def test_initProviders(self):
00179     self._opb.initProviders()
00180     return
00181 
00182   def test_initConsumers(self):
00183     self._opb.initConsumers()
00184     return
00185 
00186   def test_createProvider(self):
00187     self._opb._connectors = [ConnectorMock()]
00188     cprof = RTC.ConnectorProfile("connecto0","",[],[])
00189     OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00190                  OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
00191                          "corba_cdr"))
00192 
00193     OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00194                  OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
00195                          "push"))
00196 
00197     OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00198                  OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
00199                          "flush"))
00200     prop = OpenRTM_aist.Properties()
00201     prop.setProperty("interface_type","corba_cdr")
00202     prop.setProperty("dataflow_type","pull")
00203     prop.setProperty("subscription_type","flush")
00204     self.assertNotEqual(self._opb.createProvider(cprof,prop),0)
00205 
00206     return
00207 
00208   def test_createConsumer(self):
00209     self._opb._connectors = [ConnectorMock()]
00210     cprof = RTC.ConnectorProfile("connecto0","",[],[])
00211     OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00212                  OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
00213                          "corba_cdr"))
00214 
00215     OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00216                  OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
00217                          "push"))
00218 
00219     OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00220                  OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
00221                          "flush"))
00222     prop = OpenRTM_aist.Properties()
00223     prop.setProperty("interface_type","corba_cdr")
00224     prop.setProperty("dataflow_type","push")
00225     prop.setProperty("subscription_type","flush")
00226     self.assertEqual(self._opb.createConsumer(cprof,prop),0)
00227 
00228     return
00229 
00230   def test_createConnector(self):
00231     self._opb._connectors = [ConnectorMock()]
00232     cprof = RTC.ConnectorProfile("connecto0","",[],[])
00233     OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00234                  OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
00235                          "corba_cdr"))
00236 
00237     OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00238                  OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
00239                          "push"))
00240 
00241     OpenRTM_aist.CORBA_SeqUtil.push_back(cprof.properties,
00242                  OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
00243                          "flush"))
00244     prop = OpenRTM_aist.Properties()
00245     prop.setProperty("interface_type","corba_cdr")
00246     prop.setProperty("dataflow_type","push")
00247     prop.setProperty("subscription_type","flush")
00248     self.assertNotEqual(self._opb.createConnector(cprof,prop,consumer_=ConsumerMock()),0)
00249 
00250     return
00251 
00252   def test_getConnectorById(self):
00253     self.assertEqual(self._opb.getConnectorById("test"),0)
00254     self._opb._connectors = [ConnectorMock()]
00255     prof = [None]
00256     conn = self._opb.getConnectorById("id")
00257     self.assertEqual(conn.id(),"id")
00258     return
00259 
00260 
00261   def test_getConnectorByName(self):
00262     self.assertEqual(self._opb.getConnectorByName("test"),0)
00263     self._opb._connectors = [ConnectorMock()]
00264     prof = [None]
00265     conn = self._opb.getConnectorByName("name")
00266     self.assertEqual(conn.name(),"name")
00267     return
00268 
00269 
00270 ############### test #################
00271 if __name__ == '__main__':
00272   unittest.main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28