9 from rtc_handle
import *
10 from BasicDataType_idl
import *
17 g_test_name =
"<< component connection test >>" 25 env =
RtmEnv(sys.argv, [
"localhost:2809"])
26 list0 = env.name_space[
"localhost:2809"].list_obj()
27 env.name_space[
'localhost:2809'].rtc_handles.keys()
28 ns = env.name_space[
'localhost:2809']
30 g_compo_send = ns.rtc_handles[
"AutoTestOut0.rtc"]
31 g_compo_recv = ns.rtc_handles[
"AutoTestIn0.rtc"]
33 ec_send = g_compo_send.rtc_ref.get_owned_contexts()
34 ec_recv = g_compo_recv.rtc_ref.get_owned_contexts()
36 g_out_ports = g_compo_send.rtc_ref.get_ports()
37 g_in_ports = g_compo_recv.rtc_ref.get_ports()
47 g_interface_type1 =
"corba_cdr" 48 g_dataflow_type =
"push" 49 g_subscription_type =
"flush" 68 g_connector_id1 =
"001" 69 g_data_type1 =
"TimedFloat" 71 g_conprof1 = RTC.ConnectorProfile(g_name1, g_connector_id1, [g_out_ports[g_port1], g_in_ports[g_port1]], [SDOPackage.NameValue(
"dataport.data_type",any.to_any(g_data_type1)),SDOPackage.NameValue(
"dataport.interface_type",any.to_any(g_interface_type1)),SDOPackage.NameValue(
"dataport.dataflow_type",any.to_any(g_dataflow_type)),SDOPackage.NameValue(
"dataport.subscription_type",any.to_any(g_subscription_type)),SDOPackage.NameValue(
"dataport.publisher.push_policy",any.to_any(g_push_policy)),SDOPackage.NameValue(
"dataport.publisher.push_rate",any.to_any(g_push_rate)),SDOPackage.NameValue(
"dataport.publisher.skip_count",any.to_any(g_skip_count))])
75 g_connector_id2 =
"002" 76 g_data_type2 =
"TimedFloatSeq" 78 g_conprof2 = RTC.ConnectorProfile(g_name2, g_connector_id2, [g_out_ports[g_port2], g_in_ports[g_port2]], [SDOPackage.NameValue(
"dataport.data_type",any.to_any(g_data_type2)),SDOPackage.NameValue(
"dataport.interface_type",any.to_any(g_interface_type1)),SDOPackage.NameValue(
"dataport.dataflow_type",any.to_any(g_dataflow_type)),SDOPackage.NameValue(
"dataport.subscription_type",any.to_any(g_subscription_type)),SDOPackage.NameValue(
"dataport.publisher.push_policy",any.to_any(g_push_policy)),SDOPackage.NameValue(
"dataport.publisher.push_rate",any.to_any(g_push_rate)),SDOPackage.NameValue(
"dataport.publisher.skip_count",any.to_any(g_skip_count))])
82 g_connector_id3 =
"003" 83 g_interface_type3 =
"MyService" 85 g_conprof3 = RTC.ConnectorProfile(g_name3, g_connector_id3, [g_out_ports[g_port3], g_in_ports[g_port3]], [SDOPackage.NameValue(
"dataport.interface_type",any.to_any(g_interface_type3))])
89 g_diff_send_file =
"./original-data" 90 g_diff_recv_file =
"./received-data" 91 g_check_message = g_diff_recv_file +
" file not found." 92 g_test_result_file =
"./ResultTest.log" 96 g_test_ng =
"NG detected." 97 g_test_ng_message =
" < received-data >" 113 global g_conprof1, g_conprof2, g_conprof3
115 if connect_direction == 0:
117 g_conprof1 = RTC.ConnectorProfile(g_name1, g_connector_id1, [g_out_ports[g_port1], g_in_ports[g_port1]], [SDOPackage.NameValue(
"dataport.data_type",any.to_any(g_data_type1)),SDOPackage.NameValue(
"dataport.interface_type",any.to_any(g_interface_type1)),SDOPackage.NameValue(
"dataport.dataflow_type",any.to_any(g_dataflow_type)),SDOPackage.NameValue(
"dataport.subscription_type",any.to_any(subscription_type)),SDOPackage.NameValue(
"dataport.publisher.push_policy",any.to_any(push_policy)),SDOPackage.NameValue(
"dataport.publisher.push_rate",any.to_any(g_push_rate)),SDOPackage.NameValue(
"dataport.publisher.skip_count",any.to_any(g_skip_count))])
119 g_conprof2 = RTC.ConnectorProfile(g_name2, g_connector_id2, [g_out_ports[g_port2], g_in_ports[g_port2]], [SDOPackage.NameValue(
"dataport.data_type",any.to_any(g_data_type2)),SDOPackage.NameValue(
"dataport.interface_type",any.to_any(g_interface_type1)),SDOPackage.NameValue(
"dataport.dataflow_type",any.to_any(g_dataflow_type)),SDOPackage.NameValue(
"dataport.subscription_type",any.to_any(subscription_type)),SDOPackage.NameValue(
"dataport.publisher.push_policy",any.to_any(push_policy)),SDOPackage.NameValue(
"dataport.publisher.push_rate",any.to_any(g_push_rate)),SDOPackage.NameValue(
"dataport.publisher.skip_count",any.to_any(g_skip_count))])
126 g_conprof1 = RTC.ConnectorProfile(g_name1, g_connector_id1, [g_in_ports[g_port1], g_out_ports[g_port1]], [SDOPackage.NameValue(
"dataport.data_type",any.to_any(g_data_type1)),SDOPackage.NameValue(
"dataport.interface_type",any.to_any(g_interface_type1)),SDOPackage.NameValue(
"dataport.dataflow_type",any.to_any(g_dataflow_type)),SDOPackage.NameValue(
"dataport.subscription_type",any.to_any(subscription_type)),SDOPackage.NameValue(
"dataport.publisher.push_policy",any.to_any(push_policy)),SDOPackage.NameValue(
"dataport.publisher.push_rate",any.to_any(g_push_rate)),SDOPackage.NameValue(
"dataport.publisher.skip_count",any.to_any(g_skip_count))])
128 g_conprof2 = RTC.ConnectorProfile(g_name2, g_connector_id2, [g_in_ports[g_port2], g_out_ports[g_port2]], [SDOPackage.NameValue(
"dataport.data_type",any.to_any(g_data_type2)),SDOPackage.NameValue(
"dataport.interface_type",any.to_any(g_interface_type1)),SDOPackage.NameValue(
"dataport.dataflow_type",any.to_any(g_dataflow_type)),SDOPackage.NameValue(
"dataport.subscription_type",any.to_any(subscription_type)),SDOPackage.NameValue(
"dataport.publisher.push_policy",any.to_any(push_policy)),SDOPackage.NameValue(
"dataport.publisher.push_rate",any.to_any(g_push_rate)),SDOPackage.NameValue(
"dataport.publisher.skip_count",any.to_any(g_skip_count))])
144 if os.path.isfile(g_diff_recv_file) ==
True:
145 os.remove(g_diff_recv_file)