mpmcqueue.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 namespace grpc_core {
24 
25 DebugOnlyTraceFlag grpc_thread_pool_trace(false, "thread_pool");
26 
27 inline void* InfLenFIFOQueue::PopFront() {
28  // Caller should already check queue is not empty and has already held the
29  // mutex. This function will assume that there is at least one element in the
30  // queue (i.e. queue_head_->content is valid).
31  void* result = queue_head_->content;
32  count_.store(count_.load(std::memory_order_relaxed) - 1,
33  std::memory_order_relaxed);
34 
35  // Updates Stats when trace flag turned on.
37  gpr_timespec wait_time =
43 
44  if (count_.load(std::memory_order_relaxed) == 0) {
48  }
49 
51  "[InfLenFIFOQueue PopFront] num_completed: %" PRIu64
52  " total_queue_time: %f max_queue_time: %f busy_queue_time: %f",
57  }
58 
60  // Signal waiting thread
61  if (count_.load(std::memory_order_relaxed) > 0) {
62  TopWaiter()->cv.Signal();
63  }
64 
65  return result;
66 }
67 
70  Node* new_chunk = new Node[num];
71  new_chunk[0].next = &new_chunk[1];
72  new_chunk[num - 1].prev = &new_chunk[num - 2];
73  for (int i = 1; i < num - 1; ++i) {
74  new_chunk[i].prev = &new_chunk[i - 1];
75  new_chunk[i].next = &new_chunk[i + 1];
76  }
77  return new_chunk;
78 }
79 
83 
85  delete_list_[delete_list_count_++] = new_chunk;
86  queue_head_ = queue_tail_ = new_chunk;
87  new_chunk[0].prev = &new_chunk[kQueueInitNumNodes - 1];
88  new_chunk[kQueueInitNumNodes - 1].next = &new_chunk[0];
89 
92 }
93 
95  GPR_ASSERT(count_.load(std::memory_order_relaxed) == 0);
96  for (size_t i = 0; i < delete_list_count_; ++i) {
97  delete[] delete_list_[i];
98  }
99  delete[] delete_list_;
100 }
101 
103  MutexLock l(&mu_);
104 
105  int curr_count = count_.load(std::memory_order_relaxed);
106 
107  if (queue_tail_ == queue_head_ && curr_count != 0) {
108  // List is full. Expands list to double size by inserting new chunk of nodes
109  Node* new_chunk = AllocateNodes(curr_count);
110  delete_list_[delete_list_count_++] = new_chunk;
111  // Expands delete list on full.
115  }
116  new_chunk[0].prev = queue_tail_->prev;
117  new_chunk[curr_count - 1].next = queue_head_;
118  queue_tail_->prev->next = new_chunk;
119  queue_head_->prev = &new_chunk[curr_count - 1];
120  queue_tail_ = new_chunk;
121  }
122  queue_tail_->content = static_cast<void*>(elem);
123 
124  // Updates Stats info
127  gpr_log(GPR_INFO, "[InfLenFIFOQueue Put] num_started: %" PRIu64,
129  auto current_time = gpr_now(GPR_CLOCK_MONOTONIC);
130  if (curr_count == 0) {
131  busy_time = current_time;
132  }
133  queue_tail_->insert_time = current_time;
134  }
135 
136  count_.store(curr_count + 1, std::memory_order_relaxed);
138 
139  TopWaiter()->cv.Signal();
140 }
141 
143  MutexLock l(&mu_);
144 
145  if (count_.load(std::memory_order_relaxed) == 0) {
148  wait_time != nullptr) {
150  }
151 
152  Waiter self;
153  PushWaiter(&self);
154  do {
155  self.cv.Wait(&mu_);
156  } while (count_.load(std::memory_order_relaxed) == 0);
157  RemoveWaiter(&self);
159  wait_time != nullptr) {
161  }
162  }
163  GPR_DEBUG_ASSERT(count_.load(std::memory_order_relaxed) > 0);
164  return PopFront();
165 }
166 
168  waiter->next = waiters_.next;
169  waiter->prev = &waiters_;
170  waiter->next->prev = waiter;
171  waiter->prev->next = waiter;
172 }
173 
175  GPR_DEBUG_ASSERT(waiter != &waiters_);
176  waiter->next->prev = waiter->prev;
177  waiter->prev->next = waiter->next;
178 }
179 
181 
182 } // namespace grpc_core
gpr_timespec_to_micros
GPRAPI double gpr_timespec_to_micros(gpr_timespec t)
Definition: src/core/lib/gpr/time.cc:237
GPR_TIMESPAN
@ GPR_TIMESPAN
Definition: gpr_types.h:45
_gevent_test_main.result
result
Definition: _gevent_test_main.py:96
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc_core::InfLenFIFOQueue::PushWaiter
void PushWaiter(Waiter *waiter)
Definition: mpmcqueue.cc:167
grpc_core::InfLenFIFOQueue::Waiter::next
Waiter * next
Definition: mpmcqueue.h:126
GPR_DEBUG_ASSERT
#define GPR_DEBUG_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:103
grpc_core::InfLenFIFOQueue::Stats::total_queue_time
gpr_timespec total_queue_time
Definition: mpmcqueue.h:104
grpc_core::InfLenFIFOQueue::num_nodes_
int num_nodes_
Definition: mpmcqueue.h:157
grpc_core
Definition: call_metric_recorder.h:31
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
grpc_core::InfLenFIFOQueue::busy_time
gpr_timespec busy_time
Definition: mpmcqueue.h:160
grpc_core::InfLenFIFOQueue::Waiter::prev
Waiter * prev
Definition: mpmcqueue.h:127
mpmcqueue.h
elem
Timer elem
Definition: event_engine/iomgr_event_engine/timer_heap_test.cc:109
grpc_core::InfLenFIFOQueue::Node::content
void * content
Definition: mpmcqueue.h:79
grpc_core::InfLenFIFOQueue::Put
void Put(void *elem) override
Definition: mpmcqueue.cc:102
start_time
static int64_t start_time
Definition: benchmark-getaddrinfo.c:37
grpc_core::InfLenFIFOQueue::delete_list_count_
size_t delete_list_count_
Definition: mpmcqueue.h:150
grpc_core::InfLenFIFOQueue::Node::prev
Node * prev
Definition: mpmcqueue.h:78
grpc_core::grpc_thread_pool_trace
DebugOnlyTraceFlag grpc_thread_pool_trace(false, "thread_pool")
Definition: mpmcqueue.h:31
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
grpc_core::InfLenFIFOQueue::delete_list_
Node ** delete_list_
Definition: mpmcqueue.h:148
grpc_core::InfLenFIFOQueue::TopWaiter
Waiter * TopWaiter()
Definition: mpmcqueue.cc:180
grpc_core::InfLenFIFOQueue::queue_head_
Node * queue_head_
Definition: mpmcqueue.h:154
grpc_core::InfLenFIFOQueue::Waiter::cv
CondVar cv
Definition: mpmcqueue.h:125
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc_core::InfLenFIFOQueue::Stats::num_started
uint64_t num_started
Definition: mpmcqueue.h:101
grpc_core::InfLenFIFOQueue::Stats::max_queue_time
gpr_timespec max_queue_time
Definition: mpmcqueue.h:106
gpr_time_sub
GPRAPI gpr_timespec gpr_time_sub(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:168
grpc_core::InfLenFIFOQueue::PopFront
void * PopFront()
Definition: mpmcqueue.cc:27
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc_core::InfLenFIFOQueue::stats_
Stats stats_
Definition: mpmcqueue.h:159
grpc_core::CondVar::Signal
void Signal()
Definition: src/core/lib/gprpp/sync.h:134
grpc_core::InfLenFIFOQueue::Node
Definition: mpmcqueue.h:76
grpc_core::InfLenFIFOQueue::delete_list_size_
size_t delete_list_size_
Definition: mpmcqueue.h:151
GPR_CLOCK_MONOTONIC
@ GPR_CLOCK_MONOTONIC
Definition: gpr_types.h:36
grpc_core::InfLenFIFOQueue::AllocateNodes
Node * AllocateNodes(int num)
Definition: mpmcqueue.cc:68
gpr_now
GPRAPI gpr_timespec gpr_now(gpr_clock_type clock)
grpc_core::InfLenFIFOQueue::count_
std::atomic< int > count_
Definition: mpmcqueue.h:156
grpc_core::InfLenFIFOQueue::mu_
Mutex mu_
Definition: mpmcqueue.h:140
grpc_core::InfLenFIFOQueue::Waiter
Definition: mpmcqueue.h:124
grpc_core::InfLenFIFOQueue::~InfLenFIFOQueue
~InfLenFIFOQueue() override
Definition: mpmcqueue.cc:94
grpc_core::InfLenFIFOQueue::RemoveWaiter
void RemoveWaiter(Waiter *waiter)
Definition: mpmcqueue.cc:174
gpr_time_max
GPRAPI gpr_timespec gpr_time_max(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:43
grpc_core::InfLenFIFOQueue::Node::next
Node * next
Definition: mpmcqueue.h:77
grpc_core::InfLenFIFOQueue::Node::insert_time
gpr_timespec insert_time
Definition: mpmcqueue.h:80
gpr_time_add
GPRAPI gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:135
gpr_convert_clock_type
GPRAPI gpr_timespec gpr_convert_clock_type(gpr_timespec t, gpr_clock_type clock_type)
Definition: src/core/lib/gpr/time.cc:241
grpc_core::InfLenFIFOQueue::kQueueInitNumNodes
static const int kQueueInitNumNodes
Definition: mpmcqueue.h:146
grpc_core::InfLenFIFOQueue::waiters_
Waiter waiters_
Definition: mpmcqueue.h:141
xds_manager.num
num
Definition: xds_manager.py:56
grpc_core::InfLenFIFOQueue::InfLenFIFOQueue
InfLenFIFOQueue()
Definition: mpmcqueue.cc:80
grpc_core::InfLenFIFOQueue::Stats::busy_queue_time
gpr_timespec busy_queue_time
Definition: mpmcqueue.h:108
gpr_timespec
Definition: gpr_types.h:50
grpc_core::InfLenFIFOQueue::queue_tail_
Node * queue_tail_
Definition: mpmcqueue.h:155
grpc_core::InfLenFIFOQueue::Get
void * Get(gpr_timespec *wait_time) override
Definition: mpmcqueue.cc:142
grpc_core::InfLenFIFOQueue::kDeleteListInitSize
static const int kDeleteListInitSize
Definition: mpmcqueue.h:144
grpc_core::DebugOnlyTraceFlag
TraceFlag DebugOnlyTraceFlag
Definition: debug/trace.h:117
grpc_core::InfLenFIFOQueue::Stats::num_completed
uint64_t num_completed
Definition: mpmcqueue.h:102
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
port_platform.h


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:41