Go to the documentation of this file.
28 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER
33 #include <netinet/in.h>
34 #include <netinet/tcp.h>
36 #include <sys/socket.h>
38 #include <sys/types.h>
43 #include "absl/strings/str_cat.h"
44 #include "absl/strings/str_format.h"
65 static std::atomic<int64_t> num_dropped_connections{0};
72 s->expand_wildcard_addrs =
false;
73 for (
size_t i = 0;
i < (
args ==
nullptr ? 0 :
args->num_args);
i++) {
77 (
args->args[
i].value.integer != 0);
81 " must be an integer");
85 s->expand_wildcard_addrs = (
args->args[
i].value.integer != 0);
96 s->destroyed_ports = 0;
98 s->shutdown_starting.head =
nullptr;
99 s->shutdown_starting.tail =
nullptr;
101 s->on_accept_cb =
nullptr;
102 s->on_accept_cb_arg =
nullptr;
107 s->fd_handler =
nullptr;
119 if (
s->shutdown_complete !=
nullptr) {
130 delete s->fd_handler;
137 s->destroyed_ports++;
138 if (
s->destroyed_ports ==
s->nports) {
158 for (sp =
s->head; sp; sp = sp->
next) {
161 grpc_schedule_on_exec_ctx);
163 "tcp_listener_shutdown");
177 if (
s->active_ports) {
179 for (sp =
s->head; sp; sp = sp->
next) {
186 deactivated_all_ports(s);
202 addr.len =
static_cast<socklen_t
>(
sizeof(
struct sockaddr_storage));
207 if (errno == EINTR) {
209 }
else if (errno == EAGAIN || errno == ECONNABORTED ||
210 errno == EWOULDBLOCK) {
227 int64_t dropped_connections_count =
228 num_dropped_connections.fetch_add(1, std::memory_order_relaxed) + 1;
229 if (dropped_connections_count % 1000 == 1) {
231 "Dropped >= %" PRId64
232 " new connection attempts due to high memory pressure",
233 dropped_connections_count);
243 addr.len =
static_cast<socklen_t
>(
sizeof(
struct sockaddr_storage));
244 if (getsockname(fd,
reinterpret_cast<struct sockaddr*
>(
addr.addr),
261 if (!addr_uri.ok()) {
263 addr_uri.status().ToString().c_str());
291 read_notifier_pollset, acceptor);
300 deactivated_all_ports(sp->
server);
313 unsigned fd_index = 0;
331 requested_port = *out_port = sp->
port;
340 *out_port = sp2->
port;
349 "Failed to add :: listener, "
350 "the environment may not support IPv6: %s",
356 "Failed to add 0.0.0.0 listener, "
357 "the environment may not support IPv4: %s",
364 "Failed to add any wildcard listeners");
382 for (
unsigned i = 0;
i <
count;
i++) {
394 if (!addr_str.
ok()) {
433 unsigned port_index = 0;
437 if (
s->tail !=
nullptr) {
438 port_index =
s->tail->port_index + 1;
444 if (requested_port == 0) {
445 for (sp =
s->head; sp; sp = sp->
next) {
447 static_cast<socklen_t
>(
sizeof(
struct sockaddr_storage));
450 reinterpret_cast<grpc_sockaddr*
>(&sockname_temp.
addr),
451 &sockname_temp.
len)) {
456 requested_port = used_port;
457 addr = &sockname_temp;
464 return add_wildcard_addrs_to_server(s, port_index, requested_port,
468 addr = &addr6_v4mapped;
472 *out_port = sp->
port;
480 unsigned port_index) {
481 unsigned num_ports = 0;
483 for (sp =
s->head; sp; sp = sp->
next) {
485 if (++num_ports > port_index) {
493 unsigned tcp_server_port_fd_count(
grpc_tcp_server* s,
unsigned port_index) {
494 unsigned num_fds = 0;
508 for (; sp; sp = sp->
sibling, --fd_index) {
519 const std::vector<grpc_pollset*>* pollsets,
521 void* on_accept_cb_arg) {
528 s->on_accept_cb = on_accept_cb;
529 s->on_accept_cb_arg = on_accept_cb_arg;
530 s->pollsets = pollsets;
532 while (sp !=
nullptr) {
534 pollsets->size() > 1) {
536 "clone_port", clone_port(sp, (
unsigned)(pollsets->size() - 1))));
537 for (
i = 0;
i < pollsets->size();
i++) {
540 grpc_schedule_on_exec_ctx);
546 for (
i = 0;
i < pollsets->size();
i++) {
550 grpc_schedule_on_exec_ctx);
578 tcp_server_destroy(s);
584 s->shutdown_listeners =
true;
586 if (
s->active_ports) {
588 for (sp =
s->head; sp; sp = sp->
next) {
606 addr.len =
static_cast<socklen_t
>(
sizeof(
struct sockaddr_storage));
609 if (getpeername(fd,
reinterpret_cast<struct sockaddr*
>(
addr.addr),
617 if (!addr_uri.ok()) {
619 addr_uri.status().ToString().c_str());
628 read_notifier_pollset =
630 &
s_->next_pollset_to_assign, 1)) %
631 s_->pollsets->size()];
641 s_->on_accept_cb(
s_->on_accept_cb_arg,
643 read_notifier_pollset, acceptor);
653 s->fd_handler =
new ExternalConnectionHandler(s);
654 return s->fd_handler;
658 tcp_server_create, tcp_server_start,
659 tcp_server_add_port, tcp_server_create_fd_handler,
660 tcp_server_port_fd_count, tcp_server_port_fd,
661 tcp_server_ref, tcp_server_shutdown_starting_add,
662 tcp_server_unref, tcp_server_shutdown_listeners};
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
void grpc_tcp_server_shutdown_listeners(grpc_tcp_server *s)
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
#define gpr_atm_no_barrier_store(p, value)
struct grpc_tcp_listener * sibling
std::string ToString(StatusToStringMode mode=StatusToStringMode::kDefault) const
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
return memset(p, 0, total)
static void RunList(const DebugLocation &location, grpc_closure_list *list)
ABSL_MUST_USE_RESULT std::string StrFormat(const FormatSpec< Args... > &format, const Args &... args)
#define GRPC_ARG_EXPAND_WILDCARD_ADDRS
gpr_atm next_pollset_to_assign
void grpc_fd_shutdown(grpc_fd *fd, grpc_error_handle why)
grpc_tcp_server_cb on_accept_cb
grpc_error_handle grpc_set_socket_no_sigpipe_if_possible(int fd)
static void shutdown_complete(void *arg, int)
GPRAPI void gpr_free(void *ptr)
void grpc_sockaddr_make_wildcards(int port, grpc_resolved_address *wild4_out, grpc_resolved_address *wild6_out)
GPRAPI void * gpr_malloc(size_t size)
#define GRPC_LOG_IF_ERROR(what, error)
absl::StatusOr< std::string > grpc_sockaddr_to_string(const grpc_resolved_address *resolved_addr, bool normalize)
#define GRPC_TRACE_FLAG_ENABLED(f)
struct grpc_tcp_listener * next
int grpc_is_unix_socket(const grpc_resolved_address *resolved_addr)
#define GRPC_ARG_ALLOW_REUSEPORT
grpc_closure read_closure
void(* grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep, grpc_pollset *accepting_pollset, grpc_tcp_server_acceptor *acceptor)
bool grpc_closure_list_append(grpc_closure_list *closure_list, grpc_closure *closure)
grpc_error_handle grpc_tcp_server_add_addr(grpc_tcp_server *s, const grpc_resolved_address *addr, unsigned port_index, unsigned fd_index, grpc_dualstack_mode *dsmode, grpc_tcp_listener **listener)
GPRAPI void gpr_mu_destroy(gpr_mu *mu)
grpc_core::TraceFlag grpc_tcp_trace(false, "tcp")
std::unique_ptr< Server > server
grpc_error_handle grpc_error_add_child(grpc_error_handle src, grpc_error_handle child)
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))
bool grpc_is_socket_reuse_port_supported()
#define GRPC_MAX_SOCKADDR_SIZE
grpc_channel_args * channel_args
static grpc_closure on_read
def c_str(s, encoding='ascii')
int grpc_sockaddr_set_port(grpc_resolved_address *resolved_addr, int port)
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc_error_handle grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, unsigned port_index, int requested_port, int *out_port)
GPRAPI void gpr_mu_init(gpr_mu *mu)
grpc_fd * grpc_fd_create(int fd, const char *name, bool track_err)
int grpc_accept4(int sockfd, grpc_resolved_address *resolved_addr, int nonblock, int cloexec)
grpc_resolved_address addr
grpc_endpoint * grpc_tcp_create(grpc_fd *fd, const grpc_channel_args *args, absl::string_view peer_string)
MemoryQuotaRefPtr memory_quota()
void grpc_channel_args_destroy(grpc_channel_args *a)
#define gpr_atm_no_barrier_fetch_add(p, delta)
grpc_channel_args * grpc_channel_args_copy(const grpc_channel_args *src)
GPRAPI void gpr_mu_lock(gpr_mu *mu)
grpc_tcp_server * from_server
grpc_error_handle grpc_create_dualstack_socket(const grpc_resolved_address *addr, int type, int protocol, grpc_dualstack_mode *dsmode, int *newfd)
void grpc_unlink_if_unix_domain_socket(const grpc_resolved_address *resolved_addr)
int grpc_sockaddr_to_v4mapped(const grpc_resolved_address *resolved_addr, grpc_resolved_address *resolved_addr6_out)
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
bool grpc_tcp_server_have_ifaddrs(void)
void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure)
int grpc_sockaddr_get_port(const grpc_resolved_address *resolved_addr)
ABSL_MUST_USE_RESULT bool ok() const
ResourceQuotaRefPtr ResourceQuotaFromChannelArgs(const grpc_channel_args *args)
GPRAPI void gpr_ref_non_zero(gpr_refcount *r)
struct grpc_tcp_server grpc_tcp_server
grpc_byte_buffer * pending_data
grpc_core::ExecCtx exec_ctx
std::string grpc_error_std_string(grpc_error_handle error)
#define GRPC_ERROR_CREATE_FROM_CPP_STRING(desc)
grpc_closure destroyed_closure
grpc_error_handle grpc_tcp_server_prepare_socket(grpc_tcp_server *, int fd, const grpc_resolved_address *addr, bool so_reuseport, int *port)
void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd)
grpc_core::MemoryQuotaRefPtr memory_quota
#define GRPC_ERROR_UNREF(err)
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, int *release_fd, const char *reason)
@ GRPC_FD_SERVER_CONNECTION_USAGE
GPRAPI void gpr_ref_init(gpr_refcount *r, int n)
int grpc_sockaddr_is_wildcard(const grpc_resolved_address *resolved_addr, int *port_out)
GPRAPI int gpr_unref(gpr_refcount *r)
const std::vector< grpc_pollset * > * pollsets
grpc_error_handle grpc_apply_socket_mutator_in_args(int fd, grpc_fd_usage usage, const grpc_channel_args *args)
absl::StatusOr< std::string > grpc_sockaddr_to_uri(const grpc_resolved_address *resolved_addr)
char addr[GRPC_MAX_SOCKADDR_SIZE]
const Status & status() const &
#define GRPC_ERROR_IS_NONE(err)
grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:29