sick_scan_xd_simu.py
Go to the documentation of this file.
1 import os, subprocess, sys, time
2 from datetime import datetime, timezone
3 from sick_scan_xd_simu_cfg import SickScanXdSimuConfig
4 from sick_scan_xd_subscriber import SickScanXdMonitor
5 from sick_scan_xd_simu_report import filter_logfile, SickScanXdStatus, SickScanXdMsgStatus, SickScanXdSimuReport
6 
def start_process(os_name, cmd_list, wait_before, wait_after, logfilepath=""):
    """
    Starts a subprocess running a given list of commands.

    On os_name "windows" the commands are joined with " && " and executed by "cmd /c".
    On any other os_name they are joined with " ; " and executed by "bash -c".

    :param os_name: "windows" selects cmd.exe, any other value selects bash
    :param cmd_list: list of shell command strings; if empty, no process is started
    :param wait_before: time in seconds to sleep before the process is started
    :param wait_after: time in seconds to sleep after the process has been started
    :param logfilepath: optional logfile; if given, the stdout of the commands is piped
        through tee into this file. Only used with bash, ignored on "windows".
        Note that the exit status of the returned process is then the status of the
        pipeline, i.e. of tee.
    :return: the subprocess.Popen object, or None if cmd_list is empty
    """
    import shlex  # function-scope import, only needed to quote the logfile path for bash

    # Nothing to run: do not sleep and do not start a process
    if not cmd_list:
        return None
    time.sleep(wait_before)
    if os_name == "windows":
        cmd = " && ".join(cmd_list)
        proc = subprocess.Popen(["cmd", "/c", cmd])
    else:
        cmd = " ; ".join(cmd_list)
        if logfilepath:
            # Quote the path, otherwise blanks or shell metacharacters in the
            # log folder name would break the tee command
            cmd = f"(({cmd}) | tee {shlex.quote(logfilepath)})"
        proc = subprocess.Popen(["bash", "-c", cmd])
    print(f"Started subprocess {proc}, running command {cmd}")
    time.sleep(wait_after)
    return proc
