22 sys.path.insert(1,
"../")
26 from InPortCorbaConsumer
import *
29 from omniORB
import CORBA
31 import SDOPackage,SDOPackage__POA
32 from omniORB
import any
37 self.
orb = CORBA.ORB_init()
38 self.
poa = self.orb.resolve_initial_references(
"RootPOA")
39 poaManager = self.poa._get_the_POAManager()
44 print "put data: ", data
49 self.
orb = CORBA.ORB_init()
50 self.
poa = self.orb.resolve_initial_references(
"RootPOA")
51 OpenRTM_aist.RTObject_impl.__init__(self, orb=self.
orb, poa=self.
poa)
52 poaManager = self.poa._get_the_POAManager()
59 ringbuf.init(RTC.TimedLong(RTC.Time(0,0), 0))
61 RTC.TimedLong(RTC.Time(0,0), 0),
65 self.assertEqual(self._ipcc.equal_operator(self.
_ipcc), self.
_ipcc)
69 self._ipcc.put(RTC.TimedLong(RTC.Time(0,0), 123))
82 prop = [
newNV(
"dataport.dataflow_type",
"Push"),
83 newNV(
"dataport.corba_any.inport_ref",port)]
84 self.assertEqual(self._ipcc.subscribeInterface(prop),
True)
92 if __name__ ==
'__main__':
def test_subscribeInterface(self)
def unsubscribeInterface(self, properties)
def test_equal_operator(self)
def newNV(name, value)
Create NameValue.