iomgr/timer_manager.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 #include <inttypes.h>
24 
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 
29 #include "src/core/lib/gprpp/thd.h"
31 
35 };
36 
38 
39 // global mutex
40 static gpr_mu g_mu;
41 // are we multi-threaded
42 static bool g_threaded;
43 // cv to wait until a thread is needed
45 // cv for notification when threading ends
47 // number of threads in the system
48 static int g_thread_count;
49 // number of threads sitting around waiting
50 static int g_waiter_count;
51 // linked list of threads that have completed (and need joining)
53 // was the manager kicked by the timer system
54 static bool g_kicked;
55 // is there a thread waiting until the next timer should fire?
56 static bool g_has_timed_waiter;
57 // the deadline of the current timed waiter thread (only relevant if
58 // g_has_timed_waiter is true)
60 // generation counter to track which thread is waiting for the next timer
62 // number of timer wakeups
64 
65 static void timer_thread(void* completed_thread_ptr);
66 
// Joins and releases every record on the completed-thread list.
// When g_completed_threads is non-null the list head is cleared, each record's
// thread is joined, and the record is released with gpr_free.
// NOTE(review): the body finishes by re-acquiring g_mu, so the caller
// presumably holds g_mu on entry and the join loop runs with it released. The
// lines that would show the unlock and the declaration of 'to_gc' are absent
// from this listing -- confirm against the full source.
67 static void gc_completed_threads(void) {
68  if (g_completed_threads != nullptr) {
70  g_completed_threads = nullptr;
72  while (to_gc != nullptr) {
73  to_gc->thd.Join();
 // read the successor before the record is freed
74  completed_thread* next = to_gc->next;
75  gpr_free(to_gc);
76  to_gc = next;
77  }
78  gpr_mu_lock(&g_mu);
79  }
80 }
81 
// Spawns one more timer thread.
// Allocates a completed_thread record with gpr_malloc, builds a
// grpc_core::Thread named "grpc_global_timer" whose entry point is
// timer_thread (the record itself is passed as the thread argument), and
// starts it.
// NOTE(review): the function name says g_mu is released here, but the opening
// statements (original lines 83-87) are absent from this listing -- confirm
// the counter updates and the unlock against the full source.
82 static void start_timer_thread_and_unlock(void) {
88  gpr_log(GPR_INFO, "Spawn timer thread");
89  }
 // the record is handed to the new thread as its argument
