7 from omniORB
import CORBA
8 from optparse
import OptionParser, OptionError
14 print "usage: ConnectorComp [options]" 16 print ": Set subscription type Flush" 18 print ": Set subscription type New" 19 print " --periodic [Hz] " 20 print ": Set subscription type Periodic \n" 22 print " ConnectorComp --flush" 23 print " ConnectorComp --new" 24 print " ConnectorComp --new --policy ALL" 25 print " ConnectorComp --new --policy SKIP --skip 100" 26 print " ConnectorComp --periodic 10" 27 print " ConnectorComp --periodic 10 --policy FIFO" 28 print " ConnectorComp --periodic 10 --policy NEW \n" 33 orb = CORBA.ORB_init(sys.argv)
42 conin.setObject(naming.resolve(
"ConsoleIn0.rtc"))
45 inobj = conin.getObject()._narrow(RTC.RTObject)
46 pin = inobj.get_ports()
47 pin[0].disconnect_all()
51 conout.setObject(naming.resolve(
"ConsoleOut0.rtc"))
54 outobj = conout.getObject()._narrow(RTC.RTObject)
55 pout = outobj.get_ports()
56 pout[0].disconnect_all()
65 for arg
in sys.argv[1:]:
73 push_policy = OpenRTM_aist.normalize([sys.argv[3]])
74 if push_policy ==
"skip":
75 skip_count = sys.argv[5]
78 elif arg ==
"--periodic":
79 subs_type =
"periodic" 82 push_policy = OpenRTM_aist.normalize([sys.argv[4]])
83 if push_policy ==
"skip":
84 skip_count = sys.argv[6]
94 print "Subscription Type: ", subs_type
95 print "Period: ", period,
" [Hz]" 96 print "push policy: ", push_policy
97 print "skip count: ", skip_count
100 conprof = RTC.ConnectorProfile(
"connector0",
"", [pin[0],pout[0]], [])
113 if subs_type ==
"periodic":
120 if push_policy ==
"skip":
126 ret,conprof = pin[0].connect(conprof)
129 eclistin = inobj.get_owned_contexts()
130 eclistin[0].activate_component(inobj)
133 eclistout = outobj.get_owned_contexts()
134 eclistout[0].activate_component(outobj)
137 if __name__ ==
"__main__":
def push_back(seq, elem)
Push the new element back to the CORBA sequence.
CORBA Naming Service helper class.
def newNV(name, value)
Create NameValue.