18 from omniORB
import any
19 from omniORB
import CORBA
23 import SDOPackage, SDOPackage__POA
26 sys.path.insert(1,
"../")
30 from OutPortCorbaConsumer
import *
34 self.
orb = CORBA.ORB_init()
35 self.
poa = self.orb.resolve_initial_references(
"RootPOA")
36 poaManager = self.poa._get_the_POAManager()
41 print "Called get operation." 42 return any.to_any(RTC.TimedLong(RTC.Time(0,0),123))
49 ringbuf.init(RTC.TimedLong(RTC.Time(0,0),0))
51 RTC.TimedLong(RTC.Time(0,0),0),
59 self.assertEqual(self._opcc.get(data),
True)
60 self.assertEqual(data[0].data,123)
69 prop = [SDOPackage.NameValue(
"dataport.dataflow_type",
"Push"),
70 SDOPackage.NameValue(
"dataport.corba_any.outport_ref",port)]
71 self.assertEqual(self._opcc.subscribeInterface(prop),
True)
72 self._opcc.unsubscribeInterface(prop)
76 if __name__ ==
'__main__':
def test_subscribeInterface(self)