Go to the documentation of this file.00001
00002
00003
00004
00005 import sys
00006
00007 from omniORB import CORBA
00008
00009 import RTC
00010 import OpenRTM_aist
00011
00012
00013 def main():
00014
00015
00016 subs_type = "Flush"
00017
00018
00019 orb = CORBA.ORB_init(sys.argv)
00020
00021
00022 naming = OpenRTM_aist.CorbaNaming(orb, "localhost")
00023
00024 sl = OpenRTM_aist.CorbaConsumer()
00025 tkm = OpenRTM_aist.CorbaConsumer()
00026
00027
00028 tkm.setObject(naming.resolve("TkMotorComp0.rtc"))
00029
00030
00031 inobj = tkm.getObject()._narrow(RTC.RTObject)
00032 pin = inobj.get_ports()
00033 pin[0].disconnect_all()
00034
00035
00036
00037 sl.setObject(naming.resolve("SliderComp0.rtc"))
00038
00039
00040 outobj = sl.getObject()._narrow(RTC.RTObject)
00041 pout = outobj.get_ports()
00042 pout[0].disconnect_all()
00043
00044
00045
00046 conprof = RTC.ConnectorProfile("connector0", "", [pin[0],pout[0]], [])
00047 OpenRTM_aist.CORBA_SeqUtil.push_back(conprof.properties,
00048 OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
00049 "corba_cdr"))
00050
00051 OpenRTM_aist.CORBA_SeqUtil.push_back(conprof.properties,
00052 OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
00053 "push"))
00054
00055 OpenRTM_aist.CORBA_SeqUtil.push_back(conprof.properties,
00056 OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
00057 subs_type))
00058
00059 ret = pin[0].connect(conprof)
00060
00061
00062 eclistin = inobj.get_owned_contexts()
00063 eclistin[0].activate_component(inobj)
00064
00065
00066 eclistout = outobj.get_owned_contexts()
00067 eclistout[0].activate_component(outobj)
00068
00069
00070
00071 if __name__ == "__main__":
00072 main()