Go to the documentation of this file.
23 #ifdef GRPC_POSIX_SOCKET_TCP
29 #include <sys/socket.h>
30 #include <sys/types.h>
68 static void create_sockets(
int sv[2]) {
70 GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
71 flags = fcntl(sv[0], F_GETFL, 0);
73 flags = fcntl(sv[1], F_GETFL, 0);
77 static void create_inet_sockets(
int sv[2]) {
79 struct sockaddr_in
addr;
81 addr.sin_family = AF_INET;
82 int sock =
socket(AF_INET, SOCK_STREAM, 0);
84 GPR_ASSERT(bind(sock, (sockaddr*)&
addr,
sizeof(sockaddr_in)) == 0);
88 socklen_t
len =
sizeof(sockaddr_in);
97 }
while (
ret == -1 && errno == EINTR);
100 len =
sizeof(socklen_t);
103 server = accept(sock,
reinterpret_cast<sockaddr*
>(&
addr), &
len);
104 }
while (
server == -1 && errno == EINTR);
109 int flags = fcntl(sv[0], F_GETFL, 0);
111 flags = fcntl(sv[1], F_GETFL, 0);
115 static ssize_t fill_socket(
int fd) {
119 unsigned char buf[256];
120 for (
i = 0;
i < 256; ++
i) {
125 if (write_bytes > 0) {
126 total_bytes += write_bytes;
128 }
while (write_bytes >= 0 || errno == EINTR);
133 static size_t fill_socket_partial(
int fd,
size_t bytes) {
135 size_t total_bytes = 0;
144 if (write_bytes > 0) {
145 total_bytes +=
static_cast<size_t>(write_bytes);
147 }
while ((write_bytes >= 0 || errno == EINTR) &&
bytes > total_bytes);
154 struct read_socket_state {
156 int min_progress_size;
158 size_t target_read_bytes;
165 size_t num_bytes = 0;
168 for (
i = 0;
i < nslices; ++
i) {
172 *current_data = (*current_data + 1) % 256;
180 struct read_socket_state*
state =
181 static_cast<struct read_socket_state*
>(user_data);
188 current_data =
state->read_bytes % 256;
196 state->target_read_bytes);
197 if (
state->read_bytes >=
state->target_read_bytes) {
203 state->min_progress_size =
state->target_read_bytes -
state->read_bytes;
205 false,
state->min_progress_size);
210 static void read_test(
size_t num_bytes,
size_t slice_size,
211 int min_progress_size) {
214 struct read_socket_state
state;
215 size_t written_bytes;
220 gpr_log(
GPR_INFO,
"Read test of size %" PRIuPTR
", slice size %" PRIuPTR,
221 num_bytes, slice_size);
228 a[0].value.integer =
static_cast<int>(slice_size);
238 written_bytes = fill_socket_partial(sv[0], num_bytes);
242 state.read_bytes = 0;
243 state.target_read_bytes = written_bytes;
244 state.min_progress_size =
245 std::min(min_progress_size,
static_cast<int>(written_bytes));
250 state.min_progress_size);
253 while (
state.read_bytes <
state.target_read_bytes) {
272 static void large_read_test(
size_t slice_size,
int min_progress_size) {
275 struct read_socket_state
state;
281 gpr_log(
GPR_INFO,
"Start large read test, slice size %" PRIuPTR, slice_size);
288 a[0].value.integer =
static_cast<int>(slice_size);
298 written_bytes = fill_socket(sv[0]);
302 state.read_bytes = 0;
303 state.target_read_bytes =
static_cast<size_t>(written_bytes);
304 state.min_progress_size =
305 std::min(min_progress_size,
static_cast<int>(written_bytes));
310 state.min_progress_size);
313 while (
state.read_bytes <
state.target_read_bytes) {
330 struct write_socket_state {
336 size_t* num_blocks,
uint8_t* current_data) {
337 size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1
u : 0
u);
340 size_t num_bytes_left = num_bytes;
343 *num_blocks = nslices;
345 for (
i = 0;
i < nslices; ++
i) {
351 buf[
j] = *current_data;
359 static void write_done(
void* user_data ,
362 struct write_socket_state*
state =
363 static_cast<struct write_socket_state*
>(user_data);
365 state->write_done = 1;
371 void drain_socket_blocking(
int fd,
size_t num_bytes,
size_t read_size) {
374 size_t bytes_left = num_bytes;
380 flags = fcntl(fd, F_GETFL, 0);
400 current = (current + 1) % 256;
402 bytes_left -=
static_cast<size_t>(
bytes_read);
403 if (bytes_left == 0)
break;
405 flags = fcntl(fd, F_GETFL, 0);
427 static void write_test(
size_t num_bytes,
size_t slice_size,
428 bool collect_timestamps) {
431 struct write_socket_state
state;
446 "Start write test with %" PRIuPTR
" bytes, slice size %" PRIuPTR,
447 num_bytes, slice_size);
449 if (collect_timestamps) {
450 create_inet_sockets(sv);
458 a[0].value.integer =
static_cast<int>(slice_size);
469 state.write_done = 0;
476 grpc_schedule_on_exec_ctx);
485 drain_socket_blocking(sv[0], num_bytes, num_bytes);
490 if (
state.write_done &&
511 int*
done =
static_cast<int*
>(
arg);
519 static void release_fd_test(
size_t num_bytes,
size_t slice_size) {
522 struct read_socket_state
state;
523 size_t written_bytes;
529 int fd_released_done = 0;
531 grpc_schedule_on_exec_ctx);
534 "Release fd read_test of size %" PRIuPTR
", slice size %" PRIuPTR,
535 num_bytes, slice_size);
542 a[0].value.integer =
static_cast<int>(slice_size);
553 written_bytes = fill_socket_partial(sv[0], num_bytes);
557 state.read_bytes = 0;
558 state.target_read_bytes = written_bytes;
559 state.min_progress_size = 1;
564 state.min_progress_size);
567 while (
state.read_bytes <
state.target_read_bytes) {
584 while (!fd_released_done) {
594 written_bytes = fill_socket_partial(sv[0], num_bytes);
595 drain_socket_blocking(fd, written_bytes, written_bytes);
596 written_bytes = fill_socket_partial(fd, num_bytes);
597 drain_socket_blocking(sv[0], written_bytes, written_bytes);
605 for (
int i = 1;
i <= 8192;
i =
i * 2) {
606 read_test(100, 8192,
i);
607 read_test(10000, 8192,
i);
608 read_test(10000, 137,
i);
609 read_test(10000, 1,
i);
610 large_read_test(8192,
i);
611 large_read_test(1,
i);
613 write_test(100, 8192,
false);
614 write_test(100, 1,
false);
615 write_test(100000, 8192,
false);
616 write_test(100000, 1,
false);
617 write_test(100000, 137,
false);
619 write_test(100, 8192,
true);
620 write_test(100, 1,
true);
621 write_test(100000, 8192,
true);
622 write_test(100000, 1,
true);
623 write_test(100, 137,
true);
626 write_test(40320,
i,
false);
627 write_test(40320,
i,
true);
630 release_fd_test(100, 8192);
645 a[0].value.integer =
static_cast<int>(slice_size);
664 {
"tcp/tcp_socketpair", create_fixture_tcp_socketpair,
clean_up},
671 int main(
int argc,
char** argv) {
684 grpc_schedule_on_exec_ctx);
697 int main(
int argc,
char** argv) {
return 1; }
struct grpc_pollset_worker grpc_pollset_worker
int grpc_tcp_fd(grpc_endpoint *ep)
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
size_t grpc_pollset_size(void)
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
static uv_pipe_t incoming[4]
gpr_timespec grpc_timeout_seconds_to_deadline(int64_t time_s)
GPRAPI void grpc_slice_buffer_addn(grpc_slice_buffer *sb, grpc_slice *slices, size_t n)
bool grpc_event_engine_can_track_errors()
static void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
return memset(p, 0, total)
void grpc_tcp_destroy_and_release_fd(grpc_endpoint *ep, int *fd, grpc_closure *done)
struct grpc_resource_quota grpc_resource_quota
void grpc_endpoint_tests(grpc_endpoint_test_config config, grpc_pollset *pollset, gpr_mu *mu)
#define GRPC_ARG_RESOURCE_QUOTA
GPRAPI void gpr_free(void *ptr)
void grpc_endpoint_read(grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb, bool urgent, int min_progress_size)
void grpc_tcp_set_write_timestamps_callback(void(*fn)(void *, Timestamps *, grpc_error_handle error))
GPRAPI void * gpr_malloc(size_t size)
OPENSSL_EXPORT pem_password_cb void * u
GRPCAPI grpc_resource_quota * grpc_resource_quota_create(const char *trace_name)
static grpc_slice * allocate_blocks(size_t num_bytes, size_t slice_size, size_t *num_blocks, uint8_t *current_data)
grpc_error_handle grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker **worker, grpc_core::Timestamp deadline)
#define GRPC_LOG_IF_ERROR(what, error)
static grpc_pollset * g_pollset
GPRAPI grpc_slice grpc_slice_malloc(size_t length)
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu)
GPRAPI void * gpr_zalloc(size_t size)
int run_tests(int benchmark_output)
std::unique_ptr< Server > server
BufferTimestamp sendmsg_time
size_t count_slices(grpc_slice *slices, size_t nslices, int *current_data)
BufferTimestamp acked_time
const GRPCAPI grpc_arg_pointer_vtable * grpc_resource_quota_arg_vtable(void)
gpr_timespec grpc_timeout_milliseconds_to_deadline(int64_t time_ms)
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc_fd * grpc_fd_create(int fd, const char *name, bool track_err)
#define gpr_atm_acq_load(p)
grpc_endpoint * grpc_tcp_create(grpc_fd *fd, const grpc_channel_args *args, absl::string_view peer_string)
#define GRPC_SLICE_START_PTR(slice)
#define gpr_atm_rel_store(p, value)
static grpc_end2end_test_config configs[]
static int read_bytes(int fd, char *buf, size_t read_size, int spin)
GPRAPI void gpr_mu_lock(gpr_mu *mu)
static void clean_up(void)
void grpc_endpoint_destroy(grpc_endpoint *ep)
int main(int argc, char **argv)
grpc_error_handle grpc_pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker)
#define GPR_GLOBAL_CONFIG_SET(name, value)
GPRAPI void grpc_slice_buffer_init(grpc_slice_buffer *sb)
#define GRPC_SLICE_LENGTH(slice)
gpr_clock_type clock_type
#define GPR_ARRAY_SIZE(array)
int read(izstream &zs, T *x, Items items)
static void destroy_pollset(void *p, grpc_error_handle)
grpc_core::ExecCtx exec_ctx
def listen(endpoint, test_case)
#define GPR_GLOBAL_CONFIG_DECLARE_BOOL(name)
void grpc_pollset_shutdown(grpc_pollset *pollset, grpc_closure *closure)
GRPCAPI void grpc_resource_quota_unref(grpc_resource_quota *resource_quota)
UniquePtr< SSL_SESSION > ret
void grpc_slice_buffer_destroy_internal(grpc_slice_buffer *sb)
BufferTimestamp scheduled_time
void grpc_endpoint_write(grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb, void *arg, int max_frame_size)
GRPCAPI void grpc_init(void)
static Timestamp FromTimespecRoundUp(gpr_timespec t)
#define GRPC_ARG_TCP_READ_CHUNK_SIZE
void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset)
void grpc_pollset_destroy(grpc_pollset *pollset)
GRPCAPI void grpc_shutdown(void)
#define GRPC_ERROR_IS_NONE(err)
grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:29