Namespaces | Functions | Variables
ConnectTest.py File Reference

Go to the source code of this file.

Namespaces

namespace  ConnectTest

Functions

def ConnectTest.delete_recv_file
def ConnectTest.diff_file
def ConnectTest.make_connecter_profile

Variables

tuple ConnectTest.bret = diff_file()
 7 送受信データ比較
int ConnectTest.case_no = 0
 6 ポート切断
tuple ConnectTest.ec_recv = g_compo_recv.rtc_ref.get_owned_contexts()
tuple ConnectTest.ec_send = g_compo_send.rtc_ref.get_owned_contexts()
tuple ConnectTest.env = RtmEnv(sys.argv, ["localhost:2809"])
 ネームサーバー定義: env = RtmEnv(sys.argv, ["localhost:2809"]); list0 = env.name_space["localhost:2809"].list_obj(); env.name_space['localhost:2809'].rtc_handles.keys(); ns = env.name_space['localhost:2809']
tuple ConnectTest.fin2 = open(g_diff_recv_file, 'r')
tuple ConnectTest.fout = open(g_test_result_file, 'w')
 6 ポート切断
string ConnectTest.g_check_message = " file not found."
list ConnectTest.g_compo_recv = ns.rtc_handles["AutoTestIn0.rtc"]
list ConnectTest.g_compo_send = ns.rtc_handles["AutoTestOut0.rtc"]
string ConnectTest.g_connector_id1 = "001"
string ConnectTest.g_connector_id2 = "002"
string ConnectTest.g_connector_id3 = "003"
tuple ConnectTest.g_conprof1 = RTC.ConnectorProfile(g_name1, g_connector_id1, [g_out_ports[g_port1], g_in_ports[g_port1]], [SDOPackage.NameValue("dataport.data_type",any.to_any(g_data_type1)),SDOPackage.NameValue("dataport.interface_type",any.to_any(g_interface_type1)),SDOPackage.NameValue("dataport.dataflow_type",any.to_any(g_dataflow_type)),SDOPackage.NameValue("dataport.subscription_type",any.to_any(g_subscription_type)),SDOPackage.NameValue("dataport.publisher.push_policy",any.to_any(g_push_policy)),SDOPackage.NameValue("dataport.publisher.push_rate",any.to_any(g_push_rate)),SDOPackage.NameValue("dataport.publisher.skip_count",any.to_any(g_skip_count))])
tuple ConnectTest.g_conprof2 = RTC.ConnectorProfile(g_name2, g_connector_id2, [g_out_ports[g_port2], g_in_ports[g_port2]], [SDOPackage.NameValue("dataport.data_type",any.to_any(g_data_type2)),SDOPackage.NameValue("dataport.interface_type",any.to_any(g_interface_type1)),SDOPackage.NameValue("dataport.dataflow_type",any.to_any(g_dataflow_type)),SDOPackage.NameValue("dataport.subscription_type",any.to_any(g_subscription_type)),SDOPackage.NameValue("dataport.publisher.push_policy",any.to_any(g_push_policy)),SDOPackage.NameValue("dataport.publisher.push_rate",any.to_any(g_push_rate)),SDOPackage.NameValue("dataport.publisher.skip_count",any.to_any(g_skip_count))])
tuple ConnectTest.g_conprof3 = RTC.ConnectorProfile(g_name3, g_connector_id3, [g_out_ports[g_port3], g_in_ports[g_port3]], [SDOPackage.NameValue("dataport.interface_type",any.to_any(g_interface_type3))])
string ConnectTest.g_data_type1 = "TimedFloat"
string ConnectTest.g_data_type2 = "TimedFloatSeq"
string ConnectTest.g_dataflow_type = "push"
string ConnectTest.g_diff_recv_file = "./received-data"
string ConnectTest.g_diff_send_file = "./original-data"
tuple ConnectTest.g_in_ports = g_compo_recv.rtc_ref.get_ports()
string ConnectTest.g_interface_type1 = "corba_cdr"
string ConnectTest.g_interface_type3 = "MyService"
string ConnectTest.g_mess_footer = " > "
string ConnectTest.g_mess_header = "< "
string ConnectTest.g_name1 = "out"
 ConnectorProfile(name, connector_id, ports, properties): String name; String connector_id; RTC.PortService ports[]; SDOPackage.NameValue properties[].
string ConnectTest.g_name2 = "seqout"
 データポート TimedFloatSeq
string ConnectTest.g_name3 = "MyService"
 サービスポート
tuple ConnectTest.g_out_ports = g_compo_send.rtc_ref.get_ports()
int ConnectTest.g_port1 = 0
 ポート番号指定 ( get_ports()より )
int ConnectTest.g_port2 = 1
int ConnectTest.g_port3 = 2
string ConnectTest.g_push_policy = "NEW"
string ConnectTest.g_push_rate = "2000"
string ConnectTest.g_skip_count = "4"
string ConnectTest.g_subscription_type = "flush"
string ConnectTest.g_test_case = "case"
string ConnectTest.g_test_cnt = "count"
string ConnectTest.g_test_name = "<< component connection test >>"
string ConnectTest.g_test_ng = "NG detected."
string ConnectTest.g_test_ng_message = " < received-data >"
string ConnectTest.g_test_ok = "OK."
string ConnectTest.g_test_result_file = "./ResultTest.log"
list ConnectTest.list0 = env.name_space["localhost:2809"]
int ConnectTest.loop_count = 3
 ケース毎のテスト回数
tuple ConnectTest.message = g_mess_header+g_test_case+str(case_no)
list ConnectTest.ns = env.name_space['localhost:2809']
list ConnectTest.ret0 = g_out_ports[g_port1]
 1 コネクタープロファイル設定
list ConnectTest.ret1 = g_out_ports[g_port2]
list ConnectTest.ret2 = g_out_ports[g_port3]
tuple ConnectTest.s2 = fin2.readline()
int ConnectTest.sleep_act_time = 10
 activate_componentからdeactivate_componentまでのスリープ時間(秒数)
int ConnectTest.sleep_connect_time = 2
 connectからdisconnectまでのスリープ時間(秒数)
int ConnectTest.sleep_for_time = 2
 forループのスリープ時間(秒数)
int ConnectTest.sleep_recv_act_time = 1
 受信側activate_componentから送信側activate_componentまでのスリープ時間(秒数)


openrtm_aist
Author(s): Noriaki Ando
autogenerated on Sat Jun 8 2019 18:49:08