7 from omniORB
import CORBA
8 from optparse
import OptionParser, OptionError
14 print(
"usage: ConnectorComp [options]")
16 print(
": Set subscription type Flush")
18 print(
": Set subscription type New")
19 print(
" --periodic [Hz] ")
20 print(
": Set subscription type Periodic \n")
22 print(
" ConnectorComp --flush")
23 print(
" ConnectorComp --new")
24 print(
" ConnectorComp --new --policy ALL")
25 print(
" ConnectorComp --new --policy SKIP --skip 100")
26 print(
" ConnectorComp --periodic 10")
27 print(
" ConnectorComp --periodic 10 --policy FIFO")
28 print(
" ConnectorComp --periodic 10 --policy NEW \n")
33 orb = CORBA.ORB_init(sys.argv)
42 conin.setObject(naming.resolve(
"ConsoleIn0.rtc"))
45 inobj = conin.getObject()._narrow(RTC.RTObject)
46 pin = inobj.get_ports()
47 pin[0].disconnect_all()
51 conout.setObject(naming.resolve(
"ConsoleOut0.rtc"))
54 outobj = conout.getObject()._narrow(RTC.RTObject)
55 pout = outobj.get_ports()
56 pout[0].disconnect_all()
65 for arg
in sys.argv[1:]:
73 push_policy = OpenRTM_aist.normalize([sys.argv[3]])
74 if push_policy ==
"skip":
75 skip_count = sys.argv[5]
78 elif arg ==
"--periodic":
79 subs_type =
"periodic"
82 push_policy = OpenRTM_aist.normalize([sys.argv[4]])
83 if push_policy ==
"skip":
84 skip_count = sys.argv[6]
94 print(
"Subscription Type: ", subs_type)
95 print(
"Period: ", period,
" [Hz]")
96 print(
"push policy: ", push_policy)
97 print(
"skip count: ", skip_count)
100 conprof = RTC.ConnectorProfile(
"connector0",
"", [pin[0],pout[0]], [])
113 if subs_type ==
"periodic":
120 if push_policy ==
"skip":
126 ret,conprof = pin[0].connect(conprof)
129 eclistin = inobj.get_owned_contexts()
130 eclistin[0].activate_component(inobj)
133 eclistout = outobj.get_owned_contexts()
134 eclistout[0].activate_component(outobj)
137 if __name__ ==
"__main__":