Functions | |
def | leak_check |
memory leak check | |
def | mem_rss |
def | print_file_and_cons |
file and console out | |
Variables | |
list | compo0 = ns.rtc_handles["ConsoleIn0.rtc"] |
list | compo1 = ns.rtc_handles["ConsoleOut0.rtc"] |
tuple | conprof = RTC.ConnectorProfile("connector0", "123", [consin_ports[0],consout_ports[0]], [SDOPackage.NameValue("dataport.interface_type",any.to_any("corba_cdr")),SDOPackage.NameValue("dataport.dataflow_type",any.to_any("push")),SDOPackage.NameValue("dataport.subscription_type",any.to_any("flush"))]) |
tuple | conprof2 = RTC.ConnectorProfile("connector0", "123", [consout_ports[0], consin_ports[0]], [SDOPackage.NameValue("dataport.interface_type",any.to_any("corba_cdr")),SDOPackage.NameValue("dataport.dataflow_type",any.to_any("pull")),SDOPackage.NameValue("dataport.subscription_type",any.to_any("flush"))]) |
tuple | consin_ports = compo0.rtc_ref.get_ports() |
tuple | consout_ports = compo1.rtc_ref.get_ports() |
tuple | data = RTC.TimedLong(RTC.Time(0,0),12345) |
tuple | data0 = cdrMarshal(any.to_any(data).typecode(), data, 1) |
tuple | env = RtmEnv(sys.argv, ["localhost:9898"]) |
string | fodat = "=== " |
tuple | fout = open(test_case + ".log", 'w') |
tuple | inportcdr = inportobj._narrow(OpenRTM.InPortCdr) |
tuple | inportobj = env.orb.string_to_object(ior) |
tuple | ior = any.from_any(conprof0.properties[4].value, keep_structs=True) |
tuple | ior10 = any.from_any(conprof10.properties[4].value, keep_structs=True) |
list | list0 = env.name_space["localhost:9898"] |
int | loop_cnt = 1000 |
list | ns = env.name_space['localhost:9898'] |
tuple | outportcdr = outportobj._narrow(OpenRTM.OutPortCdr) |
tuple | outportobj = env.orb.string_to_object(ior10) |
tuple | ret1 = inportcdr.put(data0) |
list | ret12 = consin_ports[0] |
list | ret2 = consin_ports[0] |
tuple | rss0 = mem_rss() |
tuple | rss1 = mem_rss() |
tuple | rssEnd = mem_rss() |
string | test_case = "DataPortTest" |
file out setting |
def DataPortTest.leak_check | ( | rss_start, | |
rss_end | |||
) |
memory leak check
Definition at line 58 of file DataPortTest.py.
def DataPortTest.mem_rss | ( | ) |
Definition at line 37 of file DataPortTest.py.
def DataPortTest.print_file_and_cons | ( | out_data, | |
out_flag = 0 |
|||
) |
file and console out
Definition at line 42 of file DataPortTest.py.
list DataPortTest::compo0 = ns.rtc_handles["ConsoleIn0.rtc"] |
Definition at line 31 of file DataPortTest.py.
list DataPortTest::compo1 = ns.rtc_handles["ConsoleOut0.rtc"] |
Definition at line 32 of file DataPortTest.py.
tuple DataPortTest::conprof = RTC.ConnectorProfile("connector0", "123", [consin_ports[0],consout_ports[0]], [SDOPackage.NameValue("dataport.interface_type",any.to_any("corba_cdr")),SDOPackage.NameValue("dataport.dataflow_type",any.to_any("push")),SDOPackage.NameValue("dataport.subscription_type",any.to_any("flush"))]) |
-------------------------------------------------------------------- dataflow_type: push
Definition at line 83 of file DataPortTest.py.
tuple DataPortTest::conprof2 = RTC.ConnectorProfile("connector0", "123", [consout_ports[0], consin_ports[0]], [SDOPackage.NameValue("dataport.interface_type",any.to_any("corba_cdr")),SDOPackage.NameValue("dataport.dataflow_type",any.to_any("pull")),SDOPackage.NameValue("dataport.subscription_type",any.to_any("flush"))]) |
-------------------------------------------------------------------- Connector Porfile: corba_cdr, push, new <<< In -> Out conprof = RTC.ConnectorProfile("connector0", "123", [consin_ports[0],consout_ports[0]], [SDOPackage.NameValue("dataport.interface_type",any.to_any("corba_cdr")),SDOPackage.NameValue("dataport.dataflow_type",any.to_any("push")),SDOPackage.NameValue("dataport.subscription_type",any.to_any("new"))])-------------------------------------------------------------------- Connector Porfile: corba_cdr, push, periodic <<< In -> Out conprof = RTC.ConnectorProfile("connector0", "123", [consin_ports[0],consout_ports[0]], [SDOPackage.NameValue("dataport.interface_type",any.to_any("corba_cdr")),SDOPackage.NameValue("dataport.dataflow_type",any.to_any("push")),SDOPackage.NameValue("dataport.subscription_type",any.to_any("periodic"))])-------------------------------------------------------------------- print "ConnectorProfile=\n",conprof-------------------------------------------------------------------- dataflow_type: pull Connector Porfile: corba_cdr, pull, flush <<< Out -> In
Definition at line 108 of file DataPortTest.py.
tuple DataPortTest::consin_ports = compo0.rtc_ref.get_ports() |
Definition at line 73 of file DataPortTest.py.
tuple DataPortTest::consout_ports = compo1.rtc_ref.get_ports() |
Definition at line 75 of file DataPortTest.py.
tuple DataPortTest::data = RTC.TimedLong(RTC.Time(0,0),12345) |
Definition at line 140 of file DataPortTest.py.
tuple DataPortTest::data0 = cdrMarshal(any.to_any(data).typecode(), data, 1) |
Definition at line 141 of file DataPortTest.py.
tuple DataPortTest::env = RtmEnv(sys.argv, ["localhost:9898"]) |
Definition at line 23 of file DataPortTest.py.
string DataPortTest::fodat = "=== " |
-----------------------------------------------------------------------------
----------------------------------------------------------------------------- ec2[0].deactivate_component(compo1.rtc_ref) ec1[0].deactivate_component(compo0.rtc_ref)
Definition at line 70 of file DataPortTest.py.
tuple DataPortTest::fout = open(test_case + ".log", 'w') |
Definition at line 68 of file DataPortTest.py.
tuple DataPortTest::inportcdr = inportobj._narrow(OpenRTM.InPortCdr) |
Definition at line 139 of file DataPortTest.py.
tuple DataPortTest::inportobj = env.orb.string_to_object(ior) |
Definition at line 138 of file DataPortTest.py.
tuple DataPortTest::ior = any.from_any(conprof0.properties[4].value, keep_structs=True) |
Definition at line 136 of file DataPortTest.py.
tuple DataPortTest::ior10 = any.from_any(conprof10.properties[4].value, keep_structs=True) |
Definition at line 190 of file DataPortTest.py.
list DataPortTest::list0 = env.name_space["localhost:9898"] |
Definition at line 24 of file DataPortTest.py.
int DataPortTest::loop_cnt = 1000 |
Definition at line 78 of file DataPortTest.py.
list DataPortTest::ns = env.name_space['localhost:9898'] |
Definition at line 29 of file DataPortTest.py.
tuple DataPortTest::outportcdr = outportobj._narrow(OpenRTM.OutPortCdr) |
Definition at line 194 of file DataPortTest.py.
tuple DataPortTest::outportobj = env.orb.string_to_object(ior10) |
Definition at line 193 of file DataPortTest.py.
tuple DataPortTest::ret1 = inportcdr.put(data0) |
Definition at line 142 of file DataPortTest.py.
list DataPortTest::ret12 = consin_ports[0] |
Definition at line 206 of file DataPortTest.py.
list DataPortTest::ret2 = consin_ports[0] |
Definition at line 152 of file DataPortTest.py.
tuple DataPortTest::rss0 = mem_rss() |
Definition at line 158 of file DataPortTest.py.
tuple DataPortTest::rss1 = mem_rss() |
Definition at line 161 of file DataPortTest.py.
tuple DataPortTest::rssEnd = mem_rss() |
Definition at line 167 of file DataPortTest.py.
string DataPortTest::test_case = "DataPortTest" |
file out setting
Definition at line 67 of file DataPortTest.py.