work_serializer.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 namespace grpc_core {
24 
25 DebugOnlyTraceFlag grpc_work_serializer_trace(false, "work_serializer");
26 
28  public:
29  void Run(std::function<void()> callback, const DebugLocation& location);
30  void Schedule(std::function<void()> callback, const DebugLocation& location);
31  void DrainQueue();
32  void Orphan() override;
33 
34  private:
35  struct CallbackWrapper {
37  : callback(std::move(cb)), location(loc) {}
38 
40  const std::function<void()> callback;
42  };
43 
44  // Callers of DrainQueueOwned should make sure to grab the lock on the
45  // workserializer with
46  //
47  // prev_ref_pair =
48  // refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
49  //
50  // and only invoke DrainQueueOwned() if there was previously no owner. Note
51  // that the queue size is also incremented as part of the fetch_add to allow
52  // the callers to add a callback to the queue if another thread already holds
53  // the lock to the work serializer.
54  void DrainQueueOwned();
55 
56  // First 16 bits indicate ownership of the WorkSerializer, next 48 bits are
57  // queue size (i.e., refs).
59  GPR_ASSERT(size >> 48 == 0);
60  return (static_cast<uint64_t>(owners) << 48) + static_cast<int64_t>(size);
61  }
62  static uint32_t GetOwners(uint64_t ref_pair) {
63  return static_cast<uint32_t>(ref_pair >> 48);
64  }
65  static uint64_t GetSize(uint64_t ref_pair) {
66  return static_cast<uint64_t>(ref_pair & 0xffffffffffffu);
67  }
68 
69  // An initial size of 1 keeps track of whether the work serializer has been
70  // orphaned.
71  std::atomic<uint64_t> refs_{MakeRefPair(0, 1)};
73 };
74 
76  const DebugLocation& location) {
78  gpr_log(GPR_INFO, "WorkSerializer::Run() %p Scheduling callback [%s:%d]",
79  this, location.file(), location.line());
80  }
81  // Increment queue size for the new callback and owner count to attempt to
82  // take ownership of the WorkSerializer.
83  const uint64_t prev_ref_pair =
84  refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
85  // The work serializer should not have been orphaned.
86  GPR_DEBUG_ASSERT(GetSize(prev_ref_pair) > 0);
87  if (GetOwners(prev_ref_pair) == 0) {
88  // We took ownership of the WorkSerializer. Invoke callback and drain queue.
90  gpr_log(GPR_INFO, " Executing immediately");
91  }
92  callback();
94  } else {
95  // Another thread is holding the WorkSerializer, so decrement the ownership
96  // count we just added and queue the callback.
97  refs_.fetch_sub(MakeRefPair(1, 0), std::memory_order_acq_rel);
98  CallbackWrapper* cb_wrapper =
99  new CallbackWrapper(std::move(callback), location);
101  gpr_log(GPR_INFO, " Scheduling on queue : item %p", cb_wrapper);
102  }
103  queue_.Push(&cb_wrapper->mpscq_node);
104  }
105 }
106 
108  std::function<void()> callback, const DebugLocation& location) {
109  CallbackWrapper* cb_wrapper =
110  new CallbackWrapper(std::move(callback), location);
113  "WorkSerializer::Schedule() %p Scheduling callback %p [%s:%d]",
114  this, cb_wrapper, location.file(), location.line());
115  }
116  refs_.fetch_add(MakeRefPair(0, 1), std::memory_order_acq_rel);
117  queue_.Push(&cb_wrapper->mpscq_node);
118 }
119 
122  gpr_log(GPR_INFO, "WorkSerializer::Orphan() %p", this);
123  }
124  const uint64_t prev_ref_pair =
125  refs_.fetch_sub(MakeRefPair(0, 1), std::memory_order_acq_rel);
126  if (GetOwners(prev_ref_pair) == 0 && GetSize(prev_ref_pair) == 1) {
128  gpr_log(GPR_INFO, " Destroying");
129  }
130  delete this;
131  }
132 }
133 
134 // The thread that calls this loans itself to the work serializer so as to
135 // execute all the scheduled callbacks.
138  gpr_log(GPR_INFO, "WorkSerializer::DrainQueue() %p", this);
139  }
140  // Attempt to take ownership of the WorkSerializer. Also increment the queue
141  // size as required by `DrainQueueOwned()`.
142  const uint64_t prev_ref_pair =
143  refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
144  if (GetOwners(prev_ref_pair) == 0) {
145  // We took ownership of the WorkSerializer. Drain the queue.
146  DrainQueueOwned();
147  } else {
148  // Another thread is holding the WorkSerializer, so decrement the ownership
149  // count we just added and queue a no-op callback.
150  refs_.fetch_sub(MakeRefPair(1, 0), std::memory_order_acq_rel);
151  CallbackWrapper* cb_wrapper = new CallbackWrapper([]() {}, DEBUG_LOCATION);
152  queue_.Push(&cb_wrapper->mpscq_node);
153  }
154 }
155 
158  gpr_log(GPR_INFO, "WorkSerializer::DrainQueueOwned() %p", this);
159  }
160  while (true) {
161  auto prev_ref_pair = refs_.fetch_sub(MakeRefPair(0, 1));
162  // It is possible that while draining the queue, the last callback ended
163  // up orphaning the work serializer. In that case, delete the object.
164  if (GetSize(prev_ref_pair) == 1) {
166  gpr_log(GPR_INFO, " Queue Drained. Destroying");
167  }
168  delete this;
169  return;
170  }
171  if (GetSize(prev_ref_pair) == 2) {
172  // Queue drained. Give up ownership but only if queue remains empty.
173  uint64_t expected = MakeRefPair(1, 1);
174  if (refs_.compare_exchange_strong(expected, MakeRefPair(0, 1),
175  std::memory_order_acq_rel)) {
176  // Queue is drained.
177  return;
178  }
179  if (GetSize(expected) == 0) {
180  // WorkSerializer got orphaned while this was running
182  gpr_log(GPR_INFO, " Queue Drained. Destroying");
183  }
184  delete this;
185  return;
186  }
187  }
188  // There is at least one callback on the queue. Pop the callback from the
189  // queue and execute it.
190  CallbackWrapper* cb_wrapper = nullptr;
191  bool empty_unused;
192  while ((cb_wrapper = reinterpret_cast<CallbackWrapper*>(
193  queue_.PopAndCheckEnd(&empty_unused))) == nullptr) {
194  // This can happen due to a race condition within the mpscq
195  // implementation or because of a race with Run()/Schedule().
197  gpr_log(GPR_INFO, " Queue returned nullptr, trying again");
198  }
199  }
201  gpr_log(GPR_INFO, " Running item %p : callback scheduled at [%s:%d]",
202  cb_wrapper, cb_wrapper->location.file(),
203  cb_wrapper->location.line());
204  }
205  cb_wrapper->callback();
206  delete cb_wrapper;
207  }
208 }
209 
210 //
211 // WorkSerializer
212 //
213 
216 
218 
220  const DebugLocation& location) {
221  impl_->Run(std::move(callback), location);
222 }
223 
225  const DebugLocation& location) {
226  impl_->Schedule(std::move(callback), location);
227 }
228 
229 void WorkSerializer::DrainQueue() { impl_->DrainQueue(); }
230 
231 } // namespace grpc_core
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc_core::WorkSerializer::WorkSerializerImpl::GetSize
static uint64_t GetSize(uint64_t ref_pair)
Definition: work_serializer.cc:65
grpc_core::WorkSerializer::WorkSerializerImpl::CallbackWrapper
Definition: work_serializer.cc:35
grpc_core::DebugLocation
Definition: debug_location.h:31
grpc_core::WorkSerializer::DrainQueue
void DrainQueue()
Definition: work_serializer.cc:229
GPR_DEBUG_ASSERT
#define GPR_DEBUG_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:103
grpc_core::Orphanable
Definition: orphanable.h:39
uint16_t
unsigned short uint16_t
Definition: stdint-msvc2008.h:79
grpc_core
Definition: call_metric_recorder.h:31
grpc_core::MultiProducerSingleConsumerQueue
Definition: mpscq.h:35
loc
OPENSSL_EXPORT X509_EXTENSION int loc
Definition: x509.h:1418
u
OPENSSL_EXPORT pem_password_cb void * u
Definition: pem.h:351
mkowners.owners
owners
Definition: mkowners.py:119
grpc_core::WorkSerializer::Run
void Run(std::function< void()> callback, const DebugLocation &location)
Definition: work_serializer.cc:219
grpc_core::grpc_work_serializer_trace
DebugOnlyTraceFlag grpc_work_serializer_trace(false, "work_serializer")
grpc_core::WorkSerializer::WorkSerializer
WorkSerializer()
Definition: work_serializer.cc:214
grpc_core::WorkSerializer::WorkSerializerImpl::CallbackWrapper::location
const DebugLocation location
Definition: work_serializer.cc:41
grpc_core::DebugLocation::file
const char * file() const
Definition: debug_location.h:34
grpc_core::WorkSerializer::WorkSerializerImpl::GetOwners
static uint32_t GetOwners(uint64_t ref_pair)
Definition: work_serializer.cc:62
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
grpc_core::WorkSerializer::Schedule
void Schedule(std::function< void()> callback, const DebugLocation &location)
Definition: work_serializer.cc:224
grpc_core::WorkSerializer::WorkSerializerImpl::MakeRefPair
static uint64_t MakeRefPair(uint16_t owners, uint64_t size)
Definition: work_serializer.cc:58
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
grpc_core::WorkSerializer::WorkSerializerImpl::CallbackWrapper::callback
const std::function< void()> callback
Definition: work_serializer.cc:40
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
int64_t
signed __int64 int64_t
Definition: stdint-msvc2008.h:89
grpc_core::DebugLocation::line
int line() const
Definition: debug_location.h:35
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
work_serializer.h
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
grpc_core::MultiProducerSingleConsumerQueue::Push
bool Push(Node *node)
Definition: mpscq.cc:29
grpc_core::WorkSerializer::impl_
OrphanablePtr< WorkSerializerImpl > impl_
Definition: work_serializer.h:85
callback
static void callback(void *arg, int status, int timeouts, struct hostent *host)
Definition: acountry.c:224
grpc_core::WorkSerializer::WorkSerializerImpl::CallbackWrapper::mpscq_node
MultiProducerSingleConsumerQueue::Node mpscq_node
Definition: work_serializer.cc:39
grpc_core::WorkSerializer::WorkSerializerImpl::DrainQueue
void DrainQueue()
Definition: work_serializer.cc:136
grpc_core::WorkSerializer::WorkSerializerImpl::Orphan
void Orphan() override
Definition: work_serializer.cc:120
grpc_core::WorkSerializer::WorkSerializerImpl::Run
void Run(std::function< void()> callback, const DebugLocation &location)
Definition: work_serializer.cc:75
grpc_core::MultiProducerSingleConsumerQueue::Node
Definition: mpscq.h:38
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc_core::MakeOrphanable
OrphanablePtr< T > MakeOrphanable(Args &&... args)
Definition: orphanable.h:67
grpc_core::WorkSerializer::WorkSerializerImpl::DrainQueueOwned
void DrainQueueOwned()
Definition: work_serializer.cc:156
grpc_core::WorkSerializer::WorkSerializerImpl::refs_
std::atomic< uint64_t > refs_
Definition: work_serializer.cc:71
grpc_core::WorkSerializer::WorkSerializerImpl::queue_
MultiProducerSingleConsumerQueue queue_
Definition: work_serializer.cc:72
grpc_core::WorkSerializer::~WorkSerializer
~WorkSerializer()
Definition: work_serializer.cc:217
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
size
voidpf void uLong size
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
grpc_core::WorkSerializer::WorkSerializerImpl
Definition: work_serializer.cc:27
grpc_core::WorkSerializer::WorkSerializerImpl::CallbackWrapper::CallbackWrapper
CallbackWrapper(std::function< void()> cb, const DebugLocation &loc)
Definition: work_serializer.cc:36
grpc_core::DebugOnlyTraceFlag
TraceFlag DebugOnlyTraceFlag
Definition: debug/trace.h:117
grpc_core::WorkSerializer::WorkSerializerImpl::Schedule
void Schedule(std::function< void()> callback, const DebugLocation &location)
Definition: work_serializer.cc:107
cb
OPENSSL_EXPORT pem_password_cb * cb
Definition: pem.h:351
port_platform.h


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:52