include/grpcpp/impl/codegen/completion_queue.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015-2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
32 #ifndef GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H
33 #define GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H
34 
35 // IWYU pragma: private, include <grpcpp/completion_queue.h>
36 
37 #include <list>
38 
39 #include <grpc/impl/codegen/atm.h>
47 
49 
50 namespace grpc {
51 template <class R>
52 class ClientReader;
53 template <class W>
54 class ClientWriter;
55 template <class W, class R>
56 class ClientReaderWriter;
57 template <class R>
59 template <class W>
61 namespace internal {
62 template <class W, class R>
64 
65 template <class ResponseType>
68  grpc::Status&);
69 template <class ServiceType, class RequestType, class ResponseType,
70  class BaseRequestType, class BaseResponseType>
72 template <class ServiceType, class RequestType, class ResponseType>
74 template <class ServiceType, class RequestType, class ResponseType>
76 template <class Streamer, bool WriteNeeded>
78 template <grpc::StatusCode code>
79 class ErrorMethodHandler;
80 } // namespace internal
81 
82 class Channel;
83 class ChannelInterface;
84 class Server;
85 class ServerBuilder;
86 class ServerContextBase;
87 class ServerInterface;
88 
89 namespace internal {
90 class CompletionQueueTag;
91 class RpcMethod;
92 template <class InputMessage, class OutputMessage>
94 template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
95 class CallOpSet;
96 } // namespace internal
97 
99 
105  public:
111  nullptr}) {}
112 
116  explicit CompletionQueue(grpc_completion_queue* take);
117 
119  ~CompletionQueue() override {
121  }
122 
124  enum NextStatus {
127  TIMEOUT
129  };
130 
179  bool Next(void** tag, bool* ok) {
180  // Check return type == GOT_EVENT... cases:
181  // SHUTDOWN - queue has been shutdown, return false.
182  // TIMEOUT - we passed infinity time => queue has been shutdown, return
183  // false.
184  // GOT_EVENT - we actually got an event, return true.
185  return (AsyncNextInternal(tag, ok,
188  }
189 
201  template <typename T>
202  NextStatus AsyncNext(void** tag, bool* ok, const T& deadline) {
203  grpc::TimePoint<T> deadline_tp(deadline);
204  return AsyncNextInternal(tag, ok, deadline_tp.raw_time());
205  }
206 
221  template <typename T, typename F>
222  NextStatus DoThenAsyncNext(F&& f, void** tag, bool* ok, const T& deadline) {
224  f();
225  if (cache.Flush(tag, ok)) {
226  return GOT_EVENT;
227  } else {
228  return AsyncNext(tag, ok, deadline);
229  }
230  }
231 
242  void Shutdown();
243 
249  grpc_completion_queue* cq() { return cq_; }
250 
251  protected:
253  explicit CompletionQueue(const grpc_completion_queue_attributes& attributes) {
256  &attributes),
257  &attributes, nullptr);
258  InitialAvalanching(); // reserve this for the future shutdown
259  }
260 
261  private:
262  // Friends for access to server registration lists that enable checking and
263  // logging on shutdown
264  friend class grpc::ServerBuilder;
265  friend class grpc::Server;
266 
267  // Friend synchronous wrappers so that they can access Pluck(), which is
268  // a semi-private API geared towards the synchronous implementation.
269  template <class R>
270  friend class grpc::ClientReader;
271  template <class W>
272  friend class grpc::ClientWriter;
273  template <class W, class R>
275  template <class R>
276  friend class grpc::ServerReader;
277  template <class W>
278  friend class grpc::ServerWriter;
279  template <class W, class R>
281  template <class ResponseType>
284  grpc::Status&);
285  template <class ServiceType, class RequestType, class ResponseType>
287  template <class ServiceType, class RequestType, class ResponseType>
289  template <class Streamer, bool WriteNeeded>
291  template <grpc::StatusCode code>
294  friend class grpc::ServerInterface;
295  template <class InputMessage, class OutputMessage>
297 
298  // Friends that need access to constructor for callback CQ
299  friend class grpc::Channel;
300 
301  // For access to Register/CompleteAvalanching
302  template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
304 
310  public:
313  bool Flush(void** tag, bool* ok);
314 
315  private:
317  bool flushed_;
318  };
319 
320  NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);
321 
325  auto deadline =
327  while (true) {
329  cq_, tag, deadline, nullptr);
330  bool ok = ev.success != 0;
331  void* ignored = tag;
332  if (tag->FinalizeResult(&ignored, &ok)) {
333  GPR_CODEGEN_ASSERT(ignored == tag);
334  return ok;
335  }
336  }
337  }
338 
348  auto deadline =
351  cq_, tag, deadline, nullptr);
352  if (ev.type == GRPC_QUEUE_TIMEOUT) return;
353  bool ok = ev.success != 0;
354  void* ignored = tag;
355  // the tag must be swallowed if using TryPluck
356  GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok));
357  }
358 
365  gpr_timespec deadline) {
367  cq_, tag, deadline, nullptr);
368  if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) {
369  return;
370  }
371 
372  bool ok = ev.success != 0;
373  void* ignored = tag;
374  GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok));
375  }
376 
385  }
388  static_cast<gpr_atm>(1));
389  }
392  static_cast<gpr_atm>(-1)) == 1) {
394  }
395  }
396 
398  (void)server;
399 #ifndef NDEBUG
401  server_list_.push_back(server);
402 #endif
403  }
405  (void)server;
406 #ifndef NDEBUG
408  server_list_.remove(server);
409 #endif
410  }
411  bool ServerListEmpty() const {
412 #ifndef NDEBUG
414  return server_list_.empty();
415 #endif
416  return true;
417  }
418 
421 
423 
425 
426  // List of servers associated with this CQ. Even though this is only used with
427  // NDEBUG, instantiate it in all cases since otherwise the size will be
428  // inconsistent.
430  std::list<const grpc::Server*>
431  server_list_ /* GUARDED_BY(server_list_mutex_) */;
432 };
433 
437  public:
439 
440  protected:
443 
444  private:
452  grpc_cq_polling_type polling_type,
455  GRPC_CQ_CURRENT_VERSION, completion_type, polling_type,
456  shutdown_cb}),
457  polling_type_(polling_type) {}
458 
460  friend class grpc::ServerBuilder;
461  friend class grpc::Server;
462 };
463 
464 } // namespace grpc
465 
466 #endif // GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H
grpc::CompletionQueue::TryPluck
void TryPluck(grpc::internal::CompletionQueueTag *tag)
Definition: include/grpcpp/impl/codegen/completion_queue.h:347
grpc::ServerCompletionQueue
Definition: include/grpcpp/impl/codegen/completion_queue.h:436
grpc::CompletionQueue::SHUTDOWN
@ SHUTDOWN
The completion queue has been shutdown and fully-drained.
Definition: include/grpcpp/impl/codegen/completion_queue.h:125
grpc::ServerWriter
Definition: include/grpcpp/impl/codegen/completion_queue.h:60
atm.h
grpc._simple_stubs.RequestType
RequestType
Definition: _simple_stubs.py:27
grpc._simple_stubs.ResponseType
ResponseType
Definition: _simple_stubs.py:28
grpc::CompletionQueue::RegisterAvalanching
void RegisterAvalanching()
Definition: include/grpcpp/impl/codegen/completion_queue.h:386
GRPC_CQ_NEXT
@ GRPC_CQ_NEXT
Definition: grpc_types.h:760
grpc::internal::Mutex
Definition: include/grpcpp/impl/codegen/sync.h:59
rpc_service_method.h
grpc
Definition: grpcpp/alarm.h:33
grpc::CompletionQueue::GOT_EVENT
@ GOT_EVENT
Definition: include/grpcpp/impl/codegen/completion_queue.h:126
grpc_cq_polling_type
grpc_cq_polling_type
Definition: grpc_types.h:740
grpc::CompletionQueue::CompletionQueueTLSCache::CompletionQueueTLSCache
CompletionQueueTLSCache(CompletionQueue *cq)
Definition: completion_queue_cc.cc:170
grpc::CompletionQueue::CompleteAvalanching
void CompleteAvalanching()
Definition: include/grpcpp/impl/codegen/completion_queue.h:390
grpc::internal::BlockingUnaryCallImpl
Definition: grpcpp/impl/codegen/channel_interface.h:69
grpc::CoreCodegenInterface::grpc_completion_queue_shutdown
virtual void grpc_completion_queue_shutdown(grpc_completion_queue *cq)=0
grpc_cq_completion_type
grpc_cq_completion_type
Definition: grpc_types.h:758
grpc::CompletionQueue::CompletionQueue
CompletionQueue(const grpc_completion_queue_attributes &attributes)
Private constructor of CompletionQueue only visible to friend classes.
Definition: include/grpcpp/impl/codegen/completion_queue.h:253
grpc::ServerCompletionQueue::polling_type_
grpc_cq_polling_type polling_type_
Definition: include/grpcpp/impl/codegen/completion_queue.h:459
grpc::CompletionQueue::CompletionQueueTLSCache::cq_
CompletionQueue * cq_
Definition: include/grpcpp/impl/codegen/completion_queue.h:316
grpc::CoreCodegenInterface::gpr_inf_future
virtual gpr_timespec gpr_inf_future(gpr_clock_type type)=0
GRPC_QUEUE_SHUTDOWN
@ GRPC_QUEUE_SHUTDOWN
Definition: grpc_types.h:554
grpc::CompletionQueue::NextStatus
NextStatus
Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT.
Definition: include/grpcpp/impl/codegen/completion_queue.h:124
gpr_inf_future
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:55
core_codegen_interface.h
grpc::internal::ErrorMethodHandler
Definition: include/grpcpp/impl/codegen/byte_buffer.h:49
grpc::CompletionQueue::~CompletionQueue
~CompletionQueue() override
Destructor. Destroys the owned wrapped completion queue / instance.
Definition: include/grpcpp/impl/codegen/completion_queue.h:119
grpc::CompletionQueue::CompletionQueueTLSCache
Definition: include/grpcpp/impl/codegen/completion_queue.h:309
grpc::CompletionQueue::TryPluck
void TryPluck(grpc::internal::CompletionQueueTag *tag, gpr_timespec deadline)
Definition: include/grpcpp/impl/codegen/completion_queue.h:364
grpc::internal::RpcMethodHandler
A wrapper class of an application provided rpc method handler.
Definition: include/grpcpp/impl/codegen/completion_queue.h:71
grpc::GrpcLibraryCodegen
Classes that require gRPC to be initialized should inherit from this class.
Definition: grpcpp/impl/codegen/grpc_library.h:40
grpc::TimePoint
Definition: include/grpcpp/impl/codegen/time.h:42
grpc::CompletionQueue::server_list_
std::list< const grpc::Server * > server_list_
Definition: include/grpcpp/impl/codegen/completion_queue.h:431
grpc::Channel
Channels represent a connection to an endpoint. Created by CreateChannel.
Definition: include/grpcpp/channel.h:54
grpc::internal::MutexLock
Definition: include/grpcpp/impl/codegen/sync.h:86
T
#define T(upbtypeconst, upbtype, ctype, default_value)
grpc::CompletionQueue::AsyncNext
NextStatus AsyncNext(void **tag, bool *ok, const T &deadline)
Definition: include/grpcpp/impl/codegen/completion_queue.h:202
grpc::CompletionQueue::Pluck
bool Pluck(grpc::internal::CompletionQueueTag *tag)
Definition: include/grpcpp/impl/codegen/completion_queue.h:324
grpc::g_core_codegen_interface
CoreCodegenInterface * g_core_codegen_interface
Definition: include/grpcpp/impl/codegen/completion_queue.h:98
grpc::ChannelInterface
Codegen interface for grpc::Channel.
Definition: grpcpp/impl/codegen/channel_interface.h:73
grpc::CompletionQueue::cq_
grpc_completion_queue * cq_
Definition: include/grpcpp/impl/codegen/completion_queue.h:422
grpc::ClientReaderWriter
Definition: grpcpp/impl/codegen/channel_interface.h:35
grpc::CompletionQueue::cq
grpc_completion_queue * cq()
Definition: include/grpcpp/impl/codegen/completion_queue.h:249
sync.h
GRPC_CQ_DEFAULT_POLLING
@ GRPC_CQ_DEFAULT_POLLING
Definition: grpc_types.h:743
GRPC_CQ_NON_LISTENING
@ GRPC_CQ_NON_LISTENING
Definition: grpc_types.h:748
grpc::ServerContextBase
Base class of ServerContext.
Definition: grpcpp/impl/codegen/server_context.h:126
grpc::internal::RpcMethod
Descriptor of an RPC method.
Definition: grpcpp/impl/codegen/rpc_method.h:31
tag
static void * tag(intptr_t t)
Definition: bad_client.cc:318
grpc::CompletionQueue::ReleaseCallbackAlternativeCQ
static void ReleaseCallbackAlternativeCQ(CompletionQueue *cq)
Definition: completion_queue_cc.cc:202
grpc::CompletionQueue::RegisterServer
void RegisterServer(const grpc::Server *server)
Definition: include/grpcpp/impl/codegen/completion_queue.h:397
grpc::ServerBuilder
A builder class for the creation and startup of grpc::Server instances.
Definition: grpcpp/server_builder.h:86
grpc_completion_queue_factory_lookup
const GRPCAPI grpc_completion_queue_factory * grpc_completion_queue_factory_lookup(const grpc_completion_queue_attributes *attributes)
Definition: completion_queue_factory.cc:48
grpc::CompletionQueue::CompletionQueue
CompletionQueue()
Definition: include/grpcpp/impl/codegen/completion_queue.h:108
grpc_completion_queue
Definition: completion_queue.cc:347
grpc::internal::ServerReaderWriterBody
Definition: include/grpcpp/impl/codegen/completion_queue.h:63
grpc::CompletionQueue::UnregisterServer
void UnregisterServer(const grpc::Server *server)
Definition: include/grpcpp/impl/codegen/completion_queue.h:404
grpc::CoreCodegenInterface::grpc_completion_queue_destroy
virtual void grpc_completion_queue_destroy(grpc_completion_queue *cq)=0
completion_queue_tag.h
grpc::CompletionQueue::CallbackAlternativeCQ
static CompletionQueue * CallbackAlternativeCQ()
Definition: completion_queue_cc.cc:196
grpc::ServerCompletionQueue::IsFrequentlyPolled
bool IsFrequentlyPolled()
Definition: include/grpcpp/impl/codegen/completion_queue.h:438
grpc::ClientWriter
Definition: grpcpp/impl/codegen/channel_interface.h:33
gpr_atm_rel_store
#define gpr_atm_rel_store(p, value)
Definition: impl/codegen/atm_gcc_atomic.h:54
gpr_atm_no_barrier_fetch_add
#define gpr_atm_no_barrier_fetch_add(p, delta)
Definition: impl/codegen/atm_gcc_atomic.h:59
grpc::ServerInterface
Definition: grpcpp/impl/codegen/server_interface.h:61
grpc::CompletionQueue::CompletionQueueTLSCache::flushed_
bool flushed_
Definition: include/grpcpp/impl/codegen/completion_queue.h:317
grpc_completion_queue_attributes
Definition: grpc_types.h:791
grpc::internal::CallOpSet
Definition: call_op_set.h:859
grpc::CompletionQueue::server_list_mutex_
grpc::internal::Mutex server_list_mutex_
Definition: include/grpcpp/impl/codegen/completion_queue.h:429
grpc::CompletionQueue::avalanches_in_flight_
gpr_atm avalanches_in_flight_
Definition: include/grpcpp/impl/codegen/completion_queue.h:424
grpc_library.h
gpr_atm
intptr_t gpr_atm
Definition: impl/codegen/atm_gcc_atomic.h:32
shutdown_cb
static void shutdown_cb(uv_shutdown_t *req, int status)
Definition: benchmark-tcp-write-batch.c:80
status.h
grpc::CompletionQueue::Next
bool Next(void **tag, bool *ok)
Definition: include/grpcpp/impl/codegen/completion_queue.h:179
grpc::CoreCodegenInterface::grpc_completion_queue_create
virtual grpc_completion_queue * grpc_completion_queue_create(const grpc_completion_queue_factory *factory, const grpc_completion_queue_attributes *attributes, void *reserved)=0
server
Definition: examples/python/async_streaming/server.py:1
grpc::Server
Definition: include/grpcpp/server.h:59
GRPC_CQ_CURRENT_VERSION
#define GRPC_CQ_CURRENT_VERSION
Definition: grpc_types.h:789
grpc::CompletionQueue::CompletionQueueTLSCache::~CompletionQueueTLSCache
~CompletionQueueTLSCache()
Definition: completion_queue_cc.cc:176
grpc::CoreCodegenInterface
Definition: grpcpp/impl/codegen/core_codegen_interface.h:40
grpc::CoreCodegenInterface::gpr_time_0
virtual gpr_timespec gpr_time_0(gpr_clock_type type)=0
grpc::CompletionQueue::ServerListEmpty
bool ServerListEmpty() const
Definition: include/grpcpp/impl/codegen/completion_queue.h:411
grpc::CompletionQueue::CompletionQueueTLSCache::Flush
bool Flush(void **tag, bool *ok)
Definition: completion_queue_cc.cc:180
grpc::CompletionQueue::TIMEOUT
@ TIMEOUT
deadline was reached.
Definition: include/grpcpp/impl/codegen/completion_queue.h:128
grpc::CompletionQueue::Shutdown
void Shutdown()
Definition: completion_queue_cc.cc:137
grpc::ServerReader
Definition: include/grpcpp/impl/codegen/completion_queue.h:58
grpc::internal::MethodHandler::HandlerParameter
Definition: grpcpp/impl/codegen/rpc_service_method.h:43
grpc::CompletionQueue::InitialAvalanching
void InitialAvalanching()
Definition: include/grpcpp/impl/codegen/completion_queue.h:383
grpc::Status
Definition: include/grpcpp/impl/codegen/status.h:35
ok
bool ok
Definition: async_end2end_test.cc:197
grpc_completion_queue_functor
Definition: grpc_types.h:773
grpc::CompletionQueue::DoThenAsyncNext
NextStatus DoThenAsyncNext(F &&f, void **tag, bool *ok, const T &deadline)
Definition: include/grpcpp/impl/codegen/completion_queue.h:222
grpc::ServerCompletionQueue::ServerCompletionQueue
ServerCompletionQueue(grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, grpc_completion_queue_functor *shutdown_cb)
Definition: include/grpcpp/impl/codegen/completion_queue.h:451
GPR_CODEGEN_ASSERT
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: grpcpp/impl/codegen/core_codegen_interface.h:151
time.h
internal
Definition: benchmark/test/output_test_helper.cc:20
grpc::ServerCompletionQueue::ServerCompletionQueue
ServerCompletionQueue()
Default constructor.
Definition: include/grpcpp/impl/codegen/completion_queue.h:442
grpc::CompletionQueue
Definition: include/grpcpp/impl/codegen/completion_queue.h:104
gpr_timespec
Definition: gpr_types.h:50
grpc::internal::TemplatedBidiStreamingHandler
Definition: include/grpcpp/impl/codegen/completion_queue.h:77
grpc::internal::CompletionQueueTag
An interface allowing implementors to process and filter event tags.
Definition: grpcpp/impl/codegen/completion_queue_tag.h:28
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
grpc::TimePoint::raw_time
gpr_timespec raw_time()=delete
GRPC_QUEUE_TIMEOUT
@ GRPC_QUEUE_TIMEOUT
Definition: grpc_types.h:556
grpc::internal::ServerStreamingHandler
A wrapper class of an application provided server streaming handler.
Definition: include/grpcpp/impl/codegen/byte_buffer.h:47
grpc::CompletionQueue::AsyncNextInternal
NextStatus AsyncNextInternal(void **tag, bool *ok, gpr_timespec deadline)
Definition: completion_queue_cc.cc:148
grpc::internal::ClientStreamingHandler
A wrapper class of an application provided client streaming handler.
Definition: include/grpcpp/impl/codegen/completion_queue.h:73
grpc::internal::UnaryRunHandlerHelper
void UnaryRunHandlerHelper(const grpc::internal::MethodHandler::HandlerParameter &, ResponseType *, grpc::Status &)
Definition: impl/codegen/method_handler.h:59
grpc::ClientReader
Definition: grpcpp/impl/codegen/channel_interface.h:31
grpc::CoreCodegenInterface::grpc_completion_queue_pluck
virtual grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved)=0


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:52