msgpack_exporter.cpp
Go to the documentation of this file.
1 /*
2  * @brief msgpack_exporter runs a background thread to consume and export msgpack/compact data from the sick 3D lidar multiScan136
3  * to optionally csv file or plotted diagram
4  *
5  * Copyright (C) 2020 Ing.-Buero Dr. Michael Lehning, Hildesheim
6  * Copyright (C) 2020 SICK AG, Waldkirch
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  * All rights reserved.
21  *
22  * Redistribution and use in source and binary forms, with or without
23  * modification, are permitted provided that the following conditions are met:
24  *
25  * * Redistributions of source code must retain the above copyright
26  * notice, this list of conditions and the following disclaimer.
27  * * Redistributions in binary form must reproduce the above copyright
28  * notice, this list of conditions and the following disclaimer in the
29  * documentation and/or other materials provided with the distribution.
30  * * Neither the name of SICK AG nor the names of its
31  * contributors may be used to endorse or promote products derived from
32  * this software without specific prior written permission
33  * * Neither the name of Ing.-Buero Dr. Michael Lehning nor the names of its
34  * contributors may be used to endorse or promote products derived from
35  * this software without specific prior written permission
36  *
37  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
38  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
39  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
40  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
41  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
42  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
43  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
44  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
45  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
46  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
47  * POSSIBILITY OF SUCH DAMAGE.
48  *
49  * Authors:
50  * Michael Lehning <michael.lehning@lehning.de>
51  *
52  * Copyright 2020 SICK AG
53  * Copyright 2020 Ing.-Buero Dr. Michael Lehning
54  *
55  */
58 
59 /*
60  * @brief Default constructor.
61  */
62 sick_scansegment_xd::MsgPackExporter::MsgPackExporter() : m_udp_fifo(0), m_msgpack_fifo(0), m_logfolder(""), m_export_csv(false), m_verbose(false), m_measure_timing(false), m_exporter_thread(0), m_run_exporter_thread(false)
63 {
64 }
65 
66 /*
67  * @brief Initializing constructor
68  * @param[in] udp_fifo fifo buffering udp packages (for informational messages only)
69  * @param[in] msgpack_fifo fifo buffering ScanSegmentParserOutput data from multiScan136 (for csv export and visualization)
70  * @param[in] logfolder output folder for optional csv-files
71  * @param[in] export_csv true: export ScanSegmentParserOutput data to csv files
72  * @param[in] verbose true: enable debug output, false: quiet mode (default)
73  * @param[in] measure_timing true: duration and latency of msgpack conversion and export is measured, default: false
74  */
75 sick_scansegment_xd::MsgPackExporter::MsgPackExporter(sick_scansegment_xd::PayloadFifo* udp_fifo, sick_scansegment_xd::Fifo<ScanSegmentParserOutput>* msgpack_fifo, const std::string& logfolder, bool export_csv, bool verbose, bool measure_timing)
76 : m_udp_fifo(udp_fifo), m_msgpack_fifo(msgpack_fifo), m_logfolder(logfolder), m_export_csv(export_csv), m_verbose(verbose), m_measure_timing(measure_timing), m_exporter_thread(0), m_run_exporter_thread(false)
77 {
78 }
79 
80 /*
81  * @brief Default destructor.
82  */
84 {
85  Close();
86 }
87 
88 /*
89  * @brief Registers a listener to msgpack export data. MsgPackExporter will notify registered listeners
90  * whenever msgpack data have been received and successfully converted by calling listener->HandleMsgPackData().
91  */
93 {
94  std::unique_lock<std::mutex> lock(m_listener_mutex);
95  m_listener.push_back(listener);
96 }
97 
98 /*
99  * @brief Removes a registered listener.
100  */
102 {
103  std::unique_lock<std::mutex> lock(m_listener_mutex);
104  for (std::list<sick_scansegment_xd::MsgPackExportListenerIF*>::iterator iter = m_listener.begin(); iter != m_listener.end(); )
105  {
106  if (*iter == listener)
107  iter = m_listener.erase(iter);
108  else
109  iter++;
110  }
111 }
112 
113 /*
114  * @brief Returns the list of registered listeners
115  */
116 std::list<sick_scansegment_xd::MsgPackExportListenerIF*> sick_scansegment_xd::MsgPackExporter::GetExportListener()
117 {
118  std::unique_lock<std::mutex> lock(m_listener_mutex);
119  return m_listener;
120 }
121 
122 /*
123  * @brief Starts a background thread to pops msgpack data packages from the input fifo and optionally export them to csv and/or plot the lidar points.
124  * Note: If visualization is activated, export must run in the main thread, i.e. this function blocks and returns after pressing ESC, 'q' or 'Q'.
125  * If visualization is NOT activated, this function starts a background thread to run the data export, i.e. this function does NOT block and returns immediately.
126  */
128 {
129  m_run_exporter_thread = true;
130  m_exporter_thread = new std::thread(&sick_scansegment_xd::MsgPackExporter::RunCb, this); // Without visualization, msgpack export runs in background thread
131  return true;
132 }
133 
134 /*
135  * @brief Stops the background thread
136  */
138 {
139  m_run_exporter_thread = false;
140 }
141 
142 /*
143  * @brief Stops, joins and deletes the background thread
144  */
146 {
147  m_run_exporter_thread = false;
148  if (m_exporter_thread)
149  {
150  if (m_exporter_thread->joinable())
151  m_exporter_thread->join();
152  delete m_exporter_thread;
153  m_exporter_thread = 0;
154  }
155 }
156 
157 /*
158  * @brief Thread callback, runs the exporter. Pops msgpack data packages from the input fifo and optionally export them to csv and/or plot the lidar points.
159  */
161 {
162  if (!m_udp_fifo || !m_msgpack_fifo)
163  {
164  ROS_ERROR_STREAM("## ERROR MsgPack/Compact-Exporter::Run(): MsgPack/Compact-Exporter not initialized.");
165  return false;
166  }
167  try
168  {
169  fifo_timestamp recv_start_timestamp = fifo_clock::now();
170  fifo_timestamp last_print_timestamp = fifo_clock::now();
171  size_t msg_exported_counter = 0; // number of exported scandata (msgpack or compact)
172  size_t msg_first_udp_counter = 0; // number of udp datagrams received
173  size_t max_count_udp_messages_in_fifo = 0;
174  size_t max_count_output_messages_in_fifo = 0;
175  int msg_cnt_delta_max = 0;
176  sick_scansegment_xd::TimingStatistics duration_datahandling_milliseconds;
177  while (m_run_exporter_thread)
178  {
180  fifo_timestamp msgpack_timestamp;
181  size_t msgpack_counter = 0;
182  if (m_msgpack_fifo->Pop(msgpack_output, msgpack_timestamp, msgpack_counter) && m_run_exporter_thread)
183  {
184  // Notify registered listeners about new scandata (msgpack or compact)
185  std::list<sick_scansegment_xd::MsgPackExportListenerIF*> listener = GetExportListener();
186  for (std::list<sick_scansegment_xd::MsgPackExportListenerIF*>::iterator iter = listener.begin(); iter != listener.end(); iter++)
187  {
188  if (*iter)
189  {
190  (*iter)->HandleMsgPackData(msgpack_output);
191  }
192  }
193  // Optionally export to csv file
194  if (m_export_csv && !m_logfolder.empty())
195  {
196  std::string csv_file = m_logfolder + "/scansegment_" + sick_scansegment_xd::FormatNumber(msgpack_output.segmentIndex, 6, true, false, -1) + ".csv";
197  if (!sick_scansegment_xd::MsgPackParser::WriteCSV({ msgpack_output }, csv_file, true))
198  ROS_ERROR_STREAM("## ERROR MsgPackParser::WriteCSV() failed.");
199  }
200  // Profiling and time measurement
201  if (msg_exported_counter == 0) // first time receiving scandata (msgpack or compact)
202  {
203  msg_first_udp_counter = msgpack_counter;
204  recv_start_timestamp = msgpack_timestamp;
205  }
206  msg_exported_counter += 1;
207  size_t msg_udp_received_counter = (msgpack_counter - msg_first_udp_counter) + 1;
208  if (m_measure_timing)
209  {
210  int msg_cnt_delta = std::max(0, (int)msg_udp_received_counter - (int)msg_exported_counter);
211  double packages_lost_rate = std::abs((double)msg_cnt_delta) / (double)msg_udp_received_counter;
212  bool do_print_message = m_verbose &&
213  (sick_scansegment_xd::Fifo<ScanSegmentParserOutput>::Seconds(last_print_timestamp, fifo_clock::now()) > 1.0) && // avoid printing with more than 1 Hz
214  ((msg_exported_counter%100) == 0);
215  if (msg_udp_received_counter != msg_exported_counter && msg_cnt_delta > msg_cnt_delta_max && do_print_message) // Test mode only, multiScan emulator must be started after lidar3d_multiscan_recv
216  {
217  ROS_INFO_STREAM("MsgPack/Compact-Exporter::Run(): " << msg_udp_received_counter << " udp messages received, " << msg_exported_counter << " messages exported, " << (100.0 * packages_lost_rate) << "% package lost");
218  msg_cnt_delta_max = msg_cnt_delta;
219  last_print_timestamp = fifo_clock::now();
220  }
221  size_t current_udp_fifo_size = m_udp_fifo->Size();
222  size_t current_output_fifo_size = m_msgpack_fifo->Size();
223  double duration_datahandling_seconds = sick_scansegment_xd::Fifo<ScanSegmentParserOutput>::Seconds(msgpack_timestamp, fifo_clock::now());
224  duration_datahandling_milliseconds.AddTimeMilliseconds(1000.0 * duration_datahandling_seconds);
225  max_count_udp_messages_in_fifo = std::max(max_count_udp_messages_in_fifo, current_udp_fifo_size + 1);
226  max_count_output_messages_in_fifo = std::max(max_count_output_messages_in_fifo, current_output_fifo_size + 1);
227  double msg_exported_rate = (double)msg_exported_counter / sick_scansegment_xd::Fifo<ScanSegmentParserOutput>::Seconds(recv_start_timestamp, fifo_clock::now());
228  if (m_verbose && ((msg_exported_counter%100) == 0 || sick_scansegment_xd::Fifo<ScanSegmentParserOutput>::Seconds(last_print_timestamp, fifo_clock::now()) > 0.1)) // avoid printing with more than 100 Hz
229  {
230  ROS_INFO_STREAM("MsgPack/Compact-Exporter: " << current_udp_fifo_size << " udp packages still in input fifo, " << current_output_fifo_size << " messages still in output fifo, current segment index: " << msgpack_output.segmentIndex);
231  ROS_INFO_STREAM("MsgPack/Compact-Exporter: " << msg_udp_received_counter << " udp scandata messages received, " << msg_exported_counter << " messages exported (scan+imu), " << (100.0 * packages_lost_rate) << "% package lost.");
232  ROS_INFO_STREAM("MsgPack/Compact-Exporter: max. " << max_count_udp_messages_in_fifo << " udp messages buffered, max " << max_count_output_messages_in_fifo << " export messages buffered.");
233  std::stringstream s;
234  s << "MsgPack/Compact-Exporter: " << msg_exported_counter << " messages exported at " << std::fixed << std::setprecision(3) << msg_exported_rate << " Hz, mean time: "
235  << std::fixed << std::setprecision(3) << duration_datahandling_milliseconds.MeanMilliseconds() << " milliseconds/messages, "
236  << "stddev time: " << duration_datahandling_milliseconds.StddevMilliseconds() << ", " << "max time: " << duration_datahandling_milliseconds.MaxMilliseconds() << " milliseconds between udp receive and messages export, "
237  << "histogram=[" << duration_datahandling_milliseconds.PrintHistMilliseconds() << "]";
238  ROS_INFO_STREAM(s.str());
239  last_print_timestamp = fifo_clock::now();
240  }
241  }
242  }
243  }
244  if (m_measure_timing && m_verbose)
245  {
246  size_t current_udp_fifo_size = m_udp_fifo->Size();
247  size_t current_output_fifo_size = m_msgpack_fifo->Size();
248  double msg_exported_rate = (double)msg_exported_counter / sick_scansegment_xd::Fifo<ScanSegmentParserOutput>::Seconds(recv_start_timestamp, fifo_clock::now());
249  std::stringstream info1, info2;
250  info1 << "MsgPack/Compact-Exporter: finished, " << current_udp_fifo_size << " udp packages still in input fifo, " << current_output_fifo_size << " messages still in output fifo"
251  << ", max. " << max_count_udp_messages_in_fifo << " udp messages buffered, max " << max_count_output_messages_in_fifo << " export messages buffered.";
252  info2 << "MsgPack/Compact-Exporter: " << msg_exported_counter << " messages exported at " << msg_exported_rate << " Hz, mean time: " << duration_datahandling_milliseconds.MeanMilliseconds() << " milliseconds/messages, "
253  << "stddev time: " << duration_datahandling_milliseconds.StddevMilliseconds() << ", " << "max time: " << duration_datahandling_milliseconds.MaxMilliseconds() << " milliseconds between udp receive and messages export, "
254  << "histogram=[" << duration_datahandling_milliseconds.PrintHistMilliseconds() << "]";
255  ROS_INFO_STREAM(info1.str());
256  ROS_INFO_STREAM(info2.str());
257  std::cout << info1.str() << std::endl;
258  std::cout << info2.str() << std::endl;
259  }
260  m_run_exporter_thread = false;
261  return true;
262  }
263  catch (std::exception & e)
264  {
265  ROS_ERROR_STREAM("## ERROR MsgPack/Compact-Exporter::Run(): " << e.what());
266  }
267  m_run_exporter_thread = false;
268  return false;
269 }
sick_scansegment_xd::PayloadFifo
Definition: fifo.h:187
sick_scansegment_xd::MsgPackExporter::~MsgPackExporter
~MsgPackExporter()
Definition: msgpack_exporter.cpp:83
sick_scansegment_xd::TimingStatistics::MaxMilliseconds
double MaxMilliseconds(void) const
Definition: time_util.h:117
sick_scansegment_xd::TimingStatistics::MeanMilliseconds
double MeanMilliseconds(void) const
Definition: time_util.h:92
sick_scansegment_xd::TimingStatistics::StddevMilliseconds
double StddevMilliseconds(void) const
Definition: time_util.h:104
s
XmlRpcServer s
sick_scansegment_xd::FormatNumber
static std::string FormatNumber(const T &number, int width=-1, bool setfill=false, bool setfixed=false, int precision=-1)
Definition: include/sick_scansegment_xd/common.h:151
sick_scansegment_xd::TimingStatistics::PrintHistMilliseconds
std::string PrintHistMilliseconds(const std::string &separator=",") const
Definition: time_util.h:125
sick_scansegment_xd::MsgPackExporter::AddExportListener
void AddExportListener(sick_scansegment_xd::MsgPackExportListenerIF *listener)
Definition: msgpack_exporter.cpp:92
sick_scansegment_xd::ScanSegmentParserOutput::segmentIndex
int segmentIndex
Definition: scansegment_parser_output.h:192
multiscan_pcap_player.verbose
int verbose
Definition: multiscan_pcap_player.py:142
sick_scansegment_xd::MsgPackExporter::Start
bool Start(void)
Definition: msgpack_exporter.cpp:127
ROS::now
ROS::Time now(void)
Definition: ros_wrapper.cpp:116
ROS_INFO_STREAM
#define ROS_INFO_STREAM(...)
Definition: sick_scan_ros2_example.cpp:71
sick_scansegment_xd::ScanSegmentParserOutput
Definition: scansegment_parser_output.h:118
sick_scansegment_xd::Fifo
Definition: fifo.h:75
sick_scansegment_xd::Fifo::Seconds
static double Seconds(const fifo_timestamp &timestamp_start, const fifo_timestamp &timestamp_end=fifo_clock::now())
Definition: fifo.h:166
sick_scansegment_xd::MsgPackExporter::RemoveExportListener
void RemoveExportListener(sick_scansegment_xd::MsgPackExportListenerIF *listener)
Definition: msgpack_exporter.cpp:101
fifo_timestamp
std::chrono::time_point< std::chrono::system_clock > fifo_timestamp
Definition: fifo.h:68
msgpack_exporter.h
sick_scansegment_xd::MsgPackExporter::GetExportListener
std::list< sick_scansegment_xd::MsgPackExportListenerIF * > GetExportListener()
Definition: msgpack_exporter.cpp:116
sick_scansegment_xd::MsgPackExporter::RunCb
bool RunCb(void)
Definition: msgpack_exporter.cpp:160
sick_scansegment_xd::TimingStatistics::AddTimeMilliseconds
void AddTimeMilliseconds(double t)
Definition: time_util.h:79
sick_scansegment_xd::MsgPackExporter::Stop
void Stop(void)
Definition: msgpack_exporter.cpp:137
time_util.h
sick_scansegment_xd::TimingStatistics
Definition: time_util.h:67
sick_scansegment_xd::MsgPackExportListenerIF
Definition: msgpack_exporter.h:73
sick_scansegment_xd::MsgPackExporter::Close
void Close(void)
Definition: msgpack_exporter.cpp:145
ROS_ERROR_STREAM
#define ROS_ERROR_STREAM(...)
Definition: sick_scan_ros2_example.cpp:72
sick_scansegment_xd::MsgPackExporter::MsgPackExporter
MsgPackExporter()
Definition: msgpack_exporter.cpp:62
sick_scansegment_xd::MsgPackParser::WriteCSV
static bool WriteCSV(const std::vector< ScanSegmentParserOutput > &results, const std::string &csvFile, bool overwrite_existing_file)
Definition: msgpack_parser.cpp:874


sick_scan_xd
Author(s): Michael Lehning , Jochen Sprickerhof , Martin Günther
autogenerated on Fri Oct 25 2024 02:47:09