async_log_helper.h
Go to the documentation of this file.
1 //
2 // Copyright(c) 2015 Gabi Melman.
3 // Distributed under the MIT License (http://opensource.org/licenses/MIT)
4 //
5 
6 // async log helper :
7 // Process logs asynchronously using a back thread.
8 //
9 // If the internal queue of log messages reaches its max size,
10 // then the client call will block until there is more room.
11 //
12 
13 #pragma once
14 
15 #include "opc/spdlog/common.h"
16 #include "opc/spdlog/sinks/sink.h"
19 #include "opc/spdlog/details/os.h"
20 #include "opc/spdlog/formatter.h"
21 
22 #include <chrono>
23 #include <exception>
24 #include <functional>
25 #include <memory>
26 #include <string>
27 #include <thread>
28 #include <utility>
29 #include <vector>
30 
31 namespace spdlog
32 {
33 namespace details
34 {
35 
37 {
38  // Async msg to move to/from the queue
39  // Movable only. should never be copied
40  enum class async_msg_type
41  {
42  log,
43  flush,
44  terminate
45  };
46  struct async_msg
47  {
50  log_clock::time_point time;
51  size_t thread_id;
54  size_t msg_id;
55 
56  async_msg() = default;
57  ~async_msg() = default;
58 
59 
61  logger_name(std::move(other.logger_name)),
62  level(std::move(other.level)),
63  time(std::move(other.time)),
64  thread_id(other.thread_id),
65  txt(std::move(other.txt)),
66  msg_type(std::move(other.msg_type)),
67  msg_id(other.msg_id)
68  {}
69 
71  level(level::info),
72  thread_id(0),
73  msg_type(m_type),
74  msg_id(0)
75  {}
76 
78  {
79  logger_name = std::move(other.logger_name);
80  level = other.level;
81  time = std::move(other.time);
82  thread_id = other.thread_id;
83  txt = std::move(other.txt);
84  msg_type = other.msg_type;
85  msg_id = other.msg_id;
86  return *this;
87  }
88 
89  // never copy or assign. should only be moved..
90  async_msg(const async_msg&) = delete;
91  async_msg& operator=(const async_msg& other) = delete;
92 
93  // construct from log_msg
95  level(m.level),
96  time(m.time),
97  thread_id(m.thread_id),
98  txt(m.raw.data(), m.raw.size()),
99  msg_type(async_msg_type::log),
100  msg_id(m.msg_id)
101  {
102 #ifndef SPDLOG_NO_NAME
103  logger_name = *m.logger_name;
104 #endif
105  }
106 
107 
108  // copy into log_msg
110  {
111  msg.logger_name = &logger_name;
112  msg.level = level;
113  msg.time = time;
114  msg.thread_id = thread_id;
115  msg.raw << txt;
116  msg.msg_id = msg_id;
117  }
118  };
119 
120 public:
121 
124 
125  using clock = std::chrono::steady_clock;
126 
127 
129  const std::vector<sink_ptr>& sinks,
130  size_t queue_size,
131  const log_err_handler err_handler,
133  const std::function<void()>& worker_warmup_cb = nullptr,
134  const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(),
135  const std::function<void()>& worker_teardown_cb = nullptr);
136 
137  void log(const details::log_msg& msg);
138 
139  // stop logging and join the back thread
141 
143 
144  void flush(bool wait_for_q);
145 
146  void set_error_handler(spdlog::log_err_handler err_handler);
147 
148 private:
150  std::vector<std::shared_ptr<sinks::sink>> _sinks;
151 
152  // queue of messages to log
154 
156 
158 
160 
161 
162  // overflow policy
164 
165  // worker thread warmup callback - one can set thread priority, affinity, etc
166  const std::function<void()> _worker_warmup_cb;
167 
168  // auto periodic sink flush parameter
169  const std::chrono::milliseconds _flush_interval_ms;
170 
171  // worker thread teardown callback
172  const std::function<void()> _worker_teardown_cb;
173 
174  // worker thread
175  std::thread _worker_thread;
176 
177  void push_msg(async_msg&& new_msg);
178 
179  // worker thread main loop
180  void worker_loop();
181 
182  // pop next message from the queue and process it. will set the last_pop to the pop time
183  // return false if termination of the queue is required
184  bool process_next_msg(log_clock::time_point& last_pop, log_clock::time_point& last_flush);
185 
186  void handle_flush_interval(log_clock::time_point& now, log_clock::time_point& last_flush);
187 
188  // sleep,yield or return immediately using the time passed since last message as a hint
189  static void sleep_or_yield(const spdlog::log_clock::time_point& now, const log_clock::time_point& last_op_time);
190 
191  // wait until the queue is empty
192  void wait_empty_q();
193 
194 };
195 }
196 }
197 
199 // async_sink class implementation
203  const std::vector<sink_ptr>& sinks,
204  size_t queue_size,
205  log_err_handler err_handler,
206  const async_overflow_policy overflow_policy,
207  const std::function<void()>& worker_warmup_cb,
208  const std::chrono::milliseconds& flush_interval_ms,
209  const std::function<void()>& worker_teardown_cb):
210  _formatter(formatter),
211  _sinks(sinks),
212  _q(queue_size),
213  _err_handler(err_handler),
214  _flush_requested(false),
215  _terminate_requested(false),
216  _overflow_policy(overflow_policy),
217  _worker_warmup_cb(worker_warmup_cb),
218  _flush_interval_ms(flush_interval_ms),
219  _worker_teardown_cb(worker_teardown_cb),
221 {}
222 
223 // Send to the worker thread termination message(level=off)
224 // and wait for it to finish gracefully
226 {
227  try
228  {
230  _worker_thread.join();
231  }
232  catch (...) // don't crash in destructor
233  {
234  }
235 }
236 
237 
238 //Try to push and block until succeeded (if the policy is not to discard when the queue is full)
240 {
241  push_msg(async_msg(msg));
242 }
243 
245 {
247  {
248  auto last_op_time = details::os::now();
249  auto now = last_op_time;
250  do
251  {
252  now = details::os::now();
253  sleep_or_yield(now, last_op_time);
254  }
255  while (!_q.enqueue(std::move(new_msg)));
256  }
257 }
258 
259 // optionally wait for the queue to be empty and request flush from the sinks
260 inline void spdlog::details::async_log_helper::flush(bool wait_for_q)
261 {
263  if (wait_for_q)
264  wait_empty_q(); //return only make after the above flush message was processed
265 }
266 
268 {
270  auto last_pop = details::os::now();
271  auto last_flush = last_pop;
272  auto active = true;
273  while (active)
274  {
275  try
276  {
277  active = process_next_msg(last_pop, last_flush);
278  }
279  catch (const std::exception &ex)
280  {
281  _err_handler(ex.what());
282  }
283  catch (...)
284  {
285  _err_handler("Unknown exception");
286  }
287  }
289 
290 
291 }
292 
293 // process next message in the queue
294 // return true if this thread should still be active (while no terminate msg was received)
295 inline bool spdlog::details::async_log_helper::process_next_msg(log_clock::time_point& last_pop, log_clock::time_point& last_flush)
296 {
297  async_msg incoming_async_msg;
298 
299  if (_q.dequeue(incoming_async_msg))
300  {
301  last_pop = details::os::now();
302  switch (incoming_async_msg.msg_type)
303  {
305  _flush_requested = true;
306  break;
307 
309  _flush_requested = true;
310  _terminate_requested = true;
311  break;
312 
313  default:
314  log_msg incoming_log_msg;
315  incoming_async_msg.fill_log_msg(incoming_log_msg);
316  _formatter->format(incoming_log_msg);
317  for (auto &s : _sinks)
318  {
319  if (s->should_log(incoming_log_msg.level))
320  {
321  s->log(incoming_log_msg);
322  }
323  }
324  }
325  return true;
326  }
327 
328  // Handle empty queue..
329  // This is the only place where the queue can terminate or flush to avoid losing messages already in the queue
330  else
331  {
332  auto now = details::os::now();
333  handle_flush_interval(now, last_flush);
334  sleep_or_yield(now, last_pop);
335  return !_terminate_requested;
336  }
337 }
338 
339 // flush all sinks if _flush_interval_ms has expired
340 inline void spdlog::details::async_log_helper::handle_flush_interval(log_clock::time_point& now, log_clock::time_point& last_flush)
341 {
342  auto should_flush = _flush_requested || (_flush_interval_ms != std::chrono::milliseconds::zero() && now - last_flush >= _flush_interval_ms);
343  if (should_flush)
344  {
345  for (auto &s : _sinks)
346  s->flush();
347  now = last_flush = details::os::now();
348  _flush_requested = false;
349  }
350 }
351 
353 {
354  _formatter = msg_formatter;
355 }
356 
357 
358 // spin, yield or sleep. use the time passed since last message as a hint
359 inline void spdlog::details::async_log_helper::sleep_or_yield(const spdlog::log_clock::time_point& now, const spdlog::log_clock::time_point& last_op_time)
360 {
361  using namespace std::this_thread;
362  using std::chrono::milliseconds;
363  using std::chrono::microseconds;
364 
365  auto time_since_op = now - last_op_time;
366 
367  // spin upto 50 micros
368  if (time_since_op <= microseconds(50))
369  return;
370 
371  // yield upto 150 micros
372  if (time_since_op <= microseconds(100))
373  return std::this_thread::yield();
374 
375  // sleep for 20 ms upto 200 ms
376  if (time_since_op <= milliseconds(200))
377  return sleep_for(milliseconds(20));
378 
379  // sleep for 200 ms
380  return sleep_for(milliseconds(200));
381 }
382 
383 // wait for the queue to be empty
385 {
386  auto last_op = details::os::now();
387  while (_q.approx_size() > 0)
388  {
389  sleep_or_yield(details::os::now(), last_op);
390  }
391 }
392 
394 {
395  _err_handler = err_handler;
396 }
397 
398 
399 
static void sleep_or_yield(const spdlog::log_clock::time_point &now, const log_clock::time_point &last_op_time)
time
Definition: server.py:52
const async_overflow_policy _overflow_policy
void log(const details::log_msg &msg)
const std::string * logger_name
Definition: log_msg.h:41
bool process_next_msg(log_clock::time_point &last_pop, log_clock::time_point &last_flush)
XmlRpcServer s
void handle_flush_interval(log_clock::time_point &now, log_clock::time_point &last_flush)
async_msg & operator=(async_msg &&other) SPDLOG_NOEXCEPT
#define SPDLOG_NOEXCEPT
std::shared_ptr< spdlog::formatter > formatter_ptr
async_msg(async_msg &&other) SPDLOG_NOEXCEPT
std::vector< std::shared_ptr< sinks::sink > > _sinks
void set_error_handler(spdlog::log_err_handler err_handler)
spdlog::log_clock::time_point now()
Definition: os.h:64
const std::function< void()> _worker_teardown_cb
size_t thread_id()
Definition: os.h:343
std::chrono::steady_clock clock
fmt::BufferedFile & move(fmt::BufferedFile &f)
Definition: posix.h:432
level::level_enum level
Definition: log_msg.h:42
fmt::MemoryWriter raw
Definition: log_msg.h:45
const std::chrono::milliseconds _flush_interval_ms
void push_msg(async_msg &&new_msg)
std::function< void(const std::string &err_msg)> log_err_handler
log_clock::time_point time
Definition: log_msg.h:43
async_log_helper(formatter_ptr formatter, const std::vector< sink_ptr > &sinks, size_t queue_size, const log_err_handler err_handler, const async_overflow_policy overflow_policy=async_overflow_policy::block_retry, const std::function< void()> &worker_warmup_cb=nullptr, const std::chrono::milliseconds &flush_interval_ms=std::chrono::milliseconds::zero(), const std::function< void()> &worker_teardown_cb=nullptr)
const std::function< void()> _worker_warmup_cb


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Tue Jan 19 2021 03:06:03