mpmcqueue.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H
20 #define GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H
21 
23 
24 #include <atomic>
25 
28 
29 namespace grpc_core {
30 
32 
33 // Abstract base class of a Multiple-Producer-Multiple-Consumer (MPMC) queue
34 // interface
36  public:
37  virtual ~MPMCQueueInterface() {}
38 
39  // Puts elem into the queue immediately, at the end of the queue.
40  // This might block on a full queue, depending on implementation.
41  virtual void Put(void* elem) = 0;
42 
43  // Removes the oldest element from the queue and returns it.
44  // This might block on an empty queue, depending on implementation.
45  // wait_time is an optional argument used for collecting stats.
46  virtual void* Get(gpr_timespec* wait_time) = 0;
47 
48  // Returns number of elements in the queue currently
49  virtual int count() const = 0;
50 };
51 
53  public:
54  // Creates a new MPMC Queue. The queue created will have infinite length.
56 
57  // Releases all resources held by the queue. The queue must be empty, and no
58  // thread may be waiting on its condition variables.
59  ~InfLenFIFOQueue() override;
60 
61  // Puts elem into queue immediately at the end of queue. Since the queue has
62  // infinite length, this routine will never block and should never fail.
63  void Put(void* elem) override;
64 
65  // Removes the oldest element from the queue and returns it.
66  // This routine will cause the thread to block if the queue is currently empty.
67  // Argument wait_time should be passed in when the trace flag is turned on
68  // (for the purpose of collecting stats info).
69  void* Get(gpr_timespec* wait_time) override;
70 
71  // Returns number of elements in queue currently.
72  // There might be concurrent adds/removes on the queue, so the count might
73  // change quickly.
74  int count() const override { return count_.load(std::memory_order_relaxed); }
75 
76  struct Node { // Doubly linked node holding one queued element
77  Node* next = nullptr; // Link to the next node
78  Node* prev = nullptr; // Link to the previous node
79  void* content = nullptr; // Points to the actual element stored in this node
80  gpr_timespec insert_time; // Timestamp kept for stats (presumably set on insertion -- confirm in mpmcqueue.cc)
81  };
82 
83  // For test purpose only. Returns number of nodes allocated in queue.
84  // Any allocated node will be alive until the destruction of the queue.
85  int num_nodes() const { return num_nodes_; }
86 
87  // For test purpose only. Returns the initial number of nodes in queue.
88  int init_num_nodes() const { return kQueueInitNumNodes; }
89 
90  private:
91  // For Internal Use Only.
92  // Removes the oldest element from the queue and returns it. This routine
93  // will NOT check whether queue is empty, and it will NOT acquire mutex.
94  // Caller MUST check that the queue is not empty and must acquire the mutex
95  // before calling.
96  void* PopFront();
97 
98  // Stats of the queue. These will only be collected when debug trace mode is on.
99  // All printed stats info will have time measurements in microseconds.
100  struct Stats {
101  uint64_t num_started; // Number of elements that have been added to the queue
102  uint64_t num_completed; // Number of elements that have been removed from
103  // the queue
104  gpr_timespec total_queue_time; // Total waiting time that all the
105  // removed elements have spent in the queue
106  gpr_timespec max_queue_time; // Max waiting time among all removed
107  // elements
108  gpr_timespec busy_queue_time; // Accumulated amount of time that the queue
109  // was not empty
110 
111  Stats() { // Starts both element counters at zero
112  num_started = 0;
113  num_completed = 0;
117  } // NOTE(review): listing lines 114-116 are absent from this rendering; presumably they initialize the gpr_timespec members -- confirm against the real header
118  };
119 
120  // Node for the waiting-thread queue. Stands for one waiting thread; should have
121  // exactly one thread waiting on its CondVar.
122  // Uses a doubly linked list for the waiting-thread queue so that waiting
123  // threads are woken up in LIFO order, to reduce cache misses.
124  struct Waiter {
128  };
129 
130  // Pushes waiter to the front of the queue; requires that the caller holds the mutex
131  void PushWaiter(Waiter* waiter);
132 
133  // Removes waiter from the queue; requires that the caller holds the mutex
134  void RemoveWaiter(Waiter* waiter);
135 
136  // Returns a pointer to the waiter that should be woken up next, which should be
137  // the last added waiter.
138  Waiter* TopWaiter();
139 
140  Mutex mu_; // Protecting lock
141  Waiter waiters_; // Head of waiting thread queue
142 
143  // Initial size for delete list
144  static const int kDeleteListInitSize = 1024;
145  // Initial number of nodes allocated
146  static const int kQueueInitNumNodes = 1024;
147 
148  Node** delete_list_ = nullptr; // Keeps track of all allocated array entries
149  // for deleting on destruction
150  size_t delete_list_count_ = 0; // Number of entries in list
151  size_t delete_list_size_ = 0; // Size of the list. List will be expanded to
152  // double size on full
153 
154  Node* queue_head_ = nullptr; // Head of the queue, remove position
155  Node* queue_tail_ = nullptr; // End of queue, insert position
156  std::atomic<int> count_{0}; // Number of elements in queue
157  int num_nodes_ = 0; // Number of nodes allocated
158 
159  Stats stats_; // Stats info
160  gpr_timespec busy_time; // Start time of busy queue
161 
162  // Internal Helper.
163  // Allocates an array of nodes of size "num", links all nodes together except
164  // the first node's prev and last node's next. They should be set by caller
165  // manually afterward.
166  Node* AllocateNodes(int num);
167 };
168 
169 } // namespace grpc_core
170 
171 #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H */
GPR_TIMESPAN
@ GPR_TIMESPAN
Definition: gpr_types.h:45
grpc_core::InfLenFIFOQueue::PushWaiter
void PushWaiter(Waiter *waiter)
Definition: mpmcqueue.cc:167
grpc_core::InfLenFIFOQueue
Definition: mpmcqueue.h:52
grpc_core::InfLenFIFOQueue::Stats::Stats
Stats()
Definition: mpmcqueue.h:111
grpc_core::CondVar
Definition: src/core/lib/gprpp/sync.h:126
grpc_core::InfLenFIFOQueue::Waiter::next
Waiter * next
Definition: mpmcqueue.h:126
grpc_core::InfLenFIFOQueue::Stats::total_queue_time
gpr_timespec total_queue_time
Definition: mpmcqueue.h:104
grpc_core::InfLenFIFOQueue::num_nodes_
int num_nodes_
Definition: mpmcqueue.h:157
gpr_time_0
GPRAPI gpr_timespec gpr_time_0(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:47
grpc_core
Definition: call_metric_recorder.h:31
grpc_core::InfLenFIFOQueue::busy_time
gpr_timespec busy_time
Definition: mpmcqueue.h:160
grpc_core::InfLenFIFOQueue::Waiter::prev
Waiter * prev
Definition: mpmcqueue.h:127
elem
Timer elem
Definition: event_engine/iomgr_event_engine/timer_heap_test.cc:109
grpc_core::InfLenFIFOQueue::Node::content
void * content
Definition: mpmcqueue.h:79
grpc_core::InfLenFIFOQueue::Put
void Put(void *elem) override
Definition: mpmcqueue.cc:102
grpc_core::MPMCQueueInterface::count
virtual int count() const =0
grpc_core::InfLenFIFOQueue::delete_list_count_
size_t delete_list_count_
Definition: mpmcqueue.h:150
grpc_core::InfLenFIFOQueue::Node::prev
Node * prev
Definition: mpmcqueue.h:78
grpc_core::grpc_thread_pool_trace
DebugOnlyTraceFlag grpc_thread_pool_trace(false, "thread_pool")
Definition: mpmcqueue.h:31
grpc_core::MPMCQueueInterface::~MPMCQueueInterface
virtual ~MPMCQueueInterface()
Definition: mpmcqueue.h:37
stats.h
grpc_core::InfLenFIFOQueue::delete_list_
Node ** delete_list_
Definition: mpmcqueue.h:148
grpc_core::InfLenFIFOQueue::count
int count() const override
Definition: mpmcqueue.h:74
grpc_core::InfLenFIFOQueue::TopWaiter
Waiter * TopWaiter()
Definition: mpmcqueue.cc:180
grpc_core::InfLenFIFOQueue::queue_head_
Node * queue_head_
Definition: mpmcqueue.h:154
grpc_core::InfLenFIFOQueue::Waiter::cv
CondVar cv
Definition: mpmcqueue.h:125
grpc_core::InfLenFIFOQueue::Stats::num_started
uint64_t num_started
Definition: mpmcqueue.h:101
grpc_core::InfLenFIFOQueue::Stats::max_queue_time
gpr_timespec max_queue_time
Definition: mpmcqueue.h:106
grpc_core::InfLenFIFOQueue::PopFront
void * PopFront()
Definition: mpmcqueue.cc:27
grpc_core::InfLenFIFOQueue::stats_
Stats stats_
Definition: mpmcqueue.h:159
grpc_core::InfLenFIFOQueue::Node
Definition: mpmcqueue.h:76
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
grpc_core::InfLenFIFOQueue::init_num_nodes
int init_num_nodes() const
Definition: mpmcqueue.h:88
grpc_core::InfLenFIFOQueue::delete_list_size_
size_t delete_list_size_
Definition: mpmcqueue.h:151
grpc_core::InfLenFIFOQueue::AllocateNodes
Node * AllocateNodes(int num)
Definition: mpmcqueue.cc:68
grpc_core::MPMCQueueInterface::Get
virtual void * Get(gpr_timespec *wait_time)=0
grpc_core::InfLenFIFOQueue::count_
std::atomic< int > count_
Definition: mpmcqueue.h:156
grpc_core::TraceFlag
Definition: debug/trace.h:63
grpc_core::InfLenFIFOQueue::mu_
Mutex mu_
Definition: mpmcqueue.h:140
grpc_core::InfLenFIFOQueue::Waiter
Definition: mpmcqueue.h:124
grpc_core::InfLenFIFOQueue::~InfLenFIFOQueue
~InfLenFIFOQueue() override
Definition: mpmcqueue.cc:94
grpc_core::Mutex
Definition: src/core/lib/gprpp/sync.h:61
grpc_core::InfLenFIFOQueue::RemoveWaiter
void RemoveWaiter(Waiter *waiter)
Definition: mpmcqueue.cc:174
grpc_core::MPMCQueueInterface
Definition: mpmcqueue.h:35
grpc_core::InfLenFIFOQueue::Node::next
Node * next
Definition: mpmcqueue.h:77
grpc_core::InfLenFIFOQueue::Node::insert_time
gpr_timespec insert_time
Definition: mpmcqueue.h:80
grpc_core::InfLenFIFOQueue::kQueueInitNumNodes
static const int kQueueInitNumNodes
Definition: mpmcqueue.h:146
grpc_core::InfLenFIFOQueue::waiters_
Waiter waiters_
Definition: mpmcqueue.h:141
xds_manager.num
num
Definition: xds_manager.py:56
grpc_core::InfLenFIFOQueue::num_nodes
int num_nodes() const
Definition: mpmcqueue.h:85
grpc_core::InfLenFIFOQueue::InfLenFIFOQueue
InfLenFIFOQueue()
Definition: mpmcqueue.cc:80
grpc_core::InfLenFIFOQueue::Stats::busy_queue_time
gpr_timespec busy_queue_time
Definition: mpmcqueue.h:108
gpr_timespec
Definition: gpr_types.h:50
grpc_core::InfLenFIFOQueue::queue_tail_
Node * queue_tail_
Definition: mpmcqueue.h:155
grpc_core::InfLenFIFOQueue::Get
void * Get(gpr_timespec *wait_time) override
Definition: mpmcqueue.cc:142
grpc_core::InfLenFIFOQueue::kDeleteListInitSize
static const int kDeleteListInitSize
Definition: mpmcqueue.h:144
grpc_core::InfLenFIFOQueue::Stats
Definition: mpmcqueue.h:100
grpc_core::InfLenFIFOQueue::Stats::num_completed
uint64_t num_completed
Definition: mpmcqueue.h:102
sync.h
grpc_core::MPMCQueueInterface::Put
virtual void Put(void *elem)=0
port_platform.h


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:41