18 from omniORB
import any
25 sys.path.insert(1,
"../")
29 from CorbaPort
import *
31 import _GlobalIDL, _GlobalIDL__POA
45 orb = CORBA.ORB_init(sys.argv)
46 poa = orb.resolve_initial_references(
"RootPOA")
47 poa._get_the_POAManager().activate()
57 OpenRTM_aist.Manager.instance().shutdownManager()
62 prop.setProperty(
"connection_limit",
"3")
63 self._cpSvc.init(prop)
65 self._cpSvc.init(prop)
70 self.assertEqual(self._cpSvc.registerProvider(
"myservice0",
"MyService", self.
_mysvc),
True)
71 self._cpSvc.activateInterfaces()
72 self._cpSvc.deactivateInterfaces()
76 self.assertEqual(self._cpCon.registerConsumer(
"myservice0",
"MyService", self.
_mycon),
True)
80 self.assertEqual(self._cpSvc.registerProvider(
"myservice0",
"MyService", self.
_mysvc),
True)
81 self._cpSvc.activateInterfaces()
82 self._cpSvc.deactivateInterfaces()
83 self._cpSvc.activateInterfaces()
84 self._cpSvc.deactivateInterfaces()
88 self.assertEqual(self._cpSvc.registerProvider(
"myservice0",
"MyService", self.
_mysvc),
True)
89 self._cpSvc.activateInterfaces()
90 self._cpSvc.deactivateInterfaces()
94 prof = RTC.ConnectorProfile(
"",
"",[],[])
95 self.assertEqual(self._cpSvc.publishInterfaces(prof),RTC.RTC_OK)
99 self.assertEqual(self._cpSvc.registerProvider(
"myservice0",
"MyService", self.
_mysvc),
True)
100 self.assertEqual(self._cpCon.registerConsumer(
"myservice0",
"MyService", self.
_mycon),
True)
101 prof = RTC.ConnectorProfile(
"",
"",
102 [self._cpSvc.getPortRef(),self._cpCon.getPortRef()],
104 self.assertEqual(self._cpSvc.subscribeInterfaces(prof),RTC.RTC_OK)
108 prof = RTC.ConnectorProfile(
"",
"",[],[])
109 self._cpSvc.unsubscribeInterfaces(prof)
112 self.assertEqual(self._cpSvc.registerProvider(
"myservice0",
"MyService", self.
_mysvc),
True)
113 consHolder = CorbaPort.CorbaConsumerHolder(
"myservice0",
"MyService",self.
_mycon,self.
_cpCon)
115 self.assertEqual(self._cpSvc.findProvider([],consHolder,ior),
False)
118 self.assertEqual(self._cpSvc.registerProvider(
"myservice0",
"MyService", self.
_mysvc),
True)
119 consHolder = CorbaPort.CorbaConsumerHolder(
"myservice0",
"MyService",self.
_mycon,self.
_cpCon)
121 self.assertEqual(self._cpSvc.findProviderOld([],consHolder,ior),
False)
126 if __name__ ==
'__main__':
def test_registerProvider(self)
def test_publishInterfaces(self)
def test_registerConsumer(self)
def test_activateInterfaces(self)
The Properties class represents a persistent set of properties.
def test_unsubscribeInterfaces(self)
def test_subscribeInterfaces(self)
def test_findProvider(self)
def test_findProviderOld(self)
def test_deactivateInterfaces(self)