Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import sys
00019 sys.path.insert(1,"../")
00020
00021 import unittest
00022
00023 from InPortCorbaCdrConsumer import *
00024
00025 import RTC, RTC__POA
00026 import OpenRTM_aist
00027
00028
class TestInPortCorbaCdrConsumer(unittest.TestCase):
    """Tests for the InPort consumer registered under the id "corba_cdr".

    Each test runs against a fresh consumer obtained from
    InPortConsumerFactory and a fresh InPort whose port reference is used
    as the subscription target, either as a stringified IOR or as the
    object reference itself.
    """

    # Property names carried by the name/value lists handed to the consumer.
    _IOR_KEY = "dataport.corba_cdr.inport_ior"
    _REF_KEY = "dataport.corba_cdr.inport_ref"

    def setUp(self):
        """Register the consumer type, then build the consumer, port and ORB."""
        InPortCorbaCdrConsumerInit()
        factory = OpenRTM_aist.InPortConsumerFactory.instance()
        self._cons = factory.createObject("corba_cdr")
        initial_value = RTC.TimedLong(RTC.Time(0, 0), 0)
        self._inp = OpenRTM_aist.InPort("in", initial_value)
        manager = OpenRTM_aist.Manager.instance()
        self._orb = manager.getORB()

    # -- helpers ----------------------------------------------------------

    def _port_ref(self):
        """Return the port reference from the InPort's current profile."""
        profile = self._inp.get_port_profile()
        return profile.port_ref

    def _port_ior(self):
        """Return the InPort's port reference stringified by the ORB."""
        return self._orb.object_to_string(self._port_ref())

    def _nvlist(self, key, value):
        """Return a one-element list holding the NameValue for key/value."""
        return [OpenRTM_aist.NVUtil.newNV(key, value)]

    # -- tests ------------------------------------------------------------

    def test_init(self):
        """init() is called with a default-constructed Properties object."""
        props = OpenRTM_aist.Properties()
        self._cons.init(props)

    def test_put(self):
        """put() without any subscription in this test reports CONNECTION_LOST."""
        status = self._cons.put(123)
        self.assertEqual(status, OpenRTM_aist.DataPortStatus.CONNECTION_LOST)

    def test_publishInterfaceProfile(self):
        """publishInterfaceProfile() is called with a default Properties object."""
        props = OpenRTM_aist.Properties()
        self._cons.publishInterfaceProfile(props)

    def test_subscribeInterface(self):
        """subscribeInterface() returns True for the IOR and the reference form."""
        ior = self._port_ior()
        by_ior = self._cons.subscribeInterface(self._nvlist(self._IOR_KEY, ior))
        self.assertEqual(by_ior, True)
        by_ref = self._cons.subscribeInterface(
            self._nvlist(self._REF_KEY, self._port_ref()))
        self.assertEqual(by_ref, True)

    def test_unsubscribeInterface(self):
        """Subscribe then unsubscribe, first by IOR and then by reference."""
        ior = self._port_ior()
        by_ior = self._cons.subscribeInterface(self._nvlist(self._IOR_KEY, ior))
        self.assertEqual(by_ior, True)
        self._cons.unsubscribeInterface(self._nvlist(self._IOR_KEY, ior))

        by_ref = self._cons.subscribeInterface(
            self._nvlist(self._REF_KEY, self._port_ref()))
        self.assertEqual(by_ref, True)
        self._cons.unsubscribeInterface(
            self._nvlist(self._REF_KEY, self._port_ref()))

    def test_subscribeFromIor(self):
        """subscribeFromIor() returns True for the port's stringified IOR."""
        ior = self._port_ior()
        subscribed = self._cons.subscribeFromIor(self._nvlist(self._IOR_KEY, ior))
        self.assertEqual(subscribed, True)

    def test_subscribeFromRef(self):
        """subscribeFromRef() returns True for the port's object reference."""
        subscribed = self._cons.subscribeFromRef(
            self._nvlist(self._REF_KEY, self._port_ref()))
        self.assertEqual(subscribed, True)

    def test_unsubscribeFromIor(self):
        """unsubscribeFromIor() returns True after a subscribeFromIor()."""
        ior = self._port_ior()
        subscribed = self._cons.subscribeFromIor(self._nvlist(self._IOR_KEY, ior))
        self.assertEqual(subscribed, True)
        released = self._cons.unsubscribeFromIor(self._nvlist(self._IOR_KEY, ior))
        self.assertEqual(released, True)

    def test_unsubscribeFromRef(self):
        """unsubscribeFromRef() returns True after a subscribeFromRef()."""
        subscribed = self._cons.subscribeFromRef(
            self._nvlist(self._REF_KEY, self._port_ref()))
        self.assertEqual(subscribed, True)
        released = self._cons.unsubscribeFromRef(
            self._nvlist(self._REF_KEY, self._port_ref()))
        self.assertEqual(released, True)
00088
00089
# When executed directly (not imported), hand control to unittest's
# command-line runner so every test in this module is discovered and run.
if __name__ == '__main__':
    unittest.main()
00092