Go to the documentation of this file.
23 #ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
26 #include <netinet/in.h>
30 #include "absl/container/flat_hash_map.h"
31 #include "absl/strings/str_cat.h"
54 struct async_connect {
67 bool connect_cancelled;
70 struct ConnectionShard {
79 std::vector<ConnectionShard>* g_connection_shards =
nullptr;
80 std::atomic<int64_t> g_connection_id{1};
82 void do_tcp_client_global_init(
void) {
84 g_connection_shards =
new std::vector<struct ConnectionShard>(num_shards);
90 gpr_once_init(&g_tcp_client_posix_init, do_tcp_client_global_init);
132 async_connect* ac =
static_cast<async_connect*
>(acp);
138 if (ac->fd !=
nullptr) {
142 done = (--ac->refs == 0);
158 async_connect* ac =
static_cast<async_connect*
>(acp);
160 socklen_t so_error_size;
179 bool connect_cancelled = ac->connect_cancelled;
191 if (connect_cancelled) {
198 so_error_size =
sizeof(so_error);
201 }
while (
err < 0 && errno == EINTR);
244 if (!connect_cancelled) {
245 int shard_number = ac->connection_handle % (*g_connection_shards).size();
246 struct ConnectionShard* shard = &(*g_connection_shards)[shard_number];
249 shard->pending_connections.erase(ac->connection_handle);
257 done = (--ac->refs == 0);
278 if (!connect_cancelled) {
296 memcpy(mapped_addr,
addr,
sizeof(*mapped_addr));
306 memcpy(mapped_addr,
addr,
sizeof(*mapped_addr));
309 if ((
error = prepare_socket(mapped_addr, *fd, channel_args)) !=
322 err = connect(fd,
reinterpret_cast<const grpc_sockaddr*
>(
addr->addr),
324 }
while (
err < 0 && errno == EINTR);
327 if (!addr_uri.ok()) {
337 if (errno == EWOULDBLOCK || errno == EINPROGRESS) {
339 connection_id = g_connection_id.fetch_add(1, std::memory_order_acq_rel);
349 if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
355 grpc_fd_orphan(fdobj,
nullptr,
nullptr,
"tcp_client_connect_error");
362 async_connect* ac =
new async_connect();
366 ac->interested_parties = interested_parties;
367 ac->addr_str = addr_uri.value();
368 ac->connection_handle = connection_id;
369 ac->connect_cancelled =
false;
373 grpc_schedule_on_exec_ctx);
377 gpr_log(
GPR_INFO,
"CLIENT_CONNECT: %s: asynchronously connecting fd %p",
378 ac->addr_str.c_str(), fdobj);
381 int shard_number = connection_id % (*g_connection_shards).size();
382 struct ConnectionShard* shard = &(*g_connection_shards)[shard_number];
385 shard->pending_connections.insert_or_assign(connection_id, ac);
393 return connection_id;
411 fd, channel_args, &mapped_addr,
415 static bool tcp_cancel_connect(
int64_t connection_handle) {
416 if (connection_handle <= 0) {
419 int shard_number = connection_handle % (*g_connection_shards).size();
420 struct ConnectionShard* shard = &(*g_connection_shards)[shard_number];
421 async_connect* ac =
nullptr;
424 auto it = shard->pending_connections.find(connection_handle);
425 if (
it != shard->pending_connections.end()) {
438 shard->pending_connections.erase(
it);
445 bool connection_cancel_success = (ac->fd !=
nullptr);
446 if (connection_cancel_success) {
449 ac->connect_cancelled =
true;
455 bool done = (--ac->refs == 0);
464 return connection_cancel_success;
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
grpc_error_handle grpc_set_socket_reuse_addr(int fd, int reuse)
GPRAPI unsigned gpr_cpu_num_cores(void)
void grpc_pollset_set_del_fd(grpc_pollset_set *pollset_set, grpc_fd *fd)
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
grpc_error_handle grpc_set_socket_low_latency(int fd, int low_latency)
static int finish(struct hexdump_ctx *ctx)
void grpc_fd_shutdown(grpc_fd *fd, grpc_error_handle why)
struct grpc_pollset_set grpc_pollset_set
grpc_error_handle grpc_set_socket_no_sigpipe_if_possible(int fd)
OPENSSL_EXPORT pem_password_cb void * u
#define ABSL_GUARDED_BY(x)
@ GRPC_ERROR_STR_DESCRIPTION
top-level textual description of this error
#define GRPC_TRACE_FLAG_ENABLED(f)
static void Run(grpc_closure *closure, grpc_error_handle error, ExecutorType executor_type=ExecutorType::DEFAULT, ExecutorJobType job_type=ExecutorJobType::SHORT)
int grpc_is_unix_socket(const grpc_resolved_address *resolved_addr)
grpc_endpoint * grpc_tcp_client_create_from_fd(grpc_fd *fd, const grpc_channel_args *channel_args, absl::string_view addr_str)
grpc_error_handle grpc_error_set_str(grpc_error_handle src, grpc_error_strs which, absl::string_view str)
GPRAPI void gpr_once_init(gpr_once *once, void(*init_function)(void))
@ GRPC_ERROR_STR_TARGET_ADDRESS
peer that we were trying to communicate with when this error occurred
int64_t grpc_tcp_client_create_from_prepared_fd(grpc_pollset_set *interested_parties, grpc_closure *closure, const int fd, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, grpc_core::Timestamp deadline, grpc_endpoint **ep)
GPRAPI void gpr_mu_destroy(gpr_mu *mu)
grpc_core::TraceFlag grpc_tcp_trace(false, "tcp")
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))
std::vector< CordRep * > refs
grpc_error_handle grpc_set_socket_cloexec(int fd, int close_on_exec)
#define GRPC_OS_ERROR(err, call_name)
create an error associated with errno!=0 (an 'operating system' error)
def c_str(s, encoding='ascii')
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
GPRAPI void gpr_mu_init(gpr_mu *mu)
grpc_fd * grpc_fd_create(int fd, const char *name, bool track_err)
grpc_endpoint * grpc_tcp_create(grpc_fd *fd, const grpc_channel_args *args, absl::string_view peer_string)
void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure)
void grpc_channel_args_destroy(grpc_channel_args *a)
grpc_channel_args * grpc_channel_args_copy(const grpc_channel_args *src)
GPRAPI void gpr_mu_lock(gpr_mu *mu)
grpc_error_handle grpc_create_dualstack_socket(const grpc_resolved_address *addr, int type, int protocol, grpc_dualstack_mode *dsmode, int *newfd)
int grpc_sockaddr_to_v4mapped(const grpc_resolved_address *resolved_addr, grpc_resolved_address *resolved_addr6_out)
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
@ GRPC_ERROR_STR_OS_ERROR
operating system description of this error
grpc_error_handle grpc_tcp_client_prepare_fd(const grpc_channel_args *channel_args, const grpc_resolved_address *addr, grpc_resolved_address *mapped_addr, int *fd)
void grpc_timer_cancel(grpc_timer *timer)
#define GRPC_ERROR_REF(err)
std::string grpc_error_std_string(grpc_error_handle error)
UniquePtr< SSL_SESSION > ret
#define GRPC_ERROR_CREATE_FROM_CPP_STRING(desc)
@ GRPC_FD_CLIENT_CONNECTION_USAGE
void grpc_pollset_set_add_fd(grpc_pollset_set *pollset_set, grpc_fd *fd)
void grpc_timer_init(grpc_timer *timer, grpc_core::Timestamp deadline, grpc_closure *closure)
#define GRPC_ERROR_UNREF(err)
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, int *release_fd, const char *reason)
void grpc_tcp_client_global_init()
bool grpc_error_get_str(grpc_error_handle err, grpc_error_strs which, std::string *s)
Returns false if the specified string is not set.
grpc_error_handle grpc_set_socket_nonblocking(int fd, int non_blocking)
grpc_error_handle grpc_set_socket_tcp_user_timeout(int fd, const grpc_channel_args *channel_args, bool is_client)
int grpc_sockaddr_is_v4mapped(const grpc_resolved_address *resolved_addr, grpc_resolved_address *resolved_addr4_out)
grpc_error_handle grpc_apply_socket_mutator_in_args(int fd, grpc_fd_usage usage, const grpc_channel_args *args)
absl::StatusOr< std::string > grpc_sockaddr_to_uri(const grpc_resolved_address *resolved_addr)
int grpc_fd_wrapped_fd(grpc_fd *fd)
#define GRPC_ERROR_IS_NONE(err)
grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:29