event_engine/iomgr_engine/timer.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 #include <algorithm>
24 #include <atomic>
25 #include <limits>
26 #include <type_traits>
27 #include <utility>
28 
29 #include <grpc/support/cpu.h>
30 
34 
35 namespace grpc_event_engine {
36 namespace iomgr_engine {
37 
/* Tuning constants for the per-shard queue window recomputed when a shard's
   heap is refilled. */

/* Factor applied to the shard's updated deadline average to obtain the raw
   (unclamped) window width. */
static constexpr double kAddDeadlineScale = 0.33;

/* Lower and upper bounds for the window width.
   NOTE(review): the samples fed into the average are computed as
   millis / 1000.0, so these presumably are seconds -- confirm. */
static constexpr double kMinQueueWindowDuration = 0.01;
static constexpr double kMaxQueueWindowDuration = 1.0;
42 
44  return heap.is_empty()
45  ? queue_deadline_cap + grpc_core::Duration::Epsilon()
47  heap.Top()->deadline);
48 }
49 
51 
53  : host_(host),
55  min_timer_(host_->Now().milliseconds_after_process_epoch()),
57  shard_queue_(new Shard*[num_shards_]) {
58  for (size_t i = 0; i < num_shards_; i++) {
59  Shard& shard = shards_[i];
60  shard.queue_deadline_cap =
62  min_timer_.load(std::memory_order_relaxed));
63  shard.shard_queue_index = i;
64  shard.list.next = shard.list.prev = &shard.list;
65  shard.min_deadline = shard.ComputeMinDeadline();
66  shard_queue_[i] = &shard;
67  }
68 }
69 
70 namespace {
71 /* returns true if the first element in the list */
72 void ListJoin(Timer* head, Timer* timer) {
73  timer->next = head;
74  timer->prev = head->prev;
75  timer->next->prev = timer->prev->next = timer;
76 }
77 
78 void ListRemove(Timer* timer) {
79  timer->next->prev = timer->prev;
80  timer->prev->next = timer->next;
81 }
82 } // namespace
83 
84 void TimerList::SwapAdjacentShardsInQueue(uint32_t first_shard_queue_index) {
85  Shard* temp;
86  temp = shard_queue_[first_shard_queue_index];
87  shard_queue_[first_shard_queue_index] =
88  shard_queue_[first_shard_queue_index + 1];
89  shard_queue_[first_shard_queue_index + 1] = temp;
90  shard_queue_[first_shard_queue_index]->shard_queue_index =
91  first_shard_queue_index;
92  shard_queue_[first_shard_queue_index + 1]->shard_queue_index =
93  first_shard_queue_index + 1;
94 }
95 
97  while (shard->shard_queue_index > 0 &&
98  shard->min_deadline <
99  shard_queue_[shard->shard_queue_index - 1]->min_deadline) {
100  SwapAdjacentShardsInQueue(shard->shard_queue_index - 1);
101  }
102  while (shard->shard_queue_index < num_shards_ - 1 &&
103  shard->min_deadline >
104  shard_queue_[shard->shard_queue_index + 1]->min_deadline) {
105  SwapAdjacentShardsInQueue(shard->shard_queue_index);
106  }
107 }
108 
111  bool is_first_timer = false;
113  timer->closure = closure;
114  timer->deadline = deadline.milliseconds_after_process_epoch();
115 
116 #ifndef NDEBUG
117  timer->hash_table_next = nullptr;
118 #endif
119 
120  {
121  grpc_core::MutexLock lock(&shard->mu);
122  timer->pending = true;
124  if (deadline <= now) {
125  deadline = now;
126  }
127 
128  shard->stats.AddSample((deadline - now).millis() / 1000.0);
129 
130  if (deadline < shard->queue_deadline_cap) {
131  is_first_timer = shard->heap.Add(timer);
132  } else {
133  timer->heap_index = kInvalidHeapIndex;
134  ListJoin(&shard->list, timer);
135  }
136  }
137 
138  /* Deadline may have decreased, we need to adjust the main queue. Note
139  that there is a potential racy unlocked region here. There could be a
140  reordering of multiple TimerInit calls, at this point, but the < test
141  below should ensure that we err on the side of caution. There could
142  also be a race with TimerCheck, which might beat us to the lock. In
143  that case, it is possible that the timer that we added will have already
144  run by the time we hold the lock, but that too is a safe error.
145  Finally, it's possible that the TimerCheck that intervened failed to
146  trigger the new timer because the min_deadline hadn't yet been reduced.
147  In that case, the timer will simply have to wait for the next
148  TimerCheck. */
149  if (is_first_timer) {
150  grpc_core::MutexLock lock(&mu_);
151  if (deadline < shard->min_deadline) {
152  grpc_core::Timestamp old_min_deadline = shard_queue_[0]->min_deadline;
153  shard->min_deadline = deadline;
154  NoteDeadlineChange(shard);
155  if (shard->shard_queue_index == 0 && deadline < old_min_deadline) {
157  std::memory_order_relaxed);
158  host_->Kick();
159  }
160  }
161  }
162 }
163 
166  grpc_core::MutexLock lock(&shard->mu);
167 
168  if (timer->pending) {
169  timer->pending = false;
170  if (timer->heap_index == kInvalidHeapIndex) {
171  ListRemove(timer);
172  } else {
173  shard->heap.Remove(timer);
174  }
175  return true;
176  }
177 
178  return false;
179 }
180 
181 /* Rebalances the timer shard by computing a new 'queue_deadline_cap' and moving
182  all relevant timers in shard->list (i.e. timers with deadlines earlier than
183  'queue_deadline_cap') into shard->heap.
184  Returns 'true' if shard->heap has at least ONE element */
186  /* Compute the new queue window width and bound by the limits: */
187  double computed_deadline_delta = stats.UpdateAverage() * kAddDeadlineScale;
188  double deadline_delta =
189  grpc_core::Clamp(computed_deadline_delta, kMinQueueWindowDuration,
191  Timer *timer, *next;
192 
193  /* Compute the new cap and put all timers under it into the queue: */
194  queue_deadline_cap = std::max(now, queue_deadline_cap) +
196 
197  for (timer = list.next; timer != &list; timer = next) {
198  next = timer->next;
199  auto timer_deadline =
201  timer->deadline);
202 
203  if (timer_deadline < queue_deadline_cap) {
204  ListRemove(timer);
205  heap.Add(timer);
206  }
207  }
208  return !heap.is_empty();
209 }
210 
211 /* This pops the next non-cancelled timer with deadline <= now from the
212  queue, or returns NULL if there isn't one. */
214  Timer* timer;
215  for (;;) {
216  if (heap.is_empty()) {
217  if (now < queue_deadline_cap) return nullptr;
218  if (!RefillHeap(now)) return nullptr;
219  }
220  timer = heap.Top();
221  auto timer_deadline =
223  timer->deadline);
224  if (timer_deadline > now) return nullptr;
225  timer->pending = false;
226  heap.Pop();
227  return timer;
228  }
229 }
230 
232  grpc_core::Timestamp now, grpc_core::Timestamp* new_min_deadline,
233  std::vector<experimental::EventEngine::Closure*>* out) {
234  grpc_core::MutexLock lock(&mu);
235  while (Timer* timer = PopOne(now)) {
236  out->push_back(timer->closure);
237  }
238  *new_min_deadline = ComputeMinDeadline();
239 }
240 
241 std::vector<experimental::EventEngine::Closure*> TimerList::FindExpiredTimers(
245  min_timer_.load(std::memory_order_relaxed));
246 
247  std::vector<experimental::EventEngine::Closure*> done;
248  if (now < min_timer) {
249  if (next != nullptr) *next = std::min(*next, min_timer);
250  return done;
251  }
252 
253  grpc_core::MutexLock lock(&mu_);
254 
255  while (shard_queue_[0]->min_deadline < now ||
257  shard_queue_[0]->min_deadline == now)) {
258  grpc_core::Timestamp new_min_deadline;
259 
260  /* For efficiency, we pop as many available timers as we can from the
261  shard. This may violate perfect timer deadline ordering, but that
262  shouldn't be a big deal because we don't make ordering guarantees. */
263  shard_queue_[0]->PopTimers(now, &new_min_deadline, &done);
264 
265  /* A TimerInit() on the shard could intervene here, adding a new
266  timer that is earlier than new_min_deadline. However,
267  TimerInit() will block on the mutex before it can update the shard's
268  min_deadline, so this one will complete first and then TimerInit()
269  will reduce the min_deadline (perhaps unnecessarily). */
270  shard_queue_[0]->min_deadline = new_min_deadline;
271  NoteDeadlineChange(shard_queue_[0]);
272  }
273 
274  if (next) {
275  *next = std::min(*next, shard_queue_[0]->min_deadline);
276  }
277 
278  min_timer_.store(
279  shard_queue_[0]->min_deadline.milliseconds_after_process_epoch(),
280  std::memory_order_relaxed);
281 
282  return done;
283 }
284 
287  // prelude
289 
290  /* read the cached minimum deadline (a relaxed atomic load) first: this
291  avoids taking checker_mu_ in the common case where no timer is due yet */
294  min_timer_.load(std::memory_order_relaxed));
295 
296  if (now < min_timer) {
297  if (next != nullptr) {
298  *next = std::min(*next, min_timer);
299  }
300  return std::vector<experimental::EventEngine::Closure*>();
301  }
302 
303  if (!checker_mu_.TryLock()) return absl::nullopt;
304  std::vector<experimental::EventEngine::Closure*> run =
307 
308  return std::move(run);
309 }
310 
311 } // namespace iomgr_engine
312 } // namespace grpc_event_engine
gpr_cpu_num_cores
GPRAPI unsigned gpr_cpu_num_cores(void)
grpc_event_engine::iomgr_engine::TimerList::num_shards_
const size_t num_shards_
Definition: event_engine/iomgr_engine/timer.h:175
fix_build_deps.temp
temp
Definition: fix_build_deps.py:488
gen_build_yaml.out
dictionary out
Definition: src/benchmark/gen_build_yaml.py:24
now
static double now(void)
Definition: test/core/fling/client.cc:130
grpc_event_engine::iomgr_engine::TimerList::Shard
Definition: event_engine/iomgr_engine/timer.h:142
grpc_core::Duration::FromSecondsAsDouble
static Duration FromSecondsAsDouble(double seconds)
Definition: src/core/lib/gprpp/time.h:279
grpc_event_engine::iomgr_engine::TimerList::TimerCheck
absl::optional< std::vector< experimental::EventEngine::Closure * > > TimerCheck(grpc_core::Timestamp *next)
Definition: event_engine/iomgr_engine/timer.cc:286
grpc_event_engine::iomgr_engine::TimerList::Shard::PopOne
Timer * PopOne(grpc_core::Timestamp now) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu)
Definition: event_engine/iomgr_engine/timer.cc:213
grpc_event_engine::iomgr_engine::TimerListHost::Kick
virtual void Kick()=0
grpc_core
Definition: call_metric_recorder.h:31
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
useful.h
grpc_event_engine::iomgr_engine::TimerList::mu_
grpc_core::Mutex mu_
Definition: event_engine/iomgr_engine/timer.h:176
u
OPENSSL_EXPORT pem_password_cb void * u
Definition: pem.h:351
env.new
def new
Definition: env.py:51
grpc_event_engine::iomgr_engine::TimerList::TimerList
TimerList(TimerListHost *host)
Definition: event_engine/iomgr_engine/timer.cc:52
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
grpc_event_engine::iomgr_engine::kAddDeadlineScale
static const double kAddDeadlineScale
Definition: event_engine/iomgr_engine/timer.cc:39
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
mu
Mutex mu
Definition: server_config_selector_filter.cc:74
grpc_event_engine::iomgr_engine::TimerList::Shard::PopTimers
void PopTimers(grpc_core::Timestamp now, grpc_core::Timestamp *new_min_deadline, std::vector< experimental::EventEngine::Closure * > *out) ABSL_LOCKS_EXCLUDED(mu)
Definition: event_engine/iomgr_engine/timer.cc:231
closure
grpc_closure closure
Definition: src/core/lib/surface/server.cc:466
grpc_core::Timestamp::milliseconds_after_process_epoch
uint64_t milliseconds_after_process_epoch() const
Definition: src/core/lib/gprpp/time.h:109
gen_stats_data.stats
list stats
Definition: gen_stats_data.py:58
absl::optional
Definition: abseil-cpp/absl/types/internal/optional.h:61
min_timer
grpc_core::Timestamp min_timer
Definition: timer_generic.cc:220
grpc_event_engine::iomgr_engine::TimerList::Shard::ComputeMinDeadline
grpc_core::Timestamp ComputeMinDeadline() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu)
Definition: event_engine/iomgr_engine/timer.cc:43
done
struct tab * done
Definition: bloaty/third_party/zlib/examples/enough.c:176
cpu.h
grpc_event_engine::iomgr_engine::kMinQueueWindowDuration
static const double kMinQueueWindowDuration
Definition: event_engine/iomgr_engine/timer.cc:40
grpc_event_engine::iomgr_engine::TimerList::host_
TimerListHost *const host_
Definition: event_engine/iomgr_engine/timer.h:174
time.h
grpc_event_engine::iomgr_engine::TimerList::min_timer_
std::atomic< uint64_t > min_timer_
Definition: event_engine/iomgr_engine/timer.h:178
grpc_event_engine::iomgr_engine::kInvalidHeapIndex
static const size_t kInvalidHeapIndex
Definition: event_engine/iomgr_engine/timer.cc:38
grpc_event_engine::iomgr_engine::TimerList::TimerCancel
bool TimerCancel(Timer *timer) GRPC_MUST_USE_RESULT
Definition: event_engine/iomgr_engine/timer.cc:164
grpc_core::HashPointer
constexpr size_t HashPointer(T *p, size_t range)
Definition: useful.h:102
grpc_core::Duration::Epsilon
static constexpr Duration Epsilon()
Definition: src/core/lib/gprpp/time.h:133
grpc_event_engine::iomgr_engine::TimerList::shards_
const std::unique_ptr< Shard[]> shards_
Definition: event_engine/iomgr_engine/timer.h:184
grpc_event_engine::iomgr_engine::TimerList::NoteDeadlineChange
void NoteDeadlineChange(Shard *shard) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
Definition: event_engine/iomgr_engine/timer.cc:96
min
#define min(a, b)
Definition: qsort.h:83
grpc_event_engine::iomgr_engine::Timer
Definition: event_engine/iomgr_engine/timer.h:44
grpc_event_engine::iomgr_engine::Timer::prev
struct Timer * prev
Definition: event_engine/iomgr_engine/timer.h:50
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch
static constexpr Timestamp FromMillisecondsAfterProcessEpoch(int64_t millis)
Definition: src/core/lib/gprpp/time.h:73
grpc_event_engine::iomgr_engine::TimerList::TimerInit
void TimerInit(Timer *timer, grpc_core::Timestamp deadline, experimental::EventEngine::Closure *closure)
Definition: event_engine/iomgr_engine/timer.cc:109
grpc_event_engine::iomgr_engine::TimerList::Shard::mu
grpc_core::Mutex mu
Definition: event_engine/iomgr_engine/timer.h:153
absl::Now
ABSL_NAMESPACE_BEGIN Time Now()
Definition: abseil-cpp/absl/time/clock.cc:39
grpc_core::Mutex::Unlock
void Unlock() ABSL_UNLOCK_FUNCTION()
Definition: src/core/lib/gprpp/sync.h:70
grpc_event_engine::experimental::EventEngine::Closure
Definition: event_engine.h:87
grpc_core::Clamp
T Clamp(T val, T min, T max)
Definition: useful.h:31
client.run
def run()
Definition: examples/python/async_streaming/client.py:109
grpc_event_engine::iomgr_engine::TimerList::Shard::Shard
Shard()
Definition: event_engine/iomgr_engine/timer.cc:50
grpc_event_engine::iomgr_engine::TimerList::Shard::RefillHeap
bool RefillHeap(grpc_core::Timestamp now) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu)
Definition: event_engine/iomgr_engine/timer.cc:185
grpc_core::Mutex::TryLock
bool TryLock() ABSL_EXCLUSIVE_TRYLOCK_FUNCTION(true)
Definition: src/core/lib/gprpp/sync.h:71
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
grpc_event_engine
Definition: endpoint_config.h:24
timer_heap.h
grpc_event_engine::iomgr_engine::TimerList::SwapAdjacentShardsInQueue
void SwapAdjacentShardsInQueue(uint32_t first_shard_queue_index) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
Definition: event_engine/iomgr_engine/timer.cc:84
closure
Definition: proxy.cc:59
grpc_event_engine::iomgr_engine::TimerListHost::Now
virtual grpc_core::Timestamp Now()=0
grpc_event_engine::iomgr_engine::TimerList::FindExpiredTimers
std::vector< experimental::EventEngine::Closure * > FindExpiredTimers(grpc_core::Timestamp now, grpc_core::Timestamp *next)
Definition: event_engine/iomgr_engine/timer.cc:241
grpc_event_engine::iomgr_engine::TimerList::checker_mu_
grpc_core::Mutex checker_mu_
Definition: event_engine/iomgr_engine/timer.h:181
grpc_core::Timestamp::InfFuture
static constexpr Timestamp InfFuture()
Definition: src/core/lib/gprpp/time.h:79
grpc_event_engine::iomgr_engine::kMaxQueueWindowDuration
static const double kMaxQueueWindowDuration
Definition: event_engine/iomgr_engine/timer.cc:41
heap
Definition: heap-inl.h:40
timer.h
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
timer
static uv_timer_t timer
Definition: test-callback-stack.c:34
grpc_event_engine::iomgr_engine::TimerListHost
Definition: event_engine/iomgr_engine/timer.h:61
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:38