Go to the documentation of this file.
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
32 template <
class RequestType,
class ResponseType>
49 auto* allocator_state =
57 param.call, allocator_state, param.call_requester);
58 param.server_context->BeginCompletionOp(
62 if (param.status.ok()) {
63 reactor = grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
69 if (reactor ==
nullptr) {
78 call->SetupReactor(reactor);
95 *handler_data = allocator_state;
125 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
160 ServerUnaryReactor* reactor =
161 reactor_.load(std::memory_order_relaxed);
162 reactor->OnSendInitialMetadataDone(ok);
163 this->MaybeDone(true);
205 reactor_.load(std::memory_order_relaxed)->OnDone();
218 return reactor_.load(std::memory_order_relaxed);
251 template <
class RequestType,
class ResponseType>
267 param.call, param.call_requester);
271 param.server_context->BeginCompletionOp(
277 if (param.status.ok()) {
279 grpc::internal::CatchingReactorGetter<ServerReadReactor<RequestType>>(
285 if (reactor ==
nullptr) {
293 reader->SetupReactor(reactor);
313 this->MaybeDone(false);
344 ServerReadReactor<RequestType>* reactor =
345 reactor_.load(std::memory_order_relaxed);
346 reactor->OnSendInitialMetadataDone(ok);
347 this->MaybeDone(true);
382 if (GPR_UNLIKELY(!ok)) {
383 ctx_->MaybeMarkCancelledOnRead();
386 this->MaybeDone(
true);
403 reactor_.load(std::memory_order_relaxed)->OnDone();
405 auto call_requester =
std::move(call_requester_);
406 if (
ctx_->context_allocator() !=
nullptr) {
407 ctx_->context_allocator()->Release(
ctx_);
415 return reactor_.load(std::memory_order_relaxed);
435 std::atomic<ServerReadReactor<RequestType>*>
reactor_;
437 std::atomic<intptr_t> callbacks_outstanding_{
442 template <
class RequestType,
class ResponseType>
449 : get_reactor_(
std::
move(get_reactor)) {}
458 param.call,
static_cast<RequestType*
>(param.request),
459 param.call_requester);
463 param.server_context->BeginCompletionOp(
469 if (param.status.ok()) {
476 if (reactor ==
nullptr) {
484 writer->SetupReactor(reactor);
520 this->MaybeDone(false);
523 finish_ops_.set_core_cq_tag(&finish_tag_);
525 if (!
ctx_->sent_initial_metadata_) {
526 finish_ops_.SendInitialMetadata(&
ctx_->initial_metadata_,
527 ctx_->initial_metadata_flags());
528 if (
ctx_->compression_level_set()) {
529 finish_ops_.set_compression_level(
ctx_->compression_level());
531 ctx_->sent_initial_metadata_ =
true;
533 finish_ops_.ServerSendStatus(&
ctx_->trailing_metadata_, s);
534 call_.PerformOps(&finish_ops_);
546 ServerWriteReactor<ResponseType>* reactor =
547 reactor_.load(std::memory_order_relaxed);
548 reactor->OnSendInitialMetadataDone(ok);
549 this->MaybeDone(true);
552 meta_ops_.SendInitialMetadata(&
ctx_->initial_metadata_,
553 ctx_->initial_metadata_flags());
554 if (
ctx_->compression_level_set()) {
555 meta_ops_.set_compression_level(
ctx_->compression_level());
557 ctx_->sent_initial_metadata_ =
true;
558 meta_ops_.set_core_cq_tag(&meta_tag_);
559 call_.PerformOps(&meta_ops_);
567 if (!
ctx_->sent_initial_metadata_) {
568 write_ops_.SendInitialMetadata(&
ctx_->initial_metadata_,
569 ctx_->initial_metadata_flags());
570 if (
ctx_->compression_level_set()) {
571 write_ops_.set_compression_level(
ctx_->compression_level());
573 ctx_->sent_initial_metadata_ =
true;
577 call_.PerformOps(&write_ops_);
597 call_requester_(
std::
move(call_requester)) {}
600 reactor_.store(reactor, std::memory_order_relaxed);
606 [
this, reactor](
bool ok) {
607 reactor->OnWriteDone(ok);
608 this->MaybeDone(true);
611 write_ops_.set_core_cq_tag(&write_tag_);
612 this->BindReactor(reactor);
613 this->MaybeCallOnCancel(reactor);
617 this->MaybeDone(
false);
620 if (
req_ !=
nullptr) {
621 req_->~RequestType();
628 reactor_.load(std::memory_order_relaxed)->OnDone();
630 auto call_requester =
std::move(call_requester_);
631 if (
ctx_->context_allocator() !=
nullptr) {
632 ctx_->context_allocator()->Release(
ctx_);
640 return reactor_.load(std::memory_order_relaxed);
661 std::atomic<ServerWriteReactor<ResponseType>*>
reactor_;
663 std::atomic<intptr_t> callbacks_outstanding_{
668 template <
class RequestType,
class ResponseType>
675 : get_reactor_(
std::
move(get_reactor)) {}
683 param.call, param.call_requester);
687 param.server_context->BeginCompletionOp(
693 if (param.status.ok()) {
700 if (reactor ==
nullptr) {
709 stream->SetupReactor(reactor);
713 std::function<ServerBidiReactor<RequestType, ResponseType>*(
730 this->MaybeDone(false);
733 finish_ops_.set_core_cq_tag(&finish_tag_);
735 if (!
ctx_->sent_initial_metadata_) {
736 finish_ops_.SendInitialMetadata(&
ctx_->initial_metadata_,
737 ctx_->initial_metadata_flags());
738 if (
ctx_->compression_level_set()) {
739 finish_ops_.set_compression_level(
ctx_->compression_level());
741 ctx_->sent_initial_metadata_ =
true;
743 finish_ops_.ServerSendStatus(&
ctx_->trailing_metadata_, s);
744 call_.PerformOps(&finish_ops_);
756 ServerBidiReactor<RequestType, ResponseType>* reactor =
757 reactor_.load(std::memory_order_relaxed);
758 reactor->OnSendInitialMetadataDone(ok);
759 this->MaybeDone(true);
762 meta_ops_.SendInitialMetadata(&
ctx_->initial_metadata_,
763 ctx_->initial_metadata_flags());
764 if (
ctx_->compression_level_set()) {
765 meta_ops_.set_compression_level(
ctx_->compression_level());
767 ctx_->sent_initial_metadata_ =
true;
768 meta_ops_.set_core_cq_tag(&meta_tag_);
769 call_.PerformOps(&meta_ops_);
777 if (!
ctx_->sent_initial_metadata_) {
778 write_ops_.SendInitialMetadata(&
ctx_->initial_metadata_,
779 ctx_->initial_metadata_flags());
780 if (
ctx_->compression_level_set()) {
781 write_ops_.set_compression_level(
ctx_->compression_level());
783 ctx_->sent_initial_metadata_ =
true;
787 call_.PerformOps(&write_ops_);
799 read_ops_.RecvMessage(
req);
800 call_.PerformOps(&read_ops_);
812 reactor_.store(reactor, std::memory_order_relaxed);
818 [
this, reactor](
bool ok) {
819 reactor->OnWriteDone(ok);
820 this->MaybeDone(true);
823 write_ops_.set_core_cq_tag(&write_tag_);
826 [
this, reactor](
bool ok) {
827 if (GPR_UNLIKELY(!ok)) {
828 ctx_->MaybeMarkCancelledOnRead();
831 this->MaybeDone(
true);
834 read_ops_.set_core_cq_tag(&read_tag_);
835 this->BindReactor(reactor);
836 this->MaybeCallOnCancel(reactor);
840 this->MaybeDone(
false);
844 reactor_.load(std::memory_order_relaxed)->OnDone();
846 auto call_requester =
std::move(call_requester_);
847 if (
ctx_->context_allocator() !=
nullptr) {
848 ctx_->context_allocator()->Release(
ctx_);
856 return reactor_.load(std::memory_order_relaxed);
879 std::atomic<ServerBidiReactor<RequestType, ResponseType>*>
reactor_;
881 std::atomic<intptr_t> callbacks_outstanding_{
889 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
grpc::internal::CallOpSet< grpc::internal::CallOpRecvMessage< RequestType > > read_ops_
std::atomic< ServerReadReactor< RequestType > * > reactor_
std::function< void()> call_requester_
void SendInitialMetadata() override
void SetupReactor(ServerBidiReactor< RequestType, ResponseType > *reactor)
ServerReadReactor is the interface for a client-streaming RPC.
grpc::internal::CallOpSet< grpc::internal::CallOpSendInitialMetadata > meta_ops_
std::multimap< std::string, std::string > trailing_metadata_
grpc::internal::CallbackWithSuccessTag meta_tag_
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **handler_data) final
grpc::internal::CallbackWithSuccessTag meta_tag_
virtual bool InternalInlineable()
grpc::internal::Call call_
void set_message_allocator_state(RpcAllocatorState *allocator_state)
bool compression_level_set() const
void SendInitialMetadata() override
grpc::internal::CallOpSet< grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpSendMessage, grpc::internal::CallOpServerSendStatus > finish_ops_
std::function< void()> call_requester_
void SendInitialMetadata() override
void CallOnDone() override
CallbackClientStreamingHandler(std::function< ServerReadReactor< RequestType > *(grpc::CallbackServerContext *, ResponseType *)> get_reactor)
void SetupReactor(ServerWriteReactor< ResponseType > *reactor)
grpc::internal::CallOpSet< grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpSendMessage > write_ops_
void set_core_cq_tag(void *core_cq_tag)
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
ResponseType * response()
grpc::CallbackServerContext *const ctx_
virtual MessageHolder< RequestT, ResponseT > * AllocateMessages()=0
std::atomic< intptr_t > callbacks_outstanding_
ServerBidiReactor is the interface for a bidirectional streaming RPC.
CallbackServerStreamingHandler(std::function< ServerWriteReactor< ResponseType > *(grpc::CallbackServerContext *, const RequestType *)> get_reactor)
bool is_last_message() const
grpc::internal::CallbackWithSuccessTag write_tag_
void RunHandler(const HandlerParameter &param) final
void CallOnDone() override
Per-message write options.
const RequestType * request()
grpc::internal::CallOpSet< grpc::internal::CallOpSendInitialMetadata > meta_ops_
grpc::internal::CallOpSet< grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpSendMessage, grpc::internal::CallOpServerSendStatus > finish_ops_
void CallOnDone() override
ServerCallbackReaderImpl(grpc::CallbackServerContext *ctx, grpc::internal::Call *call, std::function< void()> call_requester)
CoreCodegenInterface * g_core_codegen_interface
void Write(const ResponseType *resp, grpc::WriteOptions options) override
void WriteAndFinish(const ResponseType *resp, grpc::WriteOptions options, grpc::Status s) override
void Read(RequestType *req) override
grpc::internal::CallbackWithSuccessTag read_tag_
const RequestType * request()
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
~ServerCallbackWriterImpl()
void SetupReactor(ServerReadReactor< RequestType > *reactor)
void Ref()
Increases the reference count.
uint32_t initial_metadata_flags() const
ServerCallbackUnaryImpl(grpc::CallbackServerContext *ctx, grpc::internal::Call *call, MessageHolder< RequestType, ResponseType > *allocator_state, std::function< void()> call_requester)
grpc::internal::CallbackWithSuccessTag finish_tag_
grpc::internal::CallbackWithSuccessTag meta_tag_
ServerCallbackWriterImpl(grpc::CallbackServerContext *ctx, grpc::internal::Call *call, const RequestType *req, std::function< void()> call_requester)
void BindReactor(Reactor *reactor)
void RunHandler(const HandlerParameter &param) final
struct grpc_call grpc_call
std::multimap< std::string, std::string > initial_metadata_
void PerformOps(CallOpSetInterface *ops)
void Finish(grpc::Status s) override
void RunHandler(const HandlerParameter &param) final
grpc::internal::CallbackWithSuccessTag finish_tag_
grpc::internal::CallOpSet< grpc::internal::CallOpSendInitialMetadata > meta_ops_
ServerReactor * reactor() override
void WriteAndFinish(const ResponseType *resp, grpc::WriteOptions options, grpc::Status s) override
void SetupReactor(ServerUnaryReactor *reactor)
grpc::internal::CallbackWithSuccessTag meta_tag_
void BindReactor(ServerReadReactor< RequestType > *reactor)
void SetMessageAllocator(MessageAllocator< RequestType, ResponseType > *allocator)
Straightforward wrapping of the C call object.
std::function< void()> call_requester_
grpc::internal::CallbackWithSuccessTag read_tag_
CallbackBidiHandler(std::function< ServerBidiReactor< RequestType, ResponseType > *(grpc::CallbackServerContext *)> get_reactor)
bool sent_initial_metadata_
grpc::CallbackServerContext *const ctx_
grpc::internal::CallOpSet< grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpSendMessage, grpc::internal::CallOpServerSendStatus > finish_ops_
FinishOnlyReactor< ServerUnaryReactor > UnimplementedUnaryReactor
grpc::internal::CallbackWithSuccessTag finish_tag_
Base class for running an RPC handler.
void Read(RequestType *req) override
grpc::CallbackServerContext *const ctx_
virtual void Release(CallbackServerContext *)
grpc::internal::CallOpSet< grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpSendMessage > write_ops_
std::atomic< ServerWriteReactor< ResponseType > * > reactor_
MessageHolder< RequestType, ResponseType > *const allocator_state_
void Finish(grpc::Status s) override
std::atomic< ServerUnaryReactor * > reactor_
void Set(grpc_call *call, std::function< void(bool)> f, CompletionQueueTag *ops, bool can_inline)
virtual void OnReadDone(bool)
double Finish(Counter const &c, IterationCount iterations, double cpu_time, double num_threads)
ResponseType * response()
grpc::internal::Call call_
ContextAllocator * context_allocator() const
grpc::CallbackServerContext *const ctx_
grpc::internal::Call call_
~ServerCallbackReaderImpl()
ServerReactor * reactor() override
void Finish(grpc::Status s) override
grpc::internal::Call call_
virtual void grpc_call_ref(grpc_call *call)=0
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
std::function< void()> call_requester_
ServerCallbackReaderWriterImpl(grpc::CallbackServerContext *ctx, grpc::internal::Call *call, std::function< void()> call_requester)
ServerReactor * reactor() override
std::function< ServerReadReactor< RequestType > *(grpc::CallbackServerContext *, ResponseType *)> get_reactor_
CallbackUnaryHandler(std::function< ServerUnaryReactor *(grpc::CallbackServerContext *, const RequestType *, ResponseType *)> get_reactor)
MessageAllocator< RequestType, ResponseType > * allocator_
grpc::internal::CallbackWithSuccessTag write_tag_
std::atomic< ServerBidiReactor< RequestType, ResponseType > * > reactor_
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **) final
Reactor * CatchingReactorGetter(Func &&func, Args &&... args)
internal::RefMatcher< T & > Ref(T &x)
grpc::internal::CallOpSet< grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpSendMessage, grpc::internal::CallOpServerSendStatus > finish_ops_
void RunHandler(const HandlerParameter &param) final
grpc::internal::CallOpSet< grpc::internal::CallOpRecvMessage< RequestType > > read_ops_
grpc::internal::CallOpSet< grpc::internal::CallOpSendInitialMetadata > meta_ops_
ServerWriteReactor is the interface for a server-streaming RPC.
grpc::internal::CallbackWithSuccessTag finish_tag_
virtual void grpc_call_unref(grpc_call *call)=0
ServerReactor * reactor() override
void CallOnDone() override
void SendInitialMetadata() override
void Finish(grpc::Status s) override
std::function< ServerUnaryReactor *(grpc::CallbackServerContext *, const RequestType *, ResponseType *)> get_reactor_
void Write(const ResponseType *resp, grpc::WriteOptions options) override
grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:17