25 
26 def simu_main():
27  """
28  Runs a sick_scan_xd simulation and returns its exit status:
29  * Start a tiny sopas test server to emulate sopas responses of a multiScan (if configured),
30  * Start rviz to display pointclouds and laserscan messages (if configured),
31  * Launch sick_scan_xd,
32  * Replay UDP packets with scan data, previously recorded and converted to json-file (if configured),
33  * Receive the pointcloud-, laserscan- and IMU-messages published by sick_scan_xd,
34  * Compare the received messages to predefined reference messages,
35  * Check the sick_scan_xd logfile for ERROR messages, verify complete and correct messages,
36  * Write the report files to the log folder and return status 0 for success or an error code in case of any failures.
37  """
38 
39  # Parse commandline arguments and read configuration
40  simu_start_time = datetime.now(timezone.utc)
41  simu_start_time_str = f"{simu_start_time.strftime('%a %m/%d/%Y %H:%M:%S utc')}"
42  report = SickScanXdSimuReport()
43  config = SickScanXdSimuConfig(simu_start_time)
44  if len(config.error_messages) > 0:
45  report.append_message(SickScanXdMsgStatus.ERROR, f"## ERROR in sick_scan_xd_simu: invalid configuration:")
46  for error_message in config.error_messages:
47  report.append_message(SickScanXdMsgStatus.ERROR, error_message)
48  report.set_exit_status(SickScanXdStatus.CONFIG_ERROR)
49  elif len(config.cmd_sick_scan_xd) == 0:
50  report.append_message(SickScanXdMsgStatus.ERROR, f"## ERROR in sick_scan_xd_simu: ROS version {config.ros_version} on {config.os_name} not supported")
51  report.set_exit_status(SickScanXdStatus.CONFIG_ERROR)
52  if report.get_exit_status() != SickScanXdStatus.SUCCESS: # Abort before starting any process if the report status is not SUCCESS
53  report.print_messages()
54  return int(report.get_exit_status())
55  report.append_message(SickScanXdMsgStatus.INFO, f"sick_scan_xd_simu started {simu_start_time_str} on {config.os_name}, ROS {config.ros_version}")
56  report.append_file_links(SickScanXdMsgStatus.INFO, "sick_scan_xd_simu config file: ", [config.config_file])
57 
58  # Run simulation and monitoring: each process is started with config.cmd_init prepended to its command list
59  proc_sopas_server_logfile = "proc_sopas_server.log"
60  if len(config.cmd_sopas_server) > 0:
61  proc_sopas_server = start_process(config.os_name, config.cmd_init + config.cmd_sopas_server, 0, 5, logfilepath=f"{config.log_folder}/{proc_sopas_server_logfile}")
62  if len(config.cmd_rviz) > 0:
63  proc_rviz = start_process(config.os_name, config.cmd_init + config.cmd_rviz, 0, 5)
64  proc_sick_scan_xd_logfile = "proc_sick_scan_xd.log"
65  proc_sick_scan_xd = start_process(config.os_name, config.cmd_init + config.cmd_sick_scan_xd, 0, 5, logfilepath=f"{config.log_folder}/{proc_sick_scan_xd_logfile}")
66  sick_scan_xd_monitor = SickScanXdMonitor(config, True) # NOTE(review): meaning of the 2nd argument (True) is not visible in this file
67  proc_scandata_sender = None
68  proc_scandata_sender_logfile = ""
69  if len(config.cmd_udp_scandata_sender) > 0:
70  proc_scandata_sender_logfile = "proc_scandata_sender.log"
71  proc_scandata_sender = start_process(config.os_name, config.cmd_init + config.cmd_udp_scandata_sender, 1, 5, logfilepath=f"{config.log_folder}/{proc_scandata_sender_logfile}")
72 
73  # Wait until simulation finished, i.e. until the scandata sender process (if started) has exited
74  if proc_scandata_sender is not None:
75  status_scandata_sender = proc_scandata_sender.wait()
76  time.sleep(1)
77  print(f"Finished process {proc_scandata_sender}, exit_status = {status_scandata_sender}")
78  if config.run_simu_seconds_before_shutdown > 0:
79  time.sleep(config.run_simu_seconds_before_shutdown)
80  if config.ros_version == "none": # Without ROS: shutdown and cleanup before the verification (with ROS this is done after the verification, see below)
81  proc_shutdown = start_process(config.os_name, config.cmd_init + config.cmd_shutdown, 0, 5)
82  proc_shutdown.wait()
83 
84  # Verification of received pointcloud and laserscan messages
85  verify_success = False
86  if len(config.save_messages_jsonfile) > 0:
87  if config.ros_version == "none": # Without ROS the messages are imported from the json file, otherwise they are exported to it
88  sick_scan_xd_monitor.import_received_messages_from_jsonfile(f"{config.log_folder}/{config.save_messages_jsonfile}")
89  else:
90  sick_scan_xd_monitor.export_received_messages_to_jsonfile(f"{config.log_folder}/{config.save_messages_jsonfile}")
91  report.append_file_links(SickScanXdMsgStatus.INFO, "sick_scan_xd_simu: received messages exported to file ", [config.save_messages_jsonfile])
92  if len(config.reference_messages_jsonfile) > 0:
93  report.append_file_links(SickScanXdMsgStatus.INFO, "sick_scan_xd_simu: references messages from file ", [config.reference_messages_jsonfile])
94  print("sick_scan_xd_simu finished, verifying messages ...")
95  verify_success = sick_scan_xd_monitor.verify_messages(report)
96  if len(proc_sick_scan_xd_logfile) > 0: # Always true here, the logfile name is set to a non-empty literal above
97  report.append_file_links(SickScanXdMsgStatus.INFO, "sick_scan_xd_simu: sick_scan_xd process log in file ", [proc_sick_scan_xd_logfile])
98  log_error_messages = filter_logfile(f"{config.log_folder}/{proc_sick_scan_xd_logfile}", [ "ERROR" ]) # presumably the logfile lines containing the keyword "ERROR" - confirm in sick_scan_xd_simu_report.filter_logfile
99  if len(log_error_messages) > 0:
100  report.set_exit_status(SickScanXdStatus.TEST_ERROR)
101  report.append_message(SickScanXdMsgStatus.ERROR, f"\n## ERROR messages found in logfile {proc_sick_scan_xd_logfile}:\n")
102  for log_error_message in log_error_messages:
103  report.append_message(SickScanXdMsgStatus.ERROR, log_error_message)
104  report.append_message(SickScanXdMsgStatus.ERROR, f"\n## ERROR in sick_scan_xd_simu: ERROR messages found in logfile {proc_sick_scan_xd_logfile}, TEST FAILED\n")
105  if len(proc_sopas_server_logfile) > 0:
106  report.append_file_links(SickScanXdMsgStatus.INFO, "sick_scan_xd_simu: sopas server process log in file ", [proc_sopas_server_logfile])
107  if len(proc_scandata_sender_logfile) > 0:
108  report.append_file_links(SickScanXdMsgStatus.INFO, "sick_scan_xd_simu: udp scandata sender log in file ", [proc_scandata_sender_logfile])
109  if verify_success:
110  report.append_message(SickScanXdMsgStatus.INFO, f"\nsick_scan_xd_monitor.verify_messages sucessful, TEST PASSED\n")
111  else:
112  report.set_exit_status(SickScanXdStatus.TEST_ERROR)
113  report.append_message(SickScanXdMsgStatus.ERROR, f"\n## ERROR in sick_scan_xd_simu: sick_scan_xd_monitor.verify_messages returned without success, TEST FAILED\n")
114  if report.get_exit_status() == SickScanXdStatus.SUCCESS:
115  print("\nsick_scan_xd_simu finished, messages successfully verified, TEST PASSED\n")
116  else:
117  print("\n## sick_scan_xd_simu finished with ERROR, TEST FAILED\n")
118 
119  # Shutdown and cleanup
120  if config.ros_version != "none": # With ROS: shutdown and cleanup after the verification
121  proc_shutdown = start_process(config.os_name, config.cmd_init + config.cmd_shutdown, 0, 5)
122  proc_shutdown.wait()
123 
124  # Print final report: save the report and write a one-line summary to sick_scan_xd_summary.md in the log folder
125  print(f"\nsick_scan_xd_simu finished with exit status {int(report.get_exit_status())}\n")
126  report.save_md_file(config.log_folder, config.report_md_filename)
127  with open(f"{config.log_folder}/sick_scan_xd_summary.md", "w") as file_stream:
128  if report.get_exit_status() == SickScanXdStatus.SUCCESS:
129  status_text = "test passed"
130  else:
131  status_text = "**TEST FAILED**"
132  report_md_filepath = f"{os.path.basename(config.log_folder)}/{config.report_md_filename}" # Link uses only the basename of the log folder, presumably to be relative to its parent folder
133  report_html_filepath = f"{report_md_filepath}.html"
134  file_stream.write(f"sick_scan_xd_simu {simu_start_time_str} on {config.os_name}, ROS {config.ros_version}, {os.path.basename(config.config_file)}: {status_text}, [{report_md_filepath}]({report_md_filepath}), [{report_html_filepath}]({report_html_filepath})\n\n")
135  report.print_messages()
136  return int(report.get_exit_status()) # 0 = success, otherwise error
137 
if __name__ == '__main__':
    # Run the simulation and hand its result to the shell as process exit code
    # (0 = success, otherwise error)
    sys.exit(simu_main())
sick_scan_xd_simu.start_process
def start_process(os_name, cmd_list, wait_before, wait_after, logfilepath="")
Definition: sick_scan_xd_simu.py:7
roswrap::console::print
ROSCONSOLE_DECL void print(FilterBase *filter, void *logger, Level level, const char *file, int line, const char *function, const char *fmt,...) ROSCONSOLE_PRINTF_ATTRIBUTE(7, 8)
Don't call this directly. Use the ROS_LOG() macro instead.
sick_scan_xd_simu_report.SickScanXdSimuReport
Definition: sick_scan_xd_simu_report.py:27
sick_scan_xd_simu.simu_main
def simu_main()
Definition: sick_scan_xd_simu.py:26
sick_scan_xd_subscriber.SickScanXdMonitor
Definition: sick_scan_xd_subscriber.py:315
sick_scan_xd_simu_report.filter_logfile
def filter_logfile(logfile, keywords)
Definition: sick_scan_xd_simu_report.py:5
sick_scan_xd_simu_cfg.SickScanXdSimuConfig
Definition: sick_scan_xd_simu_cfg.py:8


sick_scan_xd
Author(s): Michael Lehning, Jochen Sprickerhof, Martin Günther
autogenerated on Fri Oct 25 2024 02:47:11