Functions | |
| def | leak_check |
| memory leak check | |
| def | mem_rss |
| def | print_file_and_cons |
| file and console out | |
Variables | |
| list | compo0 = ns.rtc_handles["ConsoleIn0.rtc"] |
| list | compo1 = ns.rtc_handles["ConsoleOut0.rtc"] |
| tuple | conprof = RTC.ConnectorProfile("connector0", "123", [consin_ports[0],consout_ports[0]], [SDOPackage.NameValue("dataport.interface_type",any.to_any("corba_cdr")),SDOPackage.NameValue("dataport.dataflow_type",any.to_any("push")),SDOPackage.NameValue("dataport.subscription_type",any.to_any("flush"))]) |
| tuple | conprof2 = RTC.ConnectorProfile("connector0", "123", [consout_ports[0], consin_ports[0]], [SDOPackage.NameValue("dataport.interface_type",any.to_any("corba_cdr")),SDOPackage.NameValue("dataport.dataflow_type",any.to_any("pull")),SDOPackage.NameValue("dataport.subscription_type",any.to_any("flush"))]) |
| tuple | consin_ports = compo0.rtc_ref.get_ports() |
| tuple | consout_ports = compo1.rtc_ref.get_ports() |
| tuple | data = RTC.TimedLong(RTC.Time(0,0),12345) |
| tuple | data0 = cdrMarshal(any.to_any(data).typecode(), data, 1) |
| tuple | env = RtmEnv(sys.argv, ["localhost:9898"]) |
| string | fodat = "=== " |
| tuple | fout = open(test_case + ".log", 'w') |
| tuple | inportcdr = inportobj._narrow(OpenRTM.InPortCdr) |
| tuple | inportobj = env.orb.string_to_object(ior) |
| tuple | ior = any.from_any(conprof0.properties[4].value, keep_structs=True) |
| tuple | ior10 = any.from_any(conprof10.properties[4].value, keep_structs=True) |
| list | list0 = env.name_space["localhost:9898"] |
| int | loop_cnt = 1000 |
| list | ns = env.name_space['localhost:9898'] |
| tuple | outportcdr = outportobj._narrow(OpenRTM.OutPortCdr) |
| tuple | outportobj = env.orb.string_to_object(ior10) |
| tuple | ret1 = inportcdr.put(data0) |
| list | ret12 = consin_ports[0] |
| list | ret2 = consin_ports[0] |
| tuple | rss0 = mem_rss() |
| tuple | rss1 = mem_rss() |
| tuple | rssEnd = mem_rss() |
| string | test_case = "DataPortTest" |
| file out setting | |
| def DataPortTest.leak_check | ( | rss_start, | |
| rss_end | |||
| ) |
memory leak check
Definition at line 58 of file DataPortTest.py.
| def DataPortTest.mem_rss | ( | ) |
Definition at line 37 of file DataPortTest.py.
| def DataPortTest.print_file_and_cons | ( | out_data, | |
out_flag = 0 |
|||
| ) |
file and console out
Definition at line 42 of file DataPortTest.py.
| list DataPortTest::compo0 = ns.rtc_handles["ConsoleIn0.rtc"] |
Definition at line 31 of file DataPortTest.py.
| list DataPortTest::compo1 = ns.rtc_handles["ConsoleOut0.rtc"] |
Definition at line 32 of file DataPortTest.py.
| tuple DataPortTest::conprof = RTC.ConnectorProfile("connector0", "123", [consin_ports[0],consout_ports[0]], [SDOPackage.NameValue("dataport.interface_type",any.to_any("corba_cdr")),SDOPackage.NameValue("dataport.dataflow_type",any.to_any("push")),SDOPackage.NameValue("dataport.subscription_type",any.to_any("flush"))]) |
-------------------------------------------------------------------- dataflow_type: push
Definition at line 83 of file DataPortTest.py.
| tuple DataPortTest::conprof2 = RTC.ConnectorProfile("connector0", "123", [consout_ports[0], consin_ports[0]], [SDOPackage.NameValue("dataport.interface_type",any.to_any("corba_cdr")),SDOPackage.NameValue("dataport.dataflow_type",any.to_any("pull")),SDOPackage.NameValue("dataport.subscription_type",any.to_any("flush"))]) |
-------------------------------------------------------------------- Connector Porfile: corba_cdr, push, new <<< In -> Out conprof = RTC.ConnectorProfile("connector0", "123", [consin_ports[0],consout_ports[0]], [SDOPackage.NameValue("dataport.interface_type",any.to_any("corba_cdr")),SDOPackage.NameValue("dataport.dataflow_type",any.to_any("push")),SDOPackage.NameValue("dataport.subscription_type",any.to_any("new"))])-------------------------------------------------------------------- Connector Porfile: corba_cdr, push, periodic <<< In -> Out conprof = RTC.ConnectorProfile("connector0", "123", [consin_ports[0],consout_ports[0]], [SDOPackage.NameValue("dataport.interface_type",any.to_any("corba_cdr")),SDOPackage.NameValue("dataport.dataflow_type",any.to_any("push")),SDOPackage.NameValue("dataport.subscription_type",any.to_any("periodic"))])-------------------------------------------------------------------- print "ConnectorProfile=\n",conprof-------------------------------------------------------------------- dataflow_type: pull Connector Porfile: corba_cdr, pull, flush <<< Out -> In
Definition at line 108 of file DataPortTest.py.
| tuple DataPortTest::consin_ports = compo0.rtc_ref.get_ports() |
Definition at line 73 of file DataPortTest.py.
| tuple DataPortTest::consout_ports = compo1.rtc_ref.get_ports() |
Definition at line 75 of file DataPortTest.py.
| tuple DataPortTest::data = RTC.TimedLong(RTC.Time(0,0),12345) |
Definition at line 140 of file DataPortTest.py.
| tuple DataPortTest::data0 = cdrMarshal(any.to_any(data).typecode(), data, 1) |
Definition at line 141 of file DataPortTest.py.
| tuple DataPortTest::env = RtmEnv(sys.argv, ["localhost:9898"]) |
Definition at line 23 of file DataPortTest.py.
| string DataPortTest::fodat = "=== " |
-----------------------------------------------------------------------------
----------------------------------------------------------------------------- ec2[0].deactivate_component(compo1.rtc_ref) ec1[0].deactivate_component(compo0.rtc_ref)
Definition at line 70 of file DataPortTest.py.
| tuple DataPortTest::fout = open(test_case + ".log", 'w') |
Definition at line 68 of file DataPortTest.py.
| tuple DataPortTest::inportcdr = inportobj._narrow(OpenRTM.InPortCdr) |
Definition at line 139 of file DataPortTest.py.
| tuple DataPortTest::inportobj = env.orb.string_to_object(ior) |
Definition at line 138 of file DataPortTest.py.
| tuple DataPortTest::ior = any.from_any(conprof0.properties[4].value, keep_structs=True) |
Definition at line 136 of file DataPortTest.py.
| tuple DataPortTest::ior10 = any.from_any(conprof10.properties[4].value, keep_structs=True) |
Definition at line 190 of file DataPortTest.py.
| list DataPortTest::list0 = env.name_space["localhost:9898"] |
Definition at line 24 of file DataPortTest.py.
| int DataPortTest::loop_cnt = 1000 |
Definition at line 78 of file DataPortTest.py.
| list DataPortTest::ns = env.name_space['localhost:9898'] |
Definition at line 29 of file DataPortTest.py.
| tuple DataPortTest::outportcdr = outportobj._narrow(OpenRTM.OutPortCdr) |
Definition at line 194 of file DataPortTest.py.
| tuple DataPortTest::outportobj = env.orb.string_to_object(ior10) |
Definition at line 193 of file DataPortTest.py.
| tuple DataPortTest::ret1 = inportcdr.put(data0) |
Definition at line 142 of file DataPortTest.py.
| list DataPortTest::ret12 = consin_ports[0] |
Definition at line 206 of file DataPortTest.py.
| list DataPortTest::ret2 = consin_ports[0] |
Definition at line 152 of file DataPortTest.py.
| tuple DataPortTest::rss0 = mem_rss() |
Definition at line 158 of file DataPortTest.py.
| tuple DataPortTest::rss1 = mem_rss() |
Definition at line 161 of file DataPortTest.py.
| tuple DataPortTest::rssEnd = mem_rss() |
Definition at line 167 of file DataPortTest.py.
| string DataPortTest::test_case = "DataPortTest" |
file out setting
Definition at line 67 of file DataPortTest.py.