1 import os, subprocess, sys, time
2 from datetime
import datetime, timezone
3 from sick_scan_xd_simu_cfg
import SickScanXdSimuConfig
4 from sick_scan_xd_subscriber
import SickScanXdMonitor
5 from sick_scan_xd_simu_report
import filter_logfile, SickScanXdStatus, SickScanXdMsgStatus, SickScanXdSimuReport
7 def start_process(os_name, cmd_list, wait_before, wait_after, logfilepath=""):
9 Starts a subprocess running a given list of commands
13 time.sleep(wait_before)
14 if os_name ==
"windows":
15 cmd =
" && ".join(cmd_list)
16 proc = subprocess.Popen([
"cmd",
"/c", cmd])
18 cmd =
" ; ".join(cmd_list)
19 if len(logfilepath) > 0:
20 cmd = f
"(({cmd}) | tee {logfilepath})"
21 proc = subprocess.Popen([
"bash",
"-c", cmd])
22 print(f
"Started subprocess {proc}, running command {cmd}")
23 time.sleep(wait_after)
28 Runs a sick_scan_xd simulation:
29 * Start a tiny sopas test server to emulate sopas responses of a multiScan,
30 * Launch sick_scan_xd,
31 * Start rviz to display pointclouds and laserscan messages
32 * Replay UDP packets with scan data, previously recorded and converted to json-file
33 * Receive the pointcloud-, laserscan- and IMU-messages published by sick_scan_xd
34 * Compare the received messages to predefined reference messages
35 * Check against errors, verify complete and correct messages
36 * Return status 0 for success or an error code in case of any failures.
40 simu_start_time = datetime.now(timezone.utc)
41 simu_start_time_str = f
"{simu_start_time.strftime('%a %m/%d/%Y %H:%M:%S utc')}"
44 if len(config.error_messages) > 0:
45 report.append_message(SickScanXdMsgStatus.ERROR, f
"## ERROR in sick_scan_xd_simu: invalid configuration:")
46 for error_message
in config.error_messages:
47 report.append_message(SickScanXdMsgStatus.ERROR, error_message)
48 report.set_exit_status(SickScanXdStatus.CONFIG_ERROR)
49 elif len(config.cmd_sick_scan_xd) == 0:
50 report.append_message(SickScanXdMsgStatus.ERROR, f
"## ERROR in sick_scan_xd_simu: ROS version {config.ros_version} on {config.os_name} not supported")
51 report.set_exit_status(SickScanXdStatus.CONFIG_ERROR)
52 if report.get_exit_status() != SickScanXdStatus.SUCCESS:
53 report.print_messages()
54 return int(report.get_exit_status())
55 report.append_message(SickScanXdMsgStatus.INFO, f
"sick_scan_xd_simu started {simu_start_time_str} on {config.os_name}, ROS {config.ros_version}")
56 report.append_file_links(SickScanXdMsgStatus.INFO,
"sick_scan_xd_simu config file: ", [config.config_file])
59 proc_sopas_server_logfile =
"proc_sopas_server.log"
60 if len(config.cmd_sopas_server) > 0:
61 proc_sopas_server =
start_process(config.os_name, config.cmd_init + config.cmd_sopas_server, 0, 5, logfilepath=f
"{config.log_folder}/{proc_sopas_server_logfile}")
62 if len(config.cmd_rviz) > 0:
63 proc_rviz =
start_process(config.os_name, config.cmd_init + config.cmd_rviz, 0, 5)
64 proc_sick_scan_xd_logfile =
"proc_sick_scan_xd.log"
65 proc_sick_scan_xd =
start_process(config.os_name, config.cmd_init + config.cmd_sick_scan_xd, 0, 5, logfilepath=f
"{config.log_folder}/{proc_sick_scan_xd_logfile}")
67 proc_scandata_sender =
None
68 proc_scandata_sender_logfile =
""
69 if len(config.cmd_udp_scandata_sender) > 0:
70 proc_scandata_sender_logfile =
"proc_scandata_sender.log"
71 proc_scandata_sender =
start_process(config.os_name, config.cmd_init + config.cmd_udp_scandata_sender, 1, 5, logfilepath=f
"{config.log_folder}/{proc_scandata_sender_logfile}")
74 if proc_scandata_sender
is not None:
75 status_scandata_sender = proc_scandata_sender.wait()
77 print(f
"Finished process {proc_scandata_sender}, exit_status = {status_scandata_sender}")
78 if config.run_simu_seconds_before_shutdown > 0:
79 time.sleep(config.run_simu_seconds_before_shutdown)
80 if config.ros_version ==
"none":
81 proc_shutdown =
start_process(config.os_name, config.cmd_init + config.cmd_shutdown, 0, 5)
85 verify_success =
False
86 if len(config.save_messages_jsonfile) > 0:
87 if config.ros_version ==
"none":
88 sick_scan_xd_monitor.import_received_messages_from_jsonfile(f
"{config.log_folder}/{config.save_messages_jsonfile}")
90 sick_scan_xd_monitor.export_received_messages_to_jsonfile(f
"{config.log_folder}/{config.save_messages_jsonfile}")
91 report.append_file_links(SickScanXdMsgStatus.INFO,
"sick_scan_xd_simu: received messages exported to file ", [config.save_messages_jsonfile])
92 if len(config.reference_messages_jsonfile) > 0:
93 report.append_file_links(SickScanXdMsgStatus.INFO,
"sick_scan_xd_simu: references messages from file ", [config.reference_messages_jsonfile])
94 print(
"sick_scan_xd_simu finished, verifying messages ...")
95 verify_success = sick_scan_xd_monitor.verify_messages(report)
96 if len(proc_sick_scan_xd_logfile) > 0:
97 report.append_file_links(SickScanXdMsgStatus.INFO,
"sick_scan_xd_simu: sick_scan_xd process log in file ", [proc_sick_scan_xd_logfile])
98 log_error_messages =
filter_logfile(f
"{config.log_folder}/{proc_sick_scan_xd_logfile}", [
"ERROR" ])
99 if len(log_error_messages) > 0:
100 report.set_exit_status(SickScanXdStatus.TEST_ERROR)
101 report.append_message(SickScanXdMsgStatus.ERROR, f
"\n## ERROR messages found in logfile {proc_sick_scan_xd_logfile}:\n")
102 for log_error_message
in log_error_messages:
103 report.append_message(SickScanXdMsgStatus.ERROR, log_error_message)
104 report.append_message(SickScanXdMsgStatus.ERROR, f
"\n## ERROR in sick_scan_xd_simu: ERROR messages found in logfile {proc_sick_scan_xd_logfile}, TEST FAILED\n")
105 if len(proc_sopas_server_logfile) > 0:
106 report.append_file_links(SickScanXdMsgStatus.INFO,
"sick_scan_xd_simu: sopas server process log in file ", [proc_sopas_server_logfile])
107 if len(proc_scandata_sender_logfile) > 0:
108 report.append_file_links(SickScanXdMsgStatus.INFO,
"sick_scan_xd_simu: udp scandata sender log in file ", [proc_scandata_sender_logfile])
110 report.append_message(SickScanXdMsgStatus.INFO, f
"\nsick_scan_xd_monitor.verify_messages sucessful, TEST PASSED\n")
112 report.set_exit_status(SickScanXdStatus.TEST_ERROR)
113 report.append_message(SickScanXdMsgStatus.ERROR, f
"\n## ERROR in sick_scan_xd_simu: sick_scan_xd_monitor.verify_messages returned without success, TEST FAILED\n")
114 if report.get_exit_status() == SickScanXdStatus.SUCCESS:
115 print(
"\nsick_scan_xd_simu finished, messages successfully verified, TEST PASSED\n")
117 print(
"\n## sick_scan_xd_simu finished with ERROR, TEST FAILED\n")
120 if config.ros_version !=
"none":
121 proc_shutdown =
start_process(config.os_name, config.cmd_init + config.cmd_shutdown, 0, 5)
125 print(f
"\nsick_scan_xd_simu finished with exit status {int(report.get_exit_status())}\n")
126 report.save_md_file(config.log_folder, config.report_md_filename)
127 with open(f
"{config.log_folder}/sick_scan_xd_summary.md",
"w")
as file_stream:
128 if report.get_exit_status() == SickScanXdStatus.SUCCESS:
129 status_text =
"test passed"
131 status_text =
"**TEST FAILED**"
132 report_md_filepath = f
"{os.path.basename(config.log_folder)}/{config.report_md_filename}"
133 report_html_filepath = f
"{report_md_filepath}.html"
134 file_stream.write(f
"sick_scan_xd_simu {simu_start_time_str} on {config.os_name}, ROS {config.ros_version}, {os.path.basename(config.config_file)}: {status_text}, [{report_md_filepath}]({report_md_filepath}), [{report_html_filepath}]({report_html_filepath})\n\n")
135 report.print_messages()
136 return int(report.get_exit_status())
138 if __name__ ==
'__main__':