90  completed_thread* ct =
91  static_cast<completed_thread*>(gpr_malloc(sizeof(*ct)));
92  ct->thd = grpc_core::Thread("grpc_global_timer", timer_thread, ct);
93  ct->thd.Start();
94 }
95 
98  grpc_timer_check(nullptr);
99 }
100 
// Invoked from timer_main_loop when grpc_timer_check reports
// GRPC_TIMERS_FIRED. Under g_mu this thread leaves the waiter pool
// (--g_waiter_count); per the comments below it then either starts another
// thread (no waiters left and threading enabled) or kicks an untimed waiter
// (nobody is doing a timed wait). After the exec_ctx flush it takes g_mu
// again and rejoins the waiter pool (++g_waiter_count).
// NOTE(review): the thread spawn, the cv signal, the unlock, the flush call and
// the garbage-collection call are absent from this listing -- confirm the
// exact statements against the full source.
101 static void run_some_timers() {
102  // In the case of timers, the ExecCtx for the thread is declared
103  // in the timer thread itself, but this is the point where we
104  // could start seeing application-level callbacks. No need to
105  // create a new ExecCtx, though, since there already is one and it is
106  // flushed (but not destructed) in this function itself
107  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx(
109 
110  // if there's something to execute...
111  gpr_mu_lock(&g_mu);
112  // remove a waiter from the pool, and start another thread if necessary
113  --g_waiter_count;
114  if (g_waiter_count == 0 && g_threaded) {
115  // The number of timer threads is always increasing until all the threads
116  // are stopped. In rare cases, if a large number of timers fire
117  // simultaneously, we may end up using a large number of threads.
119  } else {
120  // if there's no thread waiting with a timeout, kick an existing untimed
121  // waiter so that the next deadline is not missed
122  if (!g_has_timed_waiter) {
124  gpr_log(GPR_INFO, "kick untimed waiter");
125  }
127  }
129  }
130  // without our lock, flush the exec_ctx
132  gpr_log(GPR_INFO, "flush exec_ctx");
133  }
135  gpr_mu_lock(&g_mu);
136  // garbage collect any threads that are dead
138  // get ready to wait again
139  ++g_waiter_count;
141 }
142 
143 // wait until 'next' (or forever if there is already a timed waiter in the pool)
144 // returns true if the thread should continue executing (false if it should
145 // shut down)
147  gpr_mu_lock(&g_mu);
148  // if we're not threaded anymore, leave
149  if (!g_threaded) {
151  return false;
152  }
153 
154  // If g_kicked is true at this point, it means there was a kick from the timer
155  // system that the timer-manager threads here missed. We cannot trust 'next'
156  // here any longer (since there might be an earlier deadline). So if g_kicked
157  // is true at this point, we should quickly exit this and get the next
158  // deadline from the timer system
159 
160  if (!g_kicked) {
161  // if there's no timed waiter, we should become one: that waiter waits
162  // only until the next timer should expire. All other timer threads wait forever
163  //
164  // 'g_timed_waiter_generation' is a global generation counter. The idea here
165  // is that the thread becoming a timed-waiter increments and stores this
166  // global counter locally in 'my_timed_waiter_generation' before going to
167  // sleep. After waking up, if my_timed_waiter_generation ==
168  // g_timed_waiter_generation, it can be sure that it was the timed_waiter
169  // thread (and that no other thread took over while this was asleep)
170  //
171  // Initialize my_timed_waiter_generation to some value that is NOT equal to
172  // g_timed_waiter_generation
173  uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
174 
175  /* If there's no timed waiter, we should become one: that waiter waits only
176  until the next timer should expire. All other timer threads wait forever
177  unless their 'next' is earlier than the current timed-waiter's deadline
178  (in which case the thread with earlier 'next' takes over as the new timed
179  waiter) */
182  my_timed_waiter_generation = ++g_timed_waiter_generation;
183  g_has_timed_waiter = true;
185 
187  grpc_core::Duration wait_time =
189  gpr_log(GPR_INFO, "sleep for a %" PRId64 " milliseconds",
190  wait_time.millis());
191  }
192  } else { // g_has_timed_waiter == true && next >= g_timed_waiter_deadline
194  }
195  }
196 
199  gpr_log(GPR_INFO, "sleep until kicked");
200  }
201 
203 
205  gpr_log(GPR_INFO, "wait ended: was_timed:%d kicked:%d",
206  my_timed_waiter_generation == g_timed_waiter_generation,
207  g_kicked);
208  }
209  // if this was the timed waiter, then we need to check timers, and flag
210  // that there's now no timed waiter... we'll look for a replacement if
211  // there's work to do after checking timers (code above)
212  if (my_timed_waiter_generation == g_timed_waiter_generation) {
213  ++g_wakeups;
214  g_has_timed_waiter = false;
216  }
217  }
218 
219  // if this was a kick from the timer system, consume it (and don't stop
220  // this thread yet)
221  if (g_kicked) {
223  g_kicked = false;
224  }
225 
227  return true;
228 }
229 
// Main loop run by every timer thread.
// Each iteration calls grpc_timer_check(&next). On GRPC_TIMERS_FIRED it runs
// run_some_timers(). On the other visible path it blocks in wait_until(next)
// and leaves the loop (returning to the caller) as soon as wait_until
// returns false.
// NOTE(review): the declaration of 'next' and the case labels for the
// remaining grpc_timer_check results are absent from this listing -- confirm
// which results reach the wait_until call against the full source.
230 static void timer_main_loop() {
231  for (;;) {
234 
235  // check timer state, updates next to the next time to run a check
236  switch (grpc_timer_check(&next)) {
237  case GRPC_TIMERS_FIRED:
238  run_some_timers();
239  break;
241  /* This case only happens under contention, meaning more than one timer
242  manager thread checked timers concurrently.
243 
244  If that happens, we're guaranteed that some other thread has just
245  checked timers, and this will avalanche into some other thread seeing
246  empty timers and doing a timed sleep.
247 
248  Consequently, we can just sleep forever here and be happy at some
249  saved wakeup cycles. */
251  gpr_log(GPR_INFO, "timers not checked: expect another thread to");
252  }
256  if (!wait_until(next)) {
257  return;
258  }
259  break;
260  }
261  }
262 }
263 
265  gpr_mu_lock(&g_mu);
266  // terminate the thread: drop the waiter count, thread count, and let whoever
267  // stopped the threading stuff know that we're done
268  --g_waiter_count;
269  --g_thread_count;
270  if (0 == g_thread_count) {
272  }
274  g_completed_threads = ct;
277  gpr_log(GPR_INFO, "End timer thread");
278  }
279 }
280 
// Entry point of a timer thread (the function handed to grpc_core::Thread in
// start_timer_thread_and_unlock).
// Runs timer_main_loop() until it returns, then passes the completed_thread
// record received through completed_thread_ptr to timer_thread_cleanup().
281 static void timer_thread(void* completed_thread_ptr) {
282  // this thread's exec_ctx: we try to run things through to completion here
283  // since it's easy to spin up new threads
 // NOTE(review): the exec_ctx declaration this comment refers to (original
 // line 284) is absent from this listing -- confirm against the full source.
285  timer_main_loop();
286 
287  timer_thread_cleanup(static_cast<completed_thread*>(completed_thread_ptr));
288 }
289 
// Switches the timer manager into threaded mode.
// Takes g_mu and, if g_threaded was false, sets it to true.
// NOTE(review): the statement that follows in each branch (original lines 294
// and 296) is absent from this listing; presumably one branch starts the first
// timer thread and the other just releases g_mu -- confirm against the full
// source.
290 static void start_threads(void) {
291  gpr_mu_lock(&g_mu);
292  if (!g_threaded) {
293  g_threaded = true;
295  } else {
297  }
298 }
299 
301  gpr_mu_init(&g_mu);
304  g_threaded = false;
305  g_thread_count = 0;
306  g_waiter_count = 0;
307  g_completed_threads = nullptr;
308 
309  g_has_timed_waiter = false;
311 
312  start_threads();
313 }
314 
// Switches the timer manager out of threaded mode.
// Under g_mu: if g_threaded is set it is cleared, and the function then loops
// while g_thread_count is still above zero, logging the count on each pass.
// g_wakeups is reset to 0 before returning, whether or not threading was on.
// NOTE(review): the statements that wake the waiting threads, block inside the
// loop, collect finished threads and release g_mu are absent from this
// listing -- confirm against the full source.
315 static void stop_threads(void) {
316  gpr_mu_lock(&g_mu);
318  gpr_log(GPR_INFO, "stop timer threads: threaded=%d", g_threaded);
319  }
320  if (g_threaded) {
321  g_threaded = false;
324  gpr_log(GPR_INFO, "num timer threads: %d", g_thread_count);
325  }
326  while (g_thread_count > 0) {
329  gpr_log(GPR_INFO, "num timer threads: %d", g_thread_count);
330  }
332  }
333  }
334  g_wakeups = 0;
336 }
337 
339  stop_threads();
340 
344 }
345 
347  if (enabled) {
348  start_threads();
349  } else {
350  stop_threads();
351  }
352 }
353 
// Records a kick from the timer system.
// Under g_mu it sets g_kicked and clears g_has_timed_waiter.
// NOTE(review): the remaining statements (original lines 358-361) are absent
// from this listing; presumably they wake a waiting thread and release g_mu --
// confirm against the full source.
354 void grpc_kick_poller(void) {
355  gpr_mu_lock(&g_mu);
356  g_kicked = true;
357  g_has_timed_waiter = false;
362 }
363 
gpr_cv_signal
GPRAPI void gpr_cv_signal(gpr_cv *cv)
trace.h
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
gpr_mu_unlock
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
log.h
wait_until
static bool wait_until(grpc_core::Timestamp next)
Definition: iomgr/timer_manager.cc:146
timer_main_loop
static void timer_main_loop()
Definition: iomgr/timer_manager.cc:230
g_wakeups
static uint64_t g_wakeups
Definition: iomgr/timer_manager.cc:63
g_cv_shutdown
static gpr_cv g_cv_shutdown
Definition: iomgr/timer_manager.cc:46
grpc_timer_manager_set_threading
void grpc_timer_manager_set_threading(bool enabled)
Definition: iomgr/timer_manager.cc:346
timer_manager.h
GRPC_TIMERS_NOT_CHECKED
@ GRPC_TIMERS_NOT_CHECKED
Definition: iomgr/timer.h:56
completed_thread::next
completed_thread * next
Definition: iomgr/timer_manager.cc:34
gpr_cv
pthread_cond_t gpr_cv
Definition: impl/codegen/sync_posix.h:48
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
gpr_free
GPRAPI void gpr_free(void *ptr)
Definition: alloc.cc:51
grpc_timer_manager_init
void grpc_timer_manager_init(void)
Definition: iomgr/timer_manager.cc:300
gpr_malloc
GPRAPI void * gpr_malloc(size_t size)
Definition: alloc.cc:29
gpr_inf_future
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:55
grpc_core::ApplicationCallbackExecCtx
Definition: exec_ctx.h:283
grpc_timer_check
grpc_timer_check_result grpc_timer_check(grpc_core::Timestamp *next)
Definition: iomgr/timer.cc:38
grpc_timer_manager_tick
void grpc_timer_manager_tick()
Definition: iomgr/timer_manager.cc:96
gc_completed_threads
static void gc_completed_threads(void)
Definition: iomgr/timer_manager.cc:67
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
GRPC_TIMERS_FIRED
@ GRPC_TIMERS_FIRED
Definition: iomgr/timer.h:58
start_timer_thread_and_unlock
static void start_timer_thread_and_unlock(void)
Definition: iomgr/timer_manager.cc:82
gpr_mu_destroy
GPRAPI void gpr_mu_destroy(gpr_mu *mu)
stop_threads
static void stop_threads(void)
Definition: iomgr/timer_manager.cc:315
start_threads
static void start_threads(void)
Definition: iomgr/timer_manager.cc:290
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
g_threaded
static bool g_threaded
Definition: iomgr/timer_manager.cc:42
gpr_cv_destroy
GPRAPI void gpr_cv_destroy(gpr_cv *cv)
grpc_core::ExecCtx::Flush
bool Flush()
Definition: exec_ctx.cc:69
g_kicked
static bool g_kicked
Definition: iomgr/timer_manager.cc:54
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
gpr_mu_init
GPRAPI void gpr_mu_init(gpr_mu *mu)
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
run_some_timers
static void run_some_timers()
Definition: iomgr/timer_manager.cc:101
g_timed_waiter_deadline
static grpc_core::Timestamp g_timed_waiter_deadline
Definition: iomgr/timer_manager.cc:59
gpr_cv_wait
GPRAPI int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline)
GPR_CLOCK_MONOTONIC
@ GPR_CLOCK_MONOTONIC
Definition: gpr_types.h:36
grpc_core::Thread::Join
void Join()
Definition: thd.h:141
gpr_mu_lock
GPRAPI void gpr_mu_lock(gpr_mu *mu)
GRPC_TIMERS_CHECKED_AND_EMPTY
@ GRPC_TIMERS_CHECKED_AND_EMPTY
Definition: iomgr/timer.h:57
grpc_timer_manager_shutdown
void grpc_timer_manager_shutdown(void)
Definition: iomgr/timer_manager.cc:338
grpc_core::ExecCtx
Definition: exec_ctx.h:97
grpc_core::Thread::Start
void Start()
Definition: thd.h:125
grpc_core::TraceFlag
Definition: debug/trace.h:63
completed_thread
Definition: iomgr/timer_manager.cc:32
g_timed_waiter_generation
static uint64_t g_timed_waiter_generation
Definition: iomgr/timer_manager.cc:61
GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD
#define GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD
Definition: exec_ctx.h:55
grpc_core::Duration::millis
constexpr int64_t millis() const
Definition: src/core/lib/gprpp/time.h:208
g_thread_count
static int g_thread_count
Definition: iomgr/timer_manager.cc:48
gpr_mu
pthread_mutex_t gpr_mu
Definition: impl/codegen/sync_posix.h:47
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
alloc.h
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
thd.h
grpc_timer_consume_kick
void grpc_timer_consume_kick()
Definition: iomgr/timer.cc:46
g_mu
static gpr_mu g_mu
Definition: iomgr/timer_manager.cc:40
grpc_kick_poller
void grpc_kick_poller(void)
Definition: iomgr/timer_manager.cc:354
grpc_core::Thread
Definition: thd.h:43
timer.h
gpr_cv_broadcast
GPRAPI void gpr_cv_broadcast(gpr_cv *cv)
g_cv_wait
static gpr_cv g_cv_wait
Definition: iomgr/timer_manager.cc:44
timer_thread
static void timer_thread(void *completed_thread_ptr)
Definition: iomgr/timer_manager.cc:281
g_completed_threads
static completed_thread * g_completed_threads
Definition: iomgr/timer_manager.cc:52
grpc_core::Timestamp::InfFuture
static constexpr Timestamp InfFuture()
Definition: src/core/lib/gprpp/time.h:79
grpc_core::ExecCtx::Now
Timestamp Now()
Definition: exec_ctx.cc:90
g_waiter_count
static int g_waiter_count
Definition: iomgr/timer_manager.cc:50
grpc_core::Duration
Definition: src/core/lib/gprpp/time.h:122
GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD
#define GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD
Definition: exec_ctx.h:51
grpc_timer_check_trace
grpc_core::TraceFlag grpc_timer_check_trace
ABSL_FALLTHROUGH_INTENDED
#define ABSL_FALLTHROUGH_INTENDED
Definition: abseil-cpp/absl/base/attributes.h:641
g_has_timed_waiter
static bool g_has_timed_waiter
Definition: iomgr/timer_manager.cc:56
completed_thread::thd
grpc_core::Thread thd
Definition: iomgr/timer_manager.cc:33
grpc_core::ExecCtx::Get
static ExecCtx * Get()
Definition: exec_ctx.h:205
gpr_cv_init
GPRAPI void gpr_cv_init(gpr_cv *cv)
timer_thread_cleanup
static void timer_thread_cleanup(completed_thread *ct)
Definition: iomgr/timer_manager.cc:264
grpc_core::ExecCtx::InvalidateNow
void InvalidateNow()
Definition: exec_ctx.h:188
grpc_timer_manager_get_wakeups_testonly
uint64_t grpc_timer_manager_get_wakeups_testonly(void)
Definition: iomgr/timer_manager.cc:364
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:38