28 #include <type_traits>
32 #include "absl/memory/memory.h"
34 #include <grpc/byte_buffer.h>
// Default limit constant; expands to INT_MAX, the largest int value.
// NOTE(review): the name suggests it caps the number of sync-server threads,
// but the use site is not visible in this excerpt -- confirm before relying
// on that reading.
94 #define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
// Message text handed to the grpc::internal::ResourceExhaustedHandler that is
// created for resource_exhausted_handler_ when sync_server_cqs_ is non-null
// and non-empty.
98 const char* kServerThreadpoolExhausted =
"Server Threadpool Exhausted";
// Empty string constant. It is passed as an argument inside the
// Server::UnimplementedAsyncResponse constructor.
// NOTE(review): presumably the status message used when replying to an
// unknown RPC method -- the receiving call is not visible here; confirm.
103 const char* kUnknownRpcMethod =
"";
105 class DefaultGlobalCallbacks final :
public Server::GlobalCallbacks {
107 ~DefaultGlobalCallbacks()
override {}
108 void PreSynchronousRequest(ServerContext* )
override {}
109 void PostSynchronousRequest(ServerContext* )
override {}
// Shared Server::GlobalCallbacks instance. Starts out null;
// InitGlobalCallbacks() resets it to a DefaultGlobalCallbacks, and the Server
// constructor runs that initializer through gpr_once_init before copying this
// pointer into its own global_callbacks_ member.
112 std::shared_ptr<Server::GlobalCallbacks> g_callbacks =
nullptr;
115 void InitGlobalCallbacks() {
117 g_callbacks.reset(
new DefaultGlobalCallbacks());
121 class ShutdownTag :
public internal::CompletionQueueTag {
123 bool FinalizeResult(
void** ,
bool* )
override {
128 class PhonyTag :
public internal::CompletionQueueTag {
130 bool FinalizeResult(
void** ,
bool* )
override {
135 class UnimplementedAsyncRequestContext {
137 UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
139 GenericServerContext server_context_;
153 notification_cq_(notification_cq),
155 delete_on_finalize_(delete_on_finalize),
157 done_intercepting_(
false) {
167 call_cq_->CompleteAvalanching();
172 if (done_intercepting_) {
174 if (delete_on_finalize_) {
181 if (call_wrapper_.call() ==
nullptr) {
188 stream_->BindCall(&call_wrapper_);
190 if (*
status &&
call_ && call_wrapper_.server_rpc_info()) {
191 done_intercepting_ =
true;
193 interceptor_methods_.AddInterceptionHookPoint(
195 interceptor_methods_.SetRecvInitialMetadata(&
context_->client_metadata_);
196 if (interceptor_methods_.RunInterceptors(
197 [
this]() { ContinueFinalizeResultAfterInterception(); })) {
207 context_->BeginCompletionOp(&call_wrapper_,
nullptr,
nullptr);
210 if (delete_on_finalize_) {
218 context_->BeginCompletionOp(&call_wrapper_,
nullptr,
nullptr);
246 payload, call_cq_->cq(), notification_cq->
cq(),
255 delete_on_finalize) {
262 context->client_metadata_.arr(),
263 call_cq->
cq(), notification_cq->
cq(),
270 if (done_intercepting_) {
279 context_->deadline_ = call_details_.deadline;
288 *
server_->interceptor_creators()));
310 auto*
callback =
static_cast<ShutdownCallback*
>(
cb);
316 CompletionQueue*
cq_ =
nullptr;
325 :
private grpc::UnimplementedAsyncRequestContext,
333 bool FinalizeResult(
void**
tag,
bool*
status)
override;
343 grpc::internal::CallOpSendInitialMetadata,
344 grpc::internal::CallOpServerSendStatus> {
372 data->optional_payload = has_request_payload_ ? &request_payload_ :
nullptr;
381 data->details = call_details_;
388 if (has_request_payload_ && request_payload_) {
391 if (call_details_ !=
nullptr) {
393 delete call_details_;
396 server_->UnrefWithPossibleNotify();
410 void Run(
const std::shared_ptr<GlobalCallbacks>& global_callbacks,
415 ctx_->ctx.set_server_rpc_info(method_->name(), method_->method_type(),
416 server_->interceptor_creators_));
419 request_metadata_.count = 0;
421 global_callbacks_ = global_callbacks;
422 resources_ = resources;
424 interceptor_methods_.SetCall(&*wrapped_call_);
425 interceptor_methods_.SetReverse();
427 interceptor_methods_.AddInterceptionHookPoint(
429 interceptor_methods_.SetRecvInitialMetadata(&
ctx_->ctx.client_metadata_);
431 if (has_request_payload_) {
433 auto*
handler = resources_ ? method_->handler()
434 :
server_->resource_exhausted_handler_.get();
435 deserialized_request_ =
handler->Deserialize(
call_, request_payload_,
436 &request_status_,
nullptr);
437 if (!request_status_.ok()) {
440 request_payload_ =
nullptr;
441 interceptor_methods_.AddInterceptionHookPoint(
443 interceptor_methods_.SetRecvMessage(deserialized_request_,
nullptr);
446 if (interceptor_methods_.RunInterceptors(
447 [
this]() { ContinueRunAfterInterception(); })) {
448 ContinueRunAfterInterception();
456 ctx_->ctx.BeginCompletionOp(&*wrapped_call_,
nullptr,
nullptr);
457 global_callbacks_->PreSynchronousRequest(&
ctx_->ctx);
458 auto*
handler = resources_ ? method_->handler()
459 :
server_->resource_exhausted_handler_.get();
461 &*wrapped_call_, &
ctx_->ctx, deserialized_request_, request_status_,
463 global_callbacks_->PostSynchronousRequest(&
ctx_->ctx);
471 grpc::PhonyTag ignored_tag;
475 wrapped_call_.Destroy();
498 template <
class CallAllocation>
502 data->tag =
static_cast<void*
>(
this);
504 data->initial_metadata = &request_metadata_;
520 void* deserialized_request_ =
nullptr;
529 :
ctx(deadline, arr) {}
536 template <
class ServerContextType>
542 "ServerContextType must be derived from CallbackServerContext");
559 ?
server_->context_allocator()->NewCallbackServerContext()
563 data->optional_payload = has_request_payload_ ? &request_payload_ :
nullptr;
572 has_request_payload_(
false),
578 ->NewGenericCallbackServerContext()
582 data->details = call_details_;
586 delete call_details_;
588 if (has_request_payload_ && request_payload_) {
591 if (ctx_alloc_by_default_ ||
server_->context_allocator() ==
nullptr) {
592 default_ctx_.Destroy();
594 server_->UnrefWithPossibleNotify();
599 bool FinalizeResult(
void**
tag,
bool*
status)
override;
609 functor_run = &CallbackCallTag::StaticRun;
632 void* ignored =
req_;
647 req_->ctx_->BindDeadlineAndMetadata(
req_->deadline_,
648 &
req_->request_metadata_);
649 req_->request_metadata_.count = 0;
656 req_->server_->max_receive_message_size(),
657 req_->ctx_->set_server_rpc_info(
659 (
req_->method_ !=
nullptr)
660 ?
req_->method_->method_type()
662 req_->server_->interceptor_creators_));
664 req_->interceptor_methods_.SetCall(
call_);
665 req_->interceptor_methods_.SetReverse();
667 req_->interceptor_methods_.AddInterceptionHookPoint(
669 POST_RECV_INITIAL_METADATA);
670 req_->interceptor_methods_.SetRecvInitialMetadata(
671 &
req_->ctx_->client_metadata_);
673 if (
req_->has_request_payload_) {
675 req_->request_ =
req_->method_->handler()->Deserialize(
676 req_->call_,
req_->request_payload_, &
req_->request_status_,
677 &
req_->handler_data_);
678 if (!(
req_->request_status_.ok())) {
681 req_->request_payload_ =
nullptr;
682 req_->interceptor_methods_.AddInterceptionHookPoint(
684 req_->interceptor_methods_.SetRecvMessage(
req_->request_,
nullptr);
687 if (
req_->interceptor_methods_.RunInterceptors(
688 [
this] { ContinueRunAfterInterception(); })) {
689 ContinueRunAfterInterception();
697 ?
req_->method_->handler()
698 :
req_->server_->generic_handler_.get();
701 req_->handler_data_, [
this] { delete req_; }));
705 template <
class CallAllocation>
709 data->tag =
static_cast<void*
>(&
tag_);
711 data->initial_metadata = &request_metadata_;
712 if (
ctx_ ==
nullptr) {
714 ctx_ = &*default_ctx_;
715 ctx_alloc_by_default_ =
true;
717 ctx_->set_context_allocator(
server->context_allocator());
726 void* handler_data_ =
nullptr;
733 bool ctx_alloc_by_default_ =
false;
735 ServerContextType*
ctx_ =
nullptr;
764 return method_->name();
770 return ctx_->method().c_str();
779 std::shared_ptr<GlobalCallbacks> global_callbacks,
781 int max_pollers,
int cq_timeout_msec)
784 server_cq_(server_cq),
785 cq_timeout_msec_(cq_timeout_msec),
786 global_callbacks_(
std::
move(global_callbacks)) {}
796 switch (server_cq_->AsyncNext(
tag,
ok, deadline)) {
818 sync_req->
Run(global_callbacks_, resources);
824 grpc_core::Server::RegisteredCallAllocation result;
825 new SyncRequest(server_, method, &result);
828 has_sync_method_ =
true;
832 if (has_sync_method_) {
833 unknown_method_ = absl::make_unique<grpc::internal::RpcServiceMethod>(
838 grpc_core::Server::BatchCallAllocation result;
839 new SyncRequest(server_, unknown_method_.get(), &result);
847 server_cq_->Shutdown();
855 while (server_cq_->Next(&
tag, &
ok)) {
863 if (has_sync_method_) {
872 bool has_sync_method_ =
false;
880 std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>
882 int min_pollers,
int max_pollers,
int sync_cq_timeout_msec,
883 std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>>
888 std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>
890 : acceptors_(
std::
move(acceptors)),
892 max_receive_message_size_(INT_MIN),
893 sync_server_cqs_(
std::
move(sync_server_cqs)),
896 shutdown_notified_(
false),
899 health_check_service_disabled_(
false) {
901 gpr_once_init(&grpc::g_once_init_callbacks, grpc::InitGlobalCallbacks);
902 global_callbacks_ = grpc::g_callbacks;
903 global_callbacks_->UpdateArguments(
args);
905 if (sync_server_cqs_ !=
nullptr) {
906 bool default_rq_created =
false;
907 if (server_rq ==
nullptr) {
911 default_rq_created =
true;
914 for (
const auto&
it : *sync_server_cqs_) {
915 sync_req_mgrs_.emplace_back(
new SyncRequestThreadManager(
916 this,
it.get(), global_callbacks_, server_rq, min_pollers,
917 max_pollers, sync_cq_timeout_msec));
920 if (default_rq_created) {
925 for (
auto& acceptor : acceptors_) {
926 acceptor->SetToChannelArgs(
args);
930 args->SetChannelArgs(&channel_args);
932 for (
size_t i = 0;
i < channel_args.
num_args;
i++) {
933 if (0 == strcmp(channel_args.
args[
i].
key,
936 health_check_service_disabled_ =
true;
938 health_check_service_.reset(
960 for (
const auto&
value : sync_req_mgrs_) {
964 callback_cq_.load(std::memory_order_relaxed);
965 if (callback_cq !=
nullptr) {
970 CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq);
972 callback_cq_.store(
nullptr, std::memory_order_release);
979 health_check_service_.reset();
991 std::shared_ptr<grpc::Channel> Server::InProcessChannel(
996 std::vector<std::unique_ptr<
1000 std::shared_ptr<grpc::Channel>
1001 Server::experimental_type::InProcessChannelWithInterceptors(
1004 std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
1005 interceptor_creators) {
1015 switch (
method->method_type()) {
1027 bool has_async_methods =
service->has_async_methods();
1028 if (has_async_methods) {
1030 "Can only register an asynchronous service against one server.");
1044 if (method_registration_tag ==
nullptr) {
1050 if (
method->handler() ==
nullptr) {
1051 method->set_server_tag(method_registration_tag);
1052 }
else if (
method->api_type() ==
1054 for (
const auto&
value : sync_req_mgrs_) {
1055 value->AddSyncMethod(
method.get(), method_registration_tag);
1058 has_callback_methods_ =
true;
1062 cq->cq(), method_registration_tag, [
this,
cq, method_value] {
1063 grpc_core::Server::RegisteredCallAllocation result;
1064 new CallbackRequest<grpc::CallbackServerContext>(this, method_value,
1077 if (std::getline(ss, service_name,
'/') &&
1078 std::getline(ss, service_name,
'/')) {
1079 services_.push_back(service_name);
1087 "Can only register an async generic service against one server.");
1089 has_async_generic_service_ =
true;
1092 void Server::RegisterCallbackGenericService(
1095 service->server_ ==
nullptr &&
1096 "Can only register a callback generic service against one server.");
1098 has_callback_generic_service_ =
true;
1099 generic_handler_.reset(
service->Handler());
1104 grpc_core::Server::BatchCallAllocation result;
1105 new CallbackRequest<grpc::GenericCallbackServerContext>(this, cq, &result);
1114 global_callbacks_->AddPort(
this,
addr, creds,
port);
1119 shutdown_refs_outstanding_.fetch_add(1, std::memory_order_relaxed);
1122 void Server::UnrefWithPossibleNotify() {
1124 1, std::memory_order_acq_rel) == 1)) {
1129 shutdown_done_ =
true;
1130 shutdown_done_cv_.Signal();
1134 void Server::UnrefAndWaitLocked() {
1136 1, std::memory_order_acq_rel) == 1)) {
1137 shutdown_done_ =
true;
1140 while (!shutdown_done_) {
1141 shutdown_done_cv_.Wait(&
mu_);
1147 global_callbacks_->PreServerStart(
this);
1154 default_health_check_service_impl =
nullptr;
1155 if (health_check_service_ ==
nullptr && !health_check_service_disabled_ &&
1158 health_check_service_.reset(default_hc_service);
1167 default_health_check_service_impl =
1168 default_hc_service->GetHealthCheckService(
1169 std::unique_ptr<grpc::ServerCompletionQueue>(health_check_cq));
1170 RegisterService(
nullptr, default_health_check_service_impl);
1173 for (
auto& acceptor : acceptors_) {
1174 acceptor->GetCredentials()->AddPortToServer(acceptor->name(),
server_);
1180 if (has_callback_methods_ && !has_callback_generic_service_) {
1186 for (
size_t i = 0;
i < num_cqs;
i++) {
1187 cq_list_.push_back(cqs[
i]);
1201 bool unknown_rpc_needed =
1202 !has_async_generic_service_ && !has_callback_generic_service_;
1204 if (unknown_rpc_needed && !sync_req_mgrs_.empty()) {
1205 sync_req_mgrs_[0]->AddUnknownSyncMethod();
1206 unknown_rpc_needed =
false;
1211 if (unknown_rpc_needed) {
1212 for (
size_t i = 0;
i < num_cqs;
i++) {
1213 if (cqs[
i]->IsFrequentlyPolled()) {
1217 if (health_check_cq !=
nullptr) {
1220 unknown_rpc_needed =
false;
1226 if (sync_server_cqs_ !=
nullptr && !sync_server_cqs_->empty()) {
1227 resource_exhausted_handler_ =
1228 absl::make_unique<grpc::internal::ResourceExhaustedHandler>(
1229 kServerThreadpoolExhausted);
1232 for (
const auto&
value : sync_req_mgrs_) {
1236 if (default_health_check_service_impl !=
nullptr) {
1240 for (
auto& acceptor : acceptors_) {
1253 for (
auto& acceptor : acceptors_) {
1254 acceptor->Shutdown();
1259 grpc::ShutdownTag shutdown_tag;
1278 UnrefAndWaitLocked();
1282 for (
const auto&
value : sync_req_mgrs_) {
1287 for (
const auto&
value : sync_req_mgrs_) {
1293 CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_relaxed);
1294 if (callback_cq !=
nullptr) {
1299 CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq);
1301 callback_cq_.store(
nullptr, std::memory_order_release);
1310 shutdown_notified_ =
true;
1311 shutdown_cv_.SignalAll();
1316 for (
auto*
cq : cq_list_) {
1317 cq->UnregisterServer(
this);
1323 void Server::Wait() {
1325 while (
started_ && !shutdown_notified_) {
1326 shutdown_cv_.Wait(&
mu_);
1335 bool Server::UnimplementedAsyncRequest::FinalizeResult(
void**
tag,
1337 if (GenericAsyncRequest::FinalizeResult(
tag,
status)) {
1353 Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
1358 kUnknownRpcMethod,
this);
1370 if (callback_cq !=
nullptr) {
1376 callback_cq =
callback_cq_.load(std::memory_order_relaxed);
1377 if (callback_cq !=
nullptr) {
1382 auto* shutdown_callback =
new grpc::ShutdownCallback;
1385 shutdown_callback});
1388 shutdown_callback->TakeCQ(callback_cq);
1394 callback_cq_.store(callback_cq, std::memory_order_release);