Functions | |
| def | delete_recv_file () |
内部関数:受信ファイル削除 More... | |
| def | diff_file () |
内部関数:送受信ファイルのデータ比較 More... | |
| def | make_connecter_profile (subscription_type, push_policy, connect_direction) |
内部関数:コネクタープロファイル設定(データポート) More... | |
Variables | |
| def | bret = diff_file() |
| 7 送受信データ比較 More... | |
| int | case_no = 0 |
テストケース番号の初期値設定。上から連番を振っている。 More... | |
| ec_recv = g_compo_recv.rtc_ref.get_owned_contexts() | |
| ec_send = g_compo_send.rtc_ref.get_owned_contexts() | |
| env = RtmEnv(sys.argv, ["localhost:2809"]) | |
| ネームサーバー定義: env = RtmEnv(sys.argv, ["localhost:2809"]); list0 = env.name_space["localhost:2809"].list_obj(); env.name_space['localhost:2809'].rtc_handles.keys(); ns = env.name_space['localhost:2809'] More... | |
| fin2 = open(g_diff_recv_file, 'r') | |
| fout = open(g_test_result_file, 'w') | |
| 6 ポート切断 More... | |
| string | g_check_message = g_diff_recv_file + " file not found." |
| g_compo_recv = ns.rtc_handles["AutoTestIn0.rtc"] | |
| g_compo_send = ns.rtc_handles["AutoTestOut0.rtc"] | |
| string | g_connector_id1 = "001" |
| string | g_connector_id2 = "002" |
| string | g_connector_id3 = "003" |
| g_conprof1 = RTC.ConnectorProfile(g_name1, g_connector_id1, [g_out_ports[g_port1], g_in_ports[g_port1]], [SDOPackage.NameValue("dataport.data_type",any.to_any(g_data_type1)),SDOPackage.NameValue("dataport.interface_type",any.to_any(g_interface_type1)),SDOPackage.NameValue("dataport.dataflow_type",any.to_any(g_dataflow_type)),SDOPackage.NameValue("dataport.subscription_type",any.to_any(g_subscription_type)),SDOPackage.NameValue("dataport.publisher.push_policy",any.to_any(g_push_policy)),SDOPackage.NameValue("dataport.publisher.push_rate",any.to_any(g_push_rate)),SDOPackage.NameValue("dataport.publisher.skip_count",any.to_any(g_skip_count))]) | |
| g_conprof2 = RTC.ConnectorProfile(g_name2, g_connector_id2, [g_out_ports[g_port2], g_in_ports[g_port2]], [SDOPackage.NameValue("dataport.data_type",any.to_any(g_data_type2)),SDOPackage.NameValue("dataport.interface_type",any.to_any(g_interface_type1)),SDOPackage.NameValue("dataport.dataflow_type",any.to_any(g_dataflow_type)),SDOPackage.NameValue("dataport.subscription_type",any.to_any(g_subscription_type)),SDOPackage.NameValue("dataport.publisher.push_policy",any.to_any(g_push_policy)),SDOPackage.NameValue("dataport.publisher.push_rate",any.to_any(g_push_rate)),SDOPackage.NameValue("dataport.publisher.skip_count",any.to_any(g_skip_count))]) | |
| g_conprof3 = RTC.ConnectorProfile(g_name3, g_connector_id3, [g_out_ports[g_port3], g_in_ports[g_port3]], [SDOPackage.NameValue("dataport.interface_type",any.to_any(g_interface_type3))]) | |
| string | g_data_type1 = "TimedFloat" |
| string | g_data_type2 = "TimedFloatSeq" |
| string | g_dataflow_type = "push" |
| string | g_diff_recv_file = "./received-data" |
| string | g_diff_send_file = "./original-data" |
送受信結果判定関連 More... | |
| g_in_ports = g_compo_recv.rtc_ref.get_ports() | |
| string | g_interface_type1 = "corba_cdr" |
コネクタープロファイルデフォルト定義 More... | |
| string | g_interface_type3 = "MyService" |
| string | g_mess_footer = " > " |
| string | g_mess_header = "< " |
| string | g_name1 = "out" |
| ConnectorProfile(name, connector_id, ports, properties): String name, String connector_id, RTC.PortService ports[], SDOPackage.NameValue properties[]. More... | |
| string | g_name2 = "seqout" |
| データポート TimedFloatSeq More... | |
| string | g_name3 = "MyService" |
| サービスポート More... | |
| g_out_ports = g_compo_send.rtc_ref.get_ports() | |
| int | g_port1 = 0 |
| ポート番号指定 ( get_ports()より ) More... | |
| int | g_port2 = 1 |
| int | g_port3 = 2 |
| string | g_push_policy = "NEW" |
| string | g_push_rate = "2000" |
| string | g_skip_count = "4" |
| string | g_subscription_type = "flush" |
| string | g_test_case = "case" |
| string | g_test_cnt = "count" |
| string | g_test_name = "<< component connection test >>" |
More... | |
| string | g_test_ng = "NG detected." |
| string | g_test_ng_message = " < received-data >" |
| string | g_test_ok = "OK." |
| string | g_test_result_file = "./ResultTest.log" |
| list0 = env.name_space["localhost:2809"].list_obj() | |
| int | loop_count = 3 |
| ケース毎のテスト回数 More... | |
| string | message = g_mess_header + g_test_case + str(case_no) + " " |
| ns = env.name_space['localhost:2809'] | |
| ret0 = g_out_ports[g_port1].connect(g_conprof1) | |
| 1 コネクタープロファイル設定 More... | |
| ret1 = g_out_ports[g_port2].connect(g_conprof2) | |
| ret2 = g_out_ports[g_port3].connect(g_conprof3) | |
| s2 = fin2.readline() | |
| int | sleep_act_time = 10 |
| activate_componentからdeactivate_componentまでのスリープ時間(秒数) More... | |
| int | sleep_connect_time = 2 |
| connectからdisconnectまでのスリープ時間(秒数) More... | |
| int | sleep_for_time = 2 |
| forループのスリープ時間(秒数) More... | |
| int | sleep_recv_act_time = 1 |
| 受信側activate_componentから送信側activate_componentまでのスリープ時間(秒数) More... | |
| def ConnectTest.delete_recv_file | ( | ) |
内部関数:受信ファイル削除
| def ConnectTest.diff_file | ( | ) |
内部関数:送受信ファイルのデータ比較
(引数) なし
Definition at line 156 of file ConnectTest.py.
| def ConnectTest.make_connecter_profile | ( | subscription_type, | |
| push_policy, | |||
| connect_direction | |||
| ) |
内部関数:コネクタープロファイル設定(データポート)
(引数) subscription_type : "flush", "new", "periodic"; push_policy : "ALL", "FIFO", "SKIP", "NEW", ""; connect_direction : (元のコメントに説明なし)
Definition at line 112 of file ConnectTest.py.
| def ConnectTest.bret = diff_file() |
7 送受信データ比較
Definition at line 377 of file ConnectTest.py.
| int ConnectTest.case_no = 0 |
テストケース番号の初期値設定。上から連番を振っている。
●注意:Activateを先に行っている為、受信データは途中からの内容になります。
●注意:Activateを先に行っている為、受信データは途中からの内容になります。
●注意:Activateを先に行っている為、受信データは途中からの内容になります。
●注意:Activateを先に行っている為、受信データは途中からの内容になります。
6 ポート切断
Definition at line 210 of file ConnectTest.py.
| ConnectTest.ec_recv = g_compo_recv.rtc_ref.get_owned_contexts() |
Definition at line 34 of file ConnectTest.py.
| ConnectTest.ec_send = g_compo_send.rtc_ref.get_owned_contexts() |
Definition at line 33 of file ConnectTest.py.
| ConnectTest.env = RtmEnv(sys.argv, ["localhost:2809"]) |
ネームサーバー定義: env = RtmEnv(sys.argv, ["localhost:2809"]); list0 = env.name_space["localhost:2809"].list_obj(); env.name_space['localhost:2809'].rtc_handles.keys(); ns = env.name_space['localhost:2809']
Definition at line 25 of file ConnectTest.py.
| ConnectTest.fin2 = open(g_diff_recv_file, 'r') |
Definition at line 395 of file ConnectTest.py.
| ConnectTest.fout = open(g_test_result_file, 'w') |
6 ポート切断
5 ディアクティベート
4 アクティベート
1 コネクタープロファイル設定
差分ファイルからテスト結果出力
2 受信データファイル削除
テスト結果出力
4 アクティベート 5 ディアクティベート 受信ファイル有無判定
4 アクティベート 5 ディアクティベート テスト結果出力
5 ディアクティベート 6 ポート切断 受信ファイル有無判定
6 ポート切断 5 ディアクティベート 受信ファイル有無判定
6 ポート切断 受信ファイル有無判定
5 ディアクティベート 受信ファイル有無判定
Definition at line 228 of file ConnectTest.py.
| string ConnectTest.g_check_message = g_diff_recv_file + " file not found." |
Definition at line 91 of file ConnectTest.py.
| ConnectTest.g_compo_recv = ns.rtc_handles["AutoTestIn0.rtc"] |
Definition at line 31 of file ConnectTest.py.
| ConnectTest.g_compo_send = ns.rtc_handles["AutoTestOut0.rtc"] |
Definition at line 30 of file ConnectTest.py.
| string ConnectTest.g_connector_id1 = "001" |
Definition at line 68 of file ConnectTest.py.
| string ConnectTest.g_connector_id2 = "002" |
Definition at line 75 of file ConnectTest.py.
| string ConnectTest.g_connector_id3 = "003" |
Definition at line 82 of file ConnectTest.py.
| ConnectTest.g_conprof1 = RTC.ConnectorProfile(g_name1, g_connector_id1, [g_out_ports[g_port1], g_in_ports[g_port1]], [SDOPackage.NameValue("dataport.data_type",any.to_any(g_data_type1)),SDOPackage.NameValue("dataport.interface_type",any.to_any(g_interface_type1)),SDOPackage.NameValue("dataport.dataflow_type",any.to_any(g_dataflow_type)),SDOPackage.NameValue("dataport.subscription_type",any.to_any(g_subscription_type)),SDOPackage.NameValue("dataport.publisher.push_policy",any.to_any(g_push_policy)),SDOPackage.NameValue("dataport.publisher.push_rate",any.to_any(g_push_rate)),SDOPackage.NameValue("dataport.publisher.skip_count",any.to_any(g_skip_count))]) |
Definition at line 71 of file ConnectTest.py.
| ConnectTest.g_conprof2 = RTC.ConnectorProfile(g_name2, g_connector_id2, [g_out_ports[g_port2], g_in_ports[g_port2]], [SDOPackage.NameValue("dataport.data_type",any.to_any(g_data_type2)),SDOPackage.NameValue("dataport.interface_type",any.to_any(g_interface_type1)),SDOPackage.NameValue("dataport.dataflow_type",any.to_any(g_dataflow_type)),SDOPackage.NameValue("dataport.subscription_type",any.to_any(g_subscription_type)),SDOPackage.NameValue("dataport.publisher.push_policy",any.to_any(g_push_policy)),SDOPackage.NameValue("dataport.publisher.push_rate",any.to_any(g_push_rate)),SDOPackage.NameValue("dataport.publisher.skip_count",any.to_any(g_skip_count))]) |
Definition at line 78 of file ConnectTest.py.
| ConnectTest.g_conprof3 = RTC.ConnectorProfile(g_name3, g_connector_id3, [g_out_ports[g_port3], g_in_ports[g_port3]], [SDOPackage.NameValue("dataport.interface_type",any.to_any(g_interface_type3))]) |
Definition at line 85 of file ConnectTest.py.
| string ConnectTest.g_data_type1 = "TimedFloat" |
Definition at line 69 of file ConnectTest.py.
| string ConnectTest.g_data_type2 = "TimedFloatSeq" |
Definition at line 76 of file ConnectTest.py.
| string ConnectTest.g_dataflow_type = "push" |
Definition at line 48 of file ConnectTest.py.
| string ConnectTest.g_diff_recv_file = "./received-data" |
Definition at line 90 of file ConnectTest.py.
| string ConnectTest.g_diff_send_file = "./original-data" |
| ConnectTest.g_in_ports = g_compo_recv.rtc_ref.get_ports() |
Definition at line 37 of file ConnectTest.py.
| string ConnectTest.g_interface_type1 = "corba_cdr" |
| string ConnectTest.g_interface_type3 = "MyService" |
Definition at line 83 of file ConnectTest.py.
| string ConnectTest.g_mess_footer = " > " |
Definition at line 99 of file ConnectTest.py.
| string ConnectTest.g_mess_header = "< " |
Definition at line 98 of file ConnectTest.py.
| string ConnectTest.g_name1 = "out" |
ConnectorProfile(name, connector_id, ports, properties): String name, String connector_id, RTC.PortService ports[], SDOPackage.NameValue properties[].
データポート TimedFloat
Definition at line 67 of file ConnectTest.py.
| string ConnectTest.g_name2 = "seqout" |
データポート TimedFloatSeq
Definition at line 74 of file ConnectTest.py.
| string ConnectTest.g_name3 = "MyService" |
サービスポート
Definition at line 81 of file ConnectTest.py.
| ConnectTest.g_out_ports = g_compo_send.rtc_ref.get_ports() |
Definition at line 36 of file ConnectTest.py.
| int ConnectTest.g_port1 = 0 |
ポート番号指定 ( get_ports()より )
Definition at line 56 of file ConnectTest.py.
| int ConnectTest.g_port2 = 1 |
Definition at line 57 of file ConnectTest.py.
| int ConnectTest.g_port3 = 2 |
Definition at line 58 of file ConnectTest.py.
| string ConnectTest.g_push_policy = "NEW" |
Definition at line 50 of file ConnectTest.py.
| string ConnectTest.g_push_rate = "2000" |
Definition at line 51 of file ConnectTest.py.
| string ConnectTest.g_skip_count = "4" |
Definition at line 52 of file ConnectTest.py.
| string ConnectTest.g_subscription_type = "flush" |
Definition at line 49 of file ConnectTest.py.
| string ConnectTest.g_test_case = "case" |
Definition at line 93 of file ConnectTest.py.
| string ConnectTest.g_test_cnt = "count" |
Definition at line 94 of file ConnectTest.py.
| string ConnectTest.g_test_name = "<< component connection test >>" |
Definition at line 17 of file ConnectTest.py.
| string ConnectTest.g_test_ng = "NG detected." |
Definition at line 96 of file ConnectTest.py.
| string ConnectTest.g_test_ng_message = " < received-data >" |
Definition at line 97 of file ConnectTest.py.
| string ConnectTest.g_test_ok = "OK." |
Definition at line 95 of file ConnectTest.py.
| string ConnectTest.g_test_result_file = "./ResultTest.log" |
Definition at line 92 of file ConnectTest.py.
| ConnectTest.list0 = env.name_space["localhost:2809"].list_obj() |
Definition at line 26 of file ConnectTest.py.
| int ConnectTest.loop_count = 3 |
ケース毎のテスト回数
Definition at line 213 of file ConnectTest.py.
| string ConnectTest.message = g_mess_header + g_test_case + str(case_no) + " " |
Definition at line 240 of file ConnectTest.py.
| ConnectTest.ns = env.name_space['localhost:2809'] |
Definition at line 28 of file ConnectTest.py.
| ConnectTest.ret0 = g_out_ports[g_port1].connect(g_conprof1) |
1 コネクタープロファイル設定
2 受信データファイル削除
3 ポート接続 データポート1 TimedFloat
1 コネクタープロファイル設定 3 ポート接続 データポート1 TimedFloat
1 コネクタープロファイル設定 4 アクティベート 3 ポート接続 データポート1 TimedFloat
Definition at line 254 of file ConnectTest.py.
| ConnectTest.ret1 = g_out_ports[g_port2].connect(g_conprof2) |
Definition at line 257 of file ConnectTest.py.
| ConnectTest.ret2 = g_out_ports[g_port3].connect(g_conprof3) |
Definition at line 260 of file ConnectTest.py.
| ConnectTest.s2 = fin2.readline() |
Definition at line 397 of file ConnectTest.py.
| int ConnectTest.sleep_act_time = 10 |
activate_componentからdeactivate_componentまでのスリープ時間(秒数)
Definition at line 219 of file ConnectTest.py.
| int ConnectTest.sleep_connect_time = 2 |
connectからdisconnectまでのスリープ時間(秒数)
Definition at line 225 of file ConnectTest.py.
| int ConnectTest.sleep_for_time = 2 |
forループのスリープ時間(秒数)
Definition at line 222 of file ConnectTest.py.
| int ConnectTest.sleep_recv_act_time = 1 |
受信側activate_componentから送信側activate_componentまでのスリープ時間(秒数)
Definition at line 216 of file ConnectTest.py.