event_engine/iomgr_engine/timer_manager.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 #include <algorithm>
24 #include <memory>
25 #include <utility>
26 
27 #include "absl/memory/memory.h"
28 #include "absl/time/time.h"
29 #include "absl/types/optional.h"
30 
32 #include <grpc/support/log.h>
33 #include <grpc/support/time.h>
34 
35 #include "src/core/lib/gprpp/thd.h"
36 
37 namespace grpc_event_engine {
38 namespace iomgr_engine {
39 
40 namespace {
// Holds a batch of grpc_core::Thread objects; the out-of-line destructor
// defined just after this class joins every thread in threads_.
41 class ThreadCollector {
42  public:
43  ThreadCollector() = default;
44  ~ThreadCollector();
45 
// Accepts a batch of threads. Asserts that no earlier batch is still held.
// NOTE(review): listing line 48 is absent from this extract; presumably it
// moves `threads` into threads_ -- confirm against the real source.
46  void Collect(std::vector<grpc_core::Thread> threads) {
47  GPR_ASSERT(threads_.empty());
49  }
50 
51  private:
// Threads waiting to be joined by the destructor.
52  std::vector<grpc_core::Thread> threads_;
53 };
54 
// Joins every thread currently held in threads_.
55 ThreadCollector::~ThreadCollector() {
56  for (auto& t : threads_) t.Join();
57 }
58 } // namespace
59 
// Body of TimerManager::StartThread(). The signature (listing line 60) is
// absent from this extract; the index at the end of the page declares it
// ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_).
// Counts the new thread as both a waiter and a live thread, then launches it.
61  ++waiter_count_;
62  ++thread_count_;
// Raw allocation: TimerManager::RunThread wraps `arg` in a unique_ptr and so
// takes ownership of this object.
63  auto* thread = new RunThreadArgs();
64  thread->self = this;
65  thread->thread = grpc_core::Thread(
66  "timer_manager", &TimerManager::RunThread, thread, nullptr,
67  grpc_core::Thread::Options().set_tracked(false));
68  thread->thread.Start();
69 }
70 
// TimerManager::RunSomeTimers(timers): runs each closure in `timers` on the
// calling thread, after making sure another thread is available to watch the
// next deadline.
// The first signature line (listing line 71) and listing lines 76 and 99 are
// absent from this extract.
// NOTE(review): the two brace-only scopes below touch state that the rest of
// this file accesses under mu_; the missing lines presumably acquire mu_ --
// confirm against the real source.
72  std::vector<experimental::EventEngine::Closure*> timers) {
73  // if there's something to execute...
// Declared outside the inner scopes, so its destructor (which joins the
// collected threads) runs at function exit, after the last scope has closed.
74  ThreadCollector collector;
75  {
// On shutdown or fork this returns before the loop below, so the closures in
// `timers` are not run.
77  if (shutdown_ || forking_) return;
78  // remove a waiter from the pool, and start another thread if necessary
79  --waiter_count_;
80  if (waiter_count_ == 0) {
81  // The number of timer threads is always increasing until all the threads
82  // are stopped, with the exception that all threads are shut down on fork
83  // events. In rare cases, if a large number of timers fire simultaneously,
84  // we may end up using a large number of threads.
85  // TODO(ctiller): We could avoid this by exiting threads in WaitUntil().
86  StartThread();
87  } else {
88  // if there's no thread waiting with a timeout, kick an existing untimed
89  // waiter so that the next deadline is not missed
90  if (!has_timed_waiter_) {
91  cv_.Signal();
92  }
93  }
94  }
// Run the expired timers' closures.
95  for (auto* timer : timers) {
96  timer->Run();
97  }
98  {
// Take over any threads that have finished so they get joined on return.
100  collector.Collect(std::move(completed_threads_));
101  // get ready to wait again
102  ++waiter_count_;
103  }
104 }
105 
106 // wait until 'next' (or forever if there is already a timed waiter in the pool)
107 // returns true if the thread should continue executing (false if it should
108 // shutdown)
// The signature (listing line 109) is absent from this extract; the index at
// the end of the page gives it as `bool WaitUntil(grpc_core::Timestamp next)`.
// Holds mu_ for the whole call via the MutexLock below.
110  grpc_core::MutexLock lock(&mu_);
111 
// Both shutdown and a pending fork tell the calling thread to stop.
112  if (shutdown_) return false;
113  if (forking_) return false;
114 
115  // TODO(ctiller): if there are too many waiting threads, this would be a good
116  // place to exit the current thread.
117 
118  // If kicked_ is true at this point, it means there was a kick from the timer
119  // system that the timer-manager threads here missed. We cannot trust 'next'
120  // here any longer (since there might be an earlier deadline). So if kicked_
121  // is true at this point, we should quickly exit this and get the next
122  // deadline from the timer system
123 
124  if (!kicked_) {
125  // if there's no timed waiter, we should become one: that waiter waits
126  // only until the next timer should expire. All other timers wait forever
127  //
128  // 'timed_waiter_generation_' is a global generation counter. The idea here
129  // is that the thread becoming a timed-waiter increments and stores this
130  // global counter locally in 'my_timed_waiter_generation' before going to
131  // sleep. After waking up, if my_timed_waiter_generation ==
132  // timed_waiter_generation_, it can be sure that it was the timed_waiter
133  // thread (and that no other thread took over while this was asleep)
134  //
135  // Initialize my_timed_waiter_generation to some value that is NOT equal to
136  // timed_waiter_generation_
137  uint64_t my_timed_waiter_generation = timed_waiter_generation_ - 1;
138 
139  /* If there's no timed waiter, we should become one: that waiter waits only
140  until the next timer should expire. All other timer threads wait forever
141  unless their 'next' is earlier than the current timed-waiter's deadline
142  (in which case the thread with earlier 'next' takes over as the new timed
143  waiter) */
// NOTE(review): listing line 144 is absent from this extract; the extra
// closing brace at listing line 152 suggests it opened an enclosing `if` --
// confirm against the real source.
145  if (!has_timed_waiter_ || (next < timed_waiter_deadline_)) {
146  my_timed_waiter_generation = ++timed_waiter_generation_;
147  has_timed_waiter_ = true;
148  timed_waiter_deadline_ = next;
149  } else { // has_timed_waiter_ == true && next >= timed_waiter_deadline_
// NOTE(review): listing line 150 (the body of this else branch) is absent
// from this extract.
151  }
152  }
153 
// NOTE(review): listing line 154 is absent; the line below is the tail of a
// call whose argument is the time from host_.Now() until `next`, in
// milliseconds. Presumably the call is cv_.WaitWithTimeout(&mu_, ...), which
// the index at the end of the page lists -- confirm.
155  absl::Milliseconds((next - host_.Now()).millis()));
156 
157  // if this was the timed waiter, then we need to check timers, and flag
158  // that there's now no timed waiter... we'll look for a replacement if
159  // there's work to do after checking timers (code above)
160  if (my_timed_waiter_generation == timed_waiter_generation_) {
161  ++wakeups_;
162  has_timed_waiter_ = false;
163  timed_waiter_deadline_ = grpc_core::Timestamp::InfFuture();
164  }
165  }
166 
// Clear the kick flag so that the next call can wait normally.
167  kicked_ = false;
168 
169  return true;
170 }
171 
// Body of TimerManager::MainLoop(): repeatedly checks the timer list, runs any
// expired timers, and otherwise waits; returns when WaitUntil() returns false.
// The signature (listing line 172) and listing lines 174, 175 and 192 are
// absent from this extract.
// NOTE(review): the missing lines presumably declare `next` and
// `check_result` -- confirm against the real source.
173  for (;;) {
176  check_result = timer_list_->TimerCheck(&next);
177  if (check_result.has_value()) {
178  if (!check_result->empty()) {
// Timers fired: run them, then re-check immediately instead of waiting.
179  RunSomeTimers(std::move(*check_result));
180  continue;
181  }
182  } else {
183  /* This case only happens under contention, meaning more than one timer
184  manager thread checked timers concurrently.
185 
186  If that happens, we're guaranteed that some other thread has just
187  checked timers, and this will avalanche into some other thread seeing
188  empty timers and doing a timed sleep.
189 
190  Consequently, we can just sleep forever here and be happy at some
191  saved wakeup cycles. */
// NOTE(review): listing line 192 (the statement this comment describes) is
// absent from this extract.
193  }
194  if (!WaitUntil(next)) return;
195  }
196 }
197 
// Body of TimerManager::RunThread; this is the entry point passed to
// grpc_core::Thread by StartThread(). The signature (listing line 198) is
// absent from this extract; the index at the end of the page gives it as
// `static void RunThread(void *arg)`.
// Takes ownership of the RunThreadArgs allocated by StartThread(); it is
// freed when this function returns.
199  std::unique_ptr<RunThreadArgs> thread(static_cast<RunThreadArgs*>(arg));
200  thread->self->MainLoop();
201  {
// Under mu_: deregister this thread and hand its Thread object over to
// completed_threads_, from which other code in this file collects and joins it.
202  grpc_core::MutexLock lock(&thread->self->mu_);
203  thread->self->thread_count_--;
204  thread->self->completed_threads_.push_back(std::move(thread->thread));
205  }
// Signal cv_ so a waiter in ~TimerManager / PrepareFork can re-check
// thread_count_.
206  thread->self->cv_.Signal();
207 }
208 
// Creates the TimerList bound to host_ and starts the first timer thread.
209 TimerManager::TimerManager() : host_(this) {
210  timer_list_ = absl::make_unique<TimerList>(&host_);
// StartThread() is called with mu_ held.
211  grpc_core::MutexLock lock(&mu_);
212  StartThread();
213 }
214 
218 }
219 
// Body of TimerManager::TimerInit(timer, deadline, closure); the signature is
// absent from this extract. Forwards the arguments unchanged to timer_list_.
222  timer_list_->TimerInit(timer, deadline, closure);
223 }
224 
// Body of TimerManager::TimerCancel(timer); the signature is absent from this
// extract. Returns whatever timer_list_->TimerCancel reports.
226  return timer_list_->TimerCancel(timer);
227 }
228 
// Body of TimerManager::~TimerManager(); the signature (listing line 229) is
// absent from this extract.
// Flags shutdown, wakes every waiter, then blocks until all timer threads have
// exited, joining them as they finish.
230  {
231  grpc_core::MutexLock lock(&mu_);
232  shutdown_ = true;
233  cv_.SignalAll();
234  }
235  while (true) {
// The collector is declared before the lock, so it is destroyed after the
// lock: the joins in its destructor run once mu_ has been released.
236  ThreadCollector collector;
237  grpc_core::MutexLock lock(&mu_);
238  collector.Collect(std::move(completed_threads_));
239  if (thread_count_ == 0) break;
// RunThread signals cv_ after it decrements thread_count_.
240  cv_.Wait(&mu_);
241  }
242 }
243 
// Forwards a kick received by the Host to the owning TimerManager.
244 void TimerManager::Host::Kick() { timer_manager_->Kick(); }
245 
// Body of TimerManager::Kick(); the signature (listing line 246) is absent
// from this extract.
// Under mu_: clears the timed-waiter state and bumps the generation counter,
// so a thread sleeping as the timed waiter no longer matches
// timed_waiter_generation_ when it wakes. Then sets kicked_ and signals cv_.
247  grpc_core::MutexLock lock(&mu_);
248  has_timed_waiter_ = false;
249  timed_waiter_deadline_ = grpc_core::Timestamp::InfFuture();
250  ++timed_waiter_generation_;
// WaitUntil() checks kicked_ before waiting and resets it before returning.
251  kicked_ = true;
252  cv_.Signal();
253 }
254 
// Body of TimerManager::PrepareFork(); the signature (listing line 255) is
// absent from this extract.
// Flags forking, records how many threads were running, wakes every waiter,
// then blocks until all timer threads have exited.
256  {
257  grpc_core::MutexLock lock(&mu_);
258  forking_ = true;
// Saved so the Postfork* handlers can restart the same number of threads.
259  prefork_thread_count_ = thread_count_;
260  cv_.SignalAll();
261  }
262  while (true) {
// NOTE(review): here the collector is declared after the lock, so it is
// destroyed first and its joins run while mu_ is still held. ~TimerManager
// uses the opposite order. Consider aligning the two.
263  grpc_core::MutexLock lock(&mu_);
264  ThreadCollector collector;
265  collector.Collect(std::move(completed_threads_));
266  if (thread_count_ == 0) break;
267  cv_.Wait(&mu_);
268  }
269 }
270 
// Body of TimerManager::PostforkParent(); the signature (listing line 271) is
// absent from this extract.
// Under mu_: restarts as many threads as PrepareFork() recorded, then clears
// the forking flag.
272  grpc_core::MutexLock lock(&mu_);
273  for (int i = 0; i < prefork_thread_count_; i++) {
274  StartThread();
275  }
// NOTE(review): listing line 276 is absent from this extract.
277  forking_ = false;
278 }
279 
// Body of TimerManager::PostforkChild(); the signature (listing line 280) is
// absent from this extract.
// Under mu_: restarts as many threads as PrepareFork() recorded, then clears
// the forking flag. The visible statements match PostforkParent().
281  grpc_core::MutexLock lock(&mu_);
282  for (int i = 0; i < prefork_thread_count_; i++) {
283  StartThread();
284  }
// NOTE(review): listing line 285 is absent from this extract.
286  forking_ = false;
287 }
288 
289 } // namespace iomgr_engine
290 } // namespace grpc_event_engine
grpc_event_engine::iomgr_engine::TimerManager::Host::Kick
void Kick() override
Definition: event_engine/iomgr_engine/timer_manager.cc:244
grpc_event_engine::iomgr_engine::TimerManager::Kick
void Kick()
Definition: event_engine/iomgr_engine/timer_manager.cc:246
grpc_core::CondVar::WaitWithTimeout
bool WaitWithTimeout(Mutex *mu, absl::Duration timeout)
Definition: src/core/lib/gprpp/sync.h:138
grpc_event_engine::iomgr_engine::TimerManager::StartThread
void StartThread() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
Definition: event_engine/iomgr_engine/timer_manager.cc:60
threads_
std::vector< grpc_core::Thread > threads_
Definition: event_engine/iomgr_engine/timer_manager.cc:52
grpc_event_engine::iomgr_engine::TimerManager::MainLoop
void MainLoop()
Definition: event_engine/iomgr_engine/timer_manager.cc:172
grpc_event_engine::iomgr_engine::TimerManager::prefork_thread_count_
int prefork_thread_count_
Definition: event_engine/iomgr_engine/timer_manager.h:114
log.h
grpc_event_engine::iomgr_engine::TimerManager::~TimerManager
~TimerManager() override
Definition: event_engine/iomgr_engine/timer_manager.cc:229
grpc_event_engine::iomgr_engine::TimerManager::RunSomeTimers
void RunSomeTimers(std::vector< experimental::EventEngine::Closure * > timers)
Definition: event_engine/iomgr_engine/timer_manager.cc:71
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
grpc_event_engine::iomgr_engine::TimerManager::TimerInit
void TimerInit(Timer *timer, grpc_core::Timestamp deadline, experimental::EventEngine::Closure *closure)
Definition: event_engine/iomgr_engine/timer_manager.cc:220
grpc_event_engine::iomgr_engine::TimerManager::WaitUntil
bool WaitUntil(grpc_core::Timestamp next)
Definition: event_engine/iomgr_engine/timer_manager.cc:109
grpc_event_engine::iomgr_engine::TimerManager::RunThreadArgs
Definition: event_engine/iomgr_engine/timer_manager.h:64
time.h
grpc_event_engine::iomgr_engine::TimerManager::cv_
grpc_core::CondVar cv_
Definition: event_engine/iomgr_engine/timer_manager.h:89
wakeups_
std::vector< uint32_t > wakeups_
Definition: filter_fuzzer.cc:572
threads
static uv_thread_t * threads
Definition: threadpool.c:38
absl::Milliseconds
constexpr Duration Milliseconds(T n)
Definition: third_party/abseil-cpp/absl/time/time.h:415
grpc_core::CondVar::SignalAll
void SignalAll()
Definition: src/core/lib/gprpp/sync.h:135
grpc_event_engine::iomgr_engine::TimerManager::PostforkParent
void PostforkParent() override
Definition: event_engine/iomgr_engine/timer_manager.cc:271
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
absl::optional::has_value
constexpr bool has_value() const noexcept
Definition: abseil-cpp/absl/types/optional.h:461
grpc_event_engine::iomgr_engine::TimerManager::TimerManager
TimerManager()
Definition: event_engine/iomgr_engine/timer_manager.cc:209
grpc_event_engine::iomgr_engine::TimerManager::mu_
grpc_core::Mutex mu_
Definition: event_engine/iomgr_engine/timer_manager.h:88
grpc_core::CondVar::Signal
void Signal()
Definition: src/core/lib/gprpp/sync.h:134
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
grpc_event_engine::iomgr_engine::TimerManager::PrepareFork
void PrepareFork() override
Definition: event_engine/iomgr_engine/timer_manager.cc:255
absl::optional
Definition: abseil-cpp/absl/types/internal/optional.h:61
arg
Definition: cmdline.cc:40
GPR_CLOCK_MONOTONIC
@ GPR_CLOCK_MONOTONIC
Definition: gpr_types.h:36
grpc_core::Thread::Options
Definition: thd.h:45
grpc_event_engine::iomgr_engine::TimerManager::Host::Now
grpc_core::Timestamp Now() override
Definition: event_engine/iomgr_engine/timer_manager.cc:215
gpr_now
GPRAPI gpr_timespec gpr_now(gpr_clock_type clock)
grpc_event_engine::iomgr_engine::TimerManager::RunThread
static void RunThread(void *arg)
Definition: event_engine/iomgr_engine/timer_manager.cc:198
grpc_event_engine::iomgr_engine::Timer
Definition: event_engine/iomgr_engine/timer.h:44
gpr_types.h
shutdown_
bool shutdown_
Definition: pick_first.cc:173
grpc_event_engine::experimental::EventEngine::Closure
Definition: event_engine.h:87
grpc_event_engine::iomgr_engine::TimerManager::host_
Host host_
Definition: event_engine/iomgr_engine/timer_manager.h:90
absl::str_format_internal::LengthMod::t
@ t
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
grpc_core::Timestamp::FromTimespecRoundDown
static Timestamp FromTimespecRoundDown(gpr_timespec t)
Definition: src/core/lib/gprpp/time.cc:141
grpc_core::CondVar::Wait
void Wait(Mutex *mu)
Definition: src/core/lib/gprpp/sync.h:137
grpc_event_engine
Definition: endpoint_config.h:24
thd.h
timer_manager.h
closure
Definition: proxy.cc:59
grpc_core::Thread
Definition: thd.h:43
grpc_event_engine::iomgr_engine::TimerManager::PostforkChild
void PostforkChild() override
Definition: event_engine/iomgr_engine/timer_manager.cc:280
grpc_core::Timestamp::InfFuture
static constexpr Timestamp InfFuture()
Definition: src/core/lib/gprpp/time.h:79
grpc_event_engine::iomgr_engine::TimerManager::TimerCancel
bool TimerCancel(Timer *timer)
Definition: event_engine/iomgr_engine/timer_manager.cc:225
thread
static uv_thread_t thread
Definition: test-async-null-cb.c:29
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
timer
static uv_timer_t timer
Definition: test-callback-stack.c:34
port_platform.h
grpc_event_engine::iomgr_engine::TimerManager::timer_list_
std::unique_ptr< TimerList > timer_list_
Definition: event_engine/iomgr_engine/timer_manager.h:113


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:38