34 #include "absl/base/thread_annotations.h"
35 #include "absl/memory/memory.h"
36 #include "absl/status/status.h"
37 #include "absl/status/statusor.h"
38 #include "absl/strings/str_cat.h"
39 #include "absl/strings/str_format.h"
40 #include "absl/strings/string_view.h"
41 #include "absl/strings/strip.h"
91 #ifdef GPR_SUPPORT_CHANNELS_FROM_FD
95 #endif // GPR_SUPPORT_CHANNELS_FROM_FD
100 const char kUnixUriPrefix[] =
"unix:";
101 const char kUnixAbstractUriPrefix[] =
"unix-abstract:";
103 class Chttp2ServerListener :
public Server::ListenerInterface {
117 ~Chttp2ServerListener()
override;
120 const std::vector<grpc_pollset*>* pollsets)
override;
122 channelz::ListenSocketNode* channelz_listen_socket_node()
const override {
126 void SetOnDestroyDone(
grpc_closure* on_destroy_done)
override;
128 void Orphan()
override;
131 class ConfigFetcherWatcher
134 explicit ConfigFetcherWatcher(RefCountedPtr<Chttp2ServerListener> listener)
137 void UpdateConnectionManager(
138 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
139 connection_manager)
override;
141 void StopServing()
override;
147 class ActiveConnection :
public InternallyRefCounted<ActiveConnection> {
149 class HandshakingState :
public InternallyRefCounted<HandshakingState> {
151 HandshakingState(RefCountedPtr<ActiveConnection> connection_ref,
156 ~HandshakingState()
override;
158 void Orphan()
override;
172 RefCountedPtr<HandshakeManager> handshake_mgr_
185 ~ActiveConnection()
override;
187 void Orphan()
override;
191 void Start(RefCountedPtr<Chttp2ServerListener> listener,
202 RefCountedPtr<Chttp2ServerListener>
listener_;
219 friend class RefCountedPtr<Chttp2ServerListener>;
222 void StartListening();
230 static void DestroyListener(
Server* ,
void*
arg,
240 void IncrementRefCount(
const DebugLocation& ,
247 return RefCountedPtr<Chttp2ServerListener>(
this);
249 RefCountedPtr<Chttp2ServerListener>
Ref(
const DebugLocation& ,
256 void Unref(
const DebugLocation& ,
const char* ) {
267 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
277 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_
289 void Chttp2ServerListener::ConfigFetcherWatcher::UpdateConnectionManager(
290 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
291 connection_manager) {
292 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
293 connection_manager_to_destroy;
294 class GracefulShutdownExistingConnections {
296 ~GracefulShutdownExistingConnections() {
299 for (
auto& connection : connections_) {
300 connection.first->SendGoAway();
304 void set_connections(
305 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>>
312 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_;
313 } connections_to_shutdown;
316 connection_manager_to_destroy =
listener_->connection_manager_;
344 void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() {
345 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>>
connections;
354 connection.first->SendGoAway();
369 Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState(
370 RefCountedPtr<ActiveConnection> connection_ref,
384 Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() {
390 void Chttp2ServerListener::ActiveConnection::HandshakingState::Orphan() {
393 if (handshake_mgr_ !=
nullptr) {
394 handshake_mgr_->Shutdown(
401 void Chttp2ServerListener::ActiveConnection::HandshakingState::Start(
404 RefCountedPtr<HandshakeManager> handshake_mgr;
407 if (handshake_mgr_ ==
nullptr)
return;
408 handshake_mgr = handshake_mgr_;
411 OnHandshakeDone,
this);
414 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnTimeout(
416 HandshakingState*
self =
static_cast<HandshakingState*
>(
arg);
422 "Did not receive HTTP/2 settings before handshake timeout");
426 transport =
self->connection_->transport_;
433 void Chttp2ServerListener::ActiveConnection::HandshakingState::
435 HandshakingState*
self =
static_cast<HandshakingState*
>(
arg);
440 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
442 auto*
args =
static_cast<HandshakerArgs*
>(
arg);
443 HandshakingState*
self =
static_cast<HandshakingState*
>(
args->user_data);
444 OrphanablePtr<HandshakingState> handshaking_state_ref;
445 RefCountedPtr<HandshakeManager> handshake_mgr;
446 bool cleanup_connection =
false;
452 cleanup_connection =
true;
470 if (
args->endpoint !=
nullptr) {
474 self->connection_->listener_->server_->SetupTransport(
486 self->connection_->transport_ =
490 self->Ref().release();
492 self, grpc_schedule_on_exec_ctx);
497 if (
self->connection_->listener_->config_fetcher_watcher_ !=
500 self->connection_->Ref().release();
501 on_close = &
self->connection_->on_close_;
505 cleanup_connection =
true;
508 &
self->on_receive_settings_,
511 self->Ref().release();
513 grpc_schedule_on_exec_ctx);
523 cleanup_connection =
true;
527 cleanup_connection =
true;
535 handshaking_state_ref =
std::move(
self->connection_->handshaking_state_);
538 self->acceptor_ =
nullptr;
539 OrphanablePtr<ActiveConnection> connection;
540 if (cleanup_connection) {
542 auto it =
self->connection_->listener_->connections_.find(
543 self->connection_.get());
544 if (
it !=
self->connection_->listener_->connections_.end()) {
546 self->connection_->listener_->connections_.erase(
it);
556 Chttp2ServerListener::ActiveConnection::ActiveConnection(
559 : handshaking_state_(memory_owner.
MakeOrphanable<HandshakingState>(
560 Ref(), accepting_pollset, acceptor,
args)) {
562 grpc_schedule_on_exec_ctx);
565 Chttp2ServerListener::ActiveConnection::~ActiveConnection() {
571 void Chttp2ServerListener::ActiveConnection::Orphan() {
572 OrphanablePtr<HandshakingState> handshaking_state;
578 handshaking_state =
std::move(handshaking_state_);
583 void Chttp2ServerListener::ActiveConnection::SendGoAway() {
597 {10 * 60 * GPR_MS_PER_SEC, 0, INT_MAX})),
599 drain_grace_timer_expiry_callback_pending_ =
true;
606 "Server is stopping to serve requests.");
611 void Chttp2ServerListener::ActiveConnection::Start(
612 RefCountedPtr<Chttp2ServerListener> listener,
grpc_endpoint* endpoint,
614 RefCountedPtr<HandshakingState> handshaking_state_ref;
621 handshaking_state_ref = handshaking_state_->Ref();
623 handshaking_state_ref->Start(endpoint,
args);
626 void Chttp2ServerListener::ActiveConnection::OnClose(
628 ActiveConnection*
self =
static_cast<ActiveConnection*
>(
arg);
629 OrphanablePtr<ActiveConnection> connection;
635 if (!
self->shutdown_) {
636 auto it =
self->listener_->connections_.find(
self);
637 if (
it !=
self->listener_->connections_.end()) {
639 self->listener_->connections_.erase(
it);
641 self->shutdown_ =
true;
644 if (
self->drain_grace_timer_expiry_callback_pending_) {
651 void Chttp2ServerListener::ActiveConnection::OnDrainGraceTimeExpiry(
653 ActiveConnection*
self =
static_cast<ActiveConnection*
>(
arg);
664 "Drain grace time expired. Closing connection immediately.");
677 Chttp2ServerListener* listener =
nullptr;
683 listener =
new Chttp2ServerListener(
server,
args, args_modifier);
685 args, &listener->tcp_server_);
687 if (
server->config_fetcher() !=
nullptr) {
688 listener->resolved_address_ = *
addr;
699 if (!string_address.ok()) {
701 string_address.status().ToString());
703 listener->channelz_listen_socket_ =
704 MakeRefCounted<channelz::ListenSocketNode>(
709 server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
713 if (listener !=
nullptr) {
714 if (listener->tcp_server_ !=
nullptr) {
730 Chttp2ServerListener* listener =
731 new Chttp2ServerListener(
server,
args, args_modifier);
733 &listener->tcp_server_shutdown_complete_,
args, &listener->tcp_server_);
739 TcpServerFdHandler** arg_val =
740 grpc_channel_args_find_pointer<TcpServerFdHandler*>(
args,
name);
742 server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
746 Chttp2ServerListener::Chttp2ServerListener(
754 this, grpc_schedule_on_exec_ctx);
757 Chttp2ServerListener::~Chttp2ServerListener() {
761 if (on_destroy_done_ !=
nullptr) {
769 void Chttp2ServerListener::Start(
770 Server* ,
const std::vector<grpc_pollset*>* ) {
771 if (
server_->config_fetcher() !=
nullptr) {
772 auto watcher = absl::make_unique<ConfigFetcherWatcher>(
Ref());
774 server_->config_fetcher()->StartWatch(
787 void Chttp2ServerListener::StartListening() {
791 void Chttp2ServerListener::SetOnDestroyDone(
grpc_closure* on_destroy_done) {
793 on_destroy_done_ = on_destroy_done;
799 Chttp2ServerListener*
self =
static_cast<Chttp2ServerListener*
>(
arg);
802 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
806 connection_manager =
self->connection_manager_;
813 if (
self->server_->config_fetcher() !=
nullptr) {
814 if (connection_manager ==
nullptr) {
816 "No ConnectionManager configured. Closing connection.");
817 endpoint_cleanup(
error);
825 connection_manager->UpdateChannelArgsForConnection(
args,
tcp);
826 if (!args_result.
ok()) {
828 args_result.
status().ToString().c_str());
834 args =
self->args_modifier_(*args_result, &
error);
838 endpoint_cleanup(
error);
842 args_to_destroy =
args;
844 auto memory_owner =
self->memory_quota_->CreateMemoryOwner(
846 auto connection = memory_owner.MakeOrphanable<ActiveConnection>(
852 RefCountedPtr<ActiveConnection> connection_ref = connection->Ref();
853 RefCountedPtr<Chttp2ServerListener> listener_ref;
858 if (!
self->shutdown_ &&
self->is_serving_ &&
859 connection_manager ==
self->connection_manager_) {
865 listener_ref =
self->Ref();
866 self->connections_.emplace(connection.get(),
std::move(connection));
869 if (connection !=
nullptr) {
877 void Chttp2ServerListener::TcpServerShutdownComplete(
void*
arg,
879 Chttp2ServerListener*
self =
static_cast<Chttp2ServerListener*
>(
arg);
880 self->channelz_listen_socket_.reset();
887 void Chttp2ServerListener::Orphan() {
893 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>>
connections;
905 started_cv_.Wait(&
mu_);
923 if (
addr ==
nullptr) {
925 "Invalid address: addr cannot be a nullptr.");
927 if (strncmp(
addr,
"external:", 9) == 0) {
928 return Chttp2ServerListener::CreateWithAcceptor(
server,
addr,
args,
933 std::vector<grpc_error_handle> error_list;
942 kUnixAbstractUriPrefix)) {
948 if (!resolved_or.
ok()) {
952 for (
auto&
addr : *resolved_or) {
961 args_modifier, &port_temp);
963 error_list.push_back(
error);
965 if (*port_num == -1) {
966 *port_num = port_temp;
972 if (error_list.size() == resolved_or->size()) {
974 "No address added out of total %" PRIuPTR
" resolved for '%s'",
975 resolved_or->size(),
addr);
977 msg.c_str(), error_list.data(), error_list.size());
978 }
else if (!error_list.empty()) {
980 "Only %" PRIuPTR
" addresses added out of total %" PRIuPTR
982 resolved_or->size() - error_list.size(), resolved_or->size());
984 msg.c_str(), error_list.data(), error_list.size());
1007 if (server_credentials ==
nullptr) {
1009 "Could not find server credentials");
1013 if (security_connector ==
nullptr) {
1015 absl::StrCat(
"Unable to create secure server with credentials of type ",
1016 server_credentials->
type().
name()));
1037 GRPC_API_TRACE(
"grpc_server_add_http2_port(server=%p, addr=%s, creds=%p)", 3,
1040 if (creds ==
nullptr) {
1042 "No credentials specified for secure server port (creds==NULL)");
1062 if (sc ==
nullptr) {
1064 "Unable to create secure server with credentials of type ",
1076 ModifyArgsForConnection, &port_num);
1087 #ifdef GPR_SUPPORT_CHANNELS_FROM_FD
1091 if (creds ==
nullptr ||
1106 server_args, server_endpoint,
false
1123 #else // !GPR_SUPPORT_CHANNELS_FROM_FD
1130 #endif // GPR_SUPPORT_CHANNELS_FROM_FD