tcp_posix_test.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
21 
22 // This test won't work except with posix sockets enabled
23 #ifdef GRPC_POSIX_SOCKET_TCP
24 
25 #include <errno.h>
26 #include <fcntl.h>
27 #include <limits.h>
28 #include <string.h>
29 #include <sys/socket.h>
30 #include <sys/types.h>
31 #include <unistd.h>
32 
33 #include <grpc/grpc.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/time.h>
37 
46 
47 static gpr_mu* g_mu;
48 static grpc_pollset* g_pollset;
49 
50 GPR_GLOBAL_CONFIG_DECLARE_BOOL(grpc_experimental_enable_tcp_frame_size_tuning);
51 
52 /*
53  General test notes:
54 
55  All tests which write data into a socket write i%256 into byte i, which is
56  verified by readers.
57 
58  In general there are a few interesting things to vary which may lead to
59  exercising different codepaths in an implementation:
60  1. Total amount of data written to the socket
61  2. Size of slice allocations
62  3. Amount of data we read from or write to the socket at once
63 
64  The tests here tend to parameterize these where applicable.
65 
66  */
67 
68 static void create_sockets(int sv[2]) {
69  int flags;
70  GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
71  flags = fcntl(sv[0], F_GETFL, 0);
72  GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
73  flags = fcntl(sv[1], F_GETFL, 0);
74  GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
75 }
76 
77 static void create_inet_sockets(int sv[2]) {
78  /* Prepare listening socket */
79  struct sockaddr_in addr;
80  memset(&addr, 0, sizeof(struct sockaddr_in));
81  addr.sin_family = AF_INET;
82  int sock = socket(AF_INET, SOCK_STREAM, 0);
83  GPR_ASSERT(sock);
84  GPR_ASSERT(bind(sock, (sockaddr*)&addr, sizeof(sockaddr_in)) == 0);
85  listen(sock, 1);
86 
87  /* Prepare client socket and connect to server */
88  socklen_t len = sizeof(sockaddr_in);
89  GPR_ASSERT(getsockname(sock, (sockaddr*)&addr, &len) == 0);
90 
91  int client = socket(AF_INET, SOCK_STREAM, 0);
93  int ret;
94  do {
95  ret = connect(client, reinterpret_cast<sockaddr*>(&addr),
96  sizeof(sockaddr_in));
97  } while (ret == -1 && errno == EINTR);
98 
99  /* Accept client connection */
100  len = sizeof(socklen_t);
101  int server;
102  do {
103  server = accept(sock, reinterpret_cast<sockaddr*>(&addr), &len);
104  } while (server == -1 && errno == EINTR);
105  GPR_ASSERT(server != -1);
106 
107  sv[0] = server;
108  sv[1] = client;
109  int flags = fcntl(sv[0], F_GETFL, 0);
110  GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
111  flags = fcntl(sv[1], F_GETFL, 0);
112  GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
113 }
114 
115 static ssize_t fill_socket(int fd) {
116  ssize_t write_bytes;
117  ssize_t total_bytes = 0;
118  int i;
119  unsigned char buf[256];
120  for (i = 0; i < 256; ++i) {
121  buf[i] = static_cast<uint8_t>(i);
122  }
123  do {
124  write_bytes = write(fd, buf, 256);
125  if (write_bytes > 0) {
126  total_bytes += write_bytes;
127  }
128  } while (write_bytes >= 0 || errno == EINTR);
129  GPR_ASSERT(errno == EAGAIN);
130  return total_bytes;
131 }
132 
133 static size_t fill_socket_partial(int fd, size_t bytes) {
134  ssize_t write_bytes;
135  size_t total_bytes = 0;
136  unsigned char* buf = static_cast<unsigned char*>(gpr_malloc(bytes));
137  unsigned i;
138  for (i = 0; i < bytes; ++i) {
139  buf[i] = static_cast<uint8_t>(i % 256);
140  }
141 
142  do {
143  write_bytes = write(fd, buf, bytes - total_bytes);
144  if (write_bytes > 0) {
145  total_bytes += static_cast<size_t>(write_bytes);
146  }
147  } while ((write_bytes >= 0 || errno == EINTR) && bytes > total_bytes);
148 
149  gpr_free(buf);
150 
151  return total_bytes;
152 }
153 
154 struct read_socket_state {
155  grpc_endpoint* ep;
156  int min_progress_size;
157  size_t read_bytes;
158  size_t target_read_bytes;
161 };
162 
163 static size_t count_slices(grpc_slice* slices, size_t nslices,
164  int* current_data) {
165  size_t num_bytes = 0;
166  unsigned i, j;
167  unsigned char* buf;
168  for (i = 0; i < nslices; ++i) {
170  for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
171  GPR_ASSERT(buf[j] == *current_data);
172  *current_data = (*current_data + 1) % 256;
173  }
174  num_bytes += GRPC_SLICE_LENGTH(slices[i]);
175  }
176  return num_bytes;
177 }
178 
179 static void read_cb(void* user_data, grpc_error_handle error) {
180  struct read_socket_state* state =
181  static_cast<struct read_socket_state*>(user_data);
182  size_t read_bytes;
183  int current_data;
184 
186 
187  gpr_mu_lock(g_mu);
188  current_data = state->read_bytes % 256;
189  // The number of bytes read each time this callback is invoked must be >=
190  // the min_progress_size.
191  GPR_ASSERT(state->min_progress_size <= state->incoming.length);
192  read_bytes = count_slices(state->incoming.slices, state->incoming.count,
193  &current_data);
194  state->read_bytes += read_bytes;
195  gpr_log(GPR_INFO, "Read %" PRIuPTR " bytes of %" PRIuPTR, read_bytes,
196  state->target_read_bytes);
197  if (state->read_bytes >= state->target_read_bytes) {
198  GPR_ASSERT(
199  GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr)));
201  } else {
203  state->min_progress_size = state->target_read_bytes - state->read_bytes;
204  grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb,
205  /*urgent=*/false, state->min_progress_size);
206  }
207 }
208 
209 /* Write to a socket, then read from it using the grpc_tcp API. */
210 static void read_test(size_t num_bytes, size_t slice_size,
211  int min_progress_size) {
212  int sv[2];
213  grpc_endpoint* ep;
214  struct read_socket_state state;
215  size_t written_bytes;
219 
220  gpr_log(GPR_INFO, "Read test of size %" PRIuPTR ", slice size %" PRIuPTR,
221  num_bytes, slice_size);
222 
223  create_sockets(sv);
224 
225  grpc_arg a[2];
226  a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
227  a[0].type = GRPC_ARG_INTEGER,
228  a[0].value.integer = static_cast<int>(slice_size);
229  a[1].key = const_cast<char*>(GRPC_ARG_RESOURCE_QUOTA);
230  a[1].type = GRPC_ARG_POINTER;
231  a[1].value.pointer.p = grpc_resource_quota_create("test");
232  a[1].value.pointer.vtable = grpc_resource_quota_arg_vtable();
234  ep =
235  grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test");
237 
238  written_bytes = fill_socket_partial(sv[0], num_bytes);
239  gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
240 
241  state.ep = ep;
242  state.read_bytes = 0;
243  state.target_read_bytes = written_bytes;
244  state.min_progress_size =
245  std::min(min_progress_size, static_cast<int>(written_bytes));
246  grpc_slice_buffer_init(&state.incoming);
247  GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
248 
249  grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false,
250  /*min_progress_size=*/state.min_progress_size);
251 
252  gpr_mu_lock(g_mu);
253  while (state.read_bytes < state.target_read_bytes) {
254  grpc_pollset_worker* worker = nullptr;
256  "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
258 
259  gpr_mu_lock(g_mu);
260  }
261  GPR_ASSERT(state.read_bytes == state.target_read_bytes);
263 
267  static_cast<grpc_resource_quota*>(a[1].value.pointer.p));
268 }
269 
270 /* Write to a socket until it fills up, then read from it using the grpc_tcp
271  API. */
272 static void large_read_test(size_t slice_size, int min_progress_size) {
273  int sv[2];
274  grpc_endpoint* ep;
275  struct read_socket_state state;
276  ssize_t written_bytes;
280 
281  gpr_log(GPR_INFO, "Start large read test, slice size %" PRIuPTR, slice_size);
282 
283  create_sockets(sv);
284 
285  grpc_arg a[2];
286  a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
287  a[0].type = GRPC_ARG_INTEGER;
288  a[0].value.integer = static_cast<int>(slice_size);
289  a[1].key = const_cast<char*>(GRPC_ARG_RESOURCE_QUOTA);
290  a[1].type = GRPC_ARG_POINTER;
291  a[1].value.pointer.p = grpc_resource_quota_create("test");
292  a[1].value.pointer.vtable = grpc_resource_quota_arg_vtable();
294  ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test", false), &args,
295  "test");
297 
298  written_bytes = fill_socket(sv[0]);
299  gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
300 
301  state.ep = ep;
302  state.read_bytes = 0;
303  state.target_read_bytes = static_cast<size_t>(written_bytes);
304  state.min_progress_size =
305  std::min(min_progress_size, static_cast<int>(written_bytes));
306  grpc_slice_buffer_init(&state.incoming);
307  GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
308 
309  grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false,
310  /*min_progress_size=*/state.min_progress_size);
311 
312  gpr_mu_lock(g_mu);
313  while (state.read_bytes < state.target_read_bytes) {
314  grpc_pollset_worker* worker = nullptr;
316  "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
318 
319  gpr_mu_lock(g_mu);
320  }
321  GPR_ASSERT(state.read_bytes == state.target_read_bytes);
323 
327  static_cast<grpc_resource_quota*>(a[1].value.pointer.p));
328 }
329 
330 struct write_socket_state {
331  grpc_endpoint* ep;
332  int write_done;
333 };
334 
335 static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
336  size_t* num_blocks, uint8_t* current_data) {
337  size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1u : 0u);
338  grpc_slice* slices =
339  static_cast<grpc_slice*>(gpr_malloc(sizeof(grpc_slice) * nslices));
340  size_t num_bytes_left = num_bytes;
341  unsigned i, j;
342  unsigned char* buf;
343  *num_blocks = nslices;
344 
345  for (i = 0; i < nslices; ++i) {
346  slices[i] = grpc_slice_malloc(slice_size > num_bytes_left ? num_bytes_left
347  : slice_size);
348  num_bytes_left -= GRPC_SLICE_LENGTH(slices[i]);
350  for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
351  buf[j] = *current_data;
352  (*current_data)++;
353  }
354  }
355  GPR_ASSERT(num_bytes_left == 0);
356  return slices;
357 }
358 
359 static void write_done(void* user_data /* write_socket_state */,
362  struct write_socket_state* state =
363  static_cast<struct write_socket_state*>(user_data);
364  gpr_mu_lock(g_mu);
365  state->write_done = 1;
366  GPR_ASSERT(
367  GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
369 }
370 
371 void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
372  unsigned char* buf = static_cast<unsigned char*>(gpr_malloc(read_size));
374  size_t bytes_left = num_bytes;
375  int flags;
376  int current = 0;
377  int i;
379 
380  flags = fcntl(fd, F_GETFL, 0);
381  GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0);
382 
383  for (;;) {
384  grpc_pollset_worker* worker = nullptr;
385  gpr_mu_lock(g_mu);
387  "pollset_work",
392 
393  do {
394  bytes_read =
395  read(fd, buf, bytes_left > read_size ? read_size : bytes_left);
396  } while (bytes_read < 0 && errno == EINTR);
397  GPR_ASSERT(bytes_read >= 0);
398  for (i = 0; i < bytes_read; ++i) {
399  GPR_ASSERT(buf[i] == current);
400  current = (current + 1) % 256;
401  }
402  bytes_left -= static_cast<size_t>(bytes_read);
403  if (bytes_left == 0) break;
404  }
405  flags = fcntl(fd, F_GETFL, 0);
406  GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
407 
408  gpr_free(buf);
409 }
410 
411 /* Verifier for timestamps callback for write_test */
412 void timestamps_verifier(void* arg, grpc_core::Timestamps* ts,
415  GPR_ASSERT(arg != nullptr);
419  gpr_atm* done_timestamps = static_cast<gpr_atm*>(arg);
420  gpr_atm_rel_store(done_timestamps, static_cast<gpr_atm>(1));
421 }
422 
423 /* Write to a socket using the grpc_tcp API, then drain it directly.
424  Note that if the write does not complete immediately we need to drain the
425  socket in parallel with the read. If collect_timestamps is true, it will
426  try to get timestamps for the write. */
427 static void write_test(size_t num_bytes, size_t slice_size,
428  bool collect_timestamps) {
429  int sv[2];
430  grpc_endpoint* ep;
431  struct write_socket_state state;
432  size_t num_blocks;
434  uint8_t current_data = 0;
435  grpc_slice_buffer outgoing;
436  grpc_closure write_done_closure;
440 
441  if (collect_timestamps && !grpc_event_engine_can_track_errors()) {
442  return;
443  }
444 
446  "Start write test with %" PRIuPTR " bytes, slice size %" PRIuPTR,
447  num_bytes, slice_size);
448 
449  if (collect_timestamps) {
450  create_inet_sockets(sv);
451  } else {
452  create_sockets(sv);
453  }
454 
455  grpc_arg a[2];
456  a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
457  a[0].type = GRPC_ARG_INTEGER,
458  a[0].value.integer = static_cast<int>(slice_size);
459  a[1].key = const_cast<char*>(GRPC_ARG_RESOURCE_QUOTA);
460  a[1].type = GRPC_ARG_POINTER;
461  a[1].value.pointer.p = grpc_resource_quota_create("test");
462  a[1].value.pointer.vtable = grpc_resource_quota_arg_vtable();
464  ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", collect_timestamps),
465  &args, "test");
467 
468  state.ep = ep;
469  state.write_done = 0;
470 
471  slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
472 
473  grpc_slice_buffer_init(&outgoing);
474  grpc_slice_buffer_addn(&outgoing, slices, num_blocks);
475  GRPC_CLOSURE_INIT(&write_done_closure, write_done, &state,
476  grpc_schedule_on_exec_ctx);
477 
478  gpr_atm done_timestamps;
479  gpr_atm_rel_store(&done_timestamps, static_cast<gpr_atm>(0));
480  grpc_endpoint_write(ep, &outgoing, &write_done_closure,
481  grpc_event_engine_can_track_errors() && collect_timestamps
482  ? &done_timestamps
483  : nullptr,
484  /*max_frame_size=*/INT_MAX);
485  drain_socket_blocking(sv[0], num_bytes, num_bytes);
486  exec_ctx.Flush();
487  gpr_mu_lock(g_mu);
488  for (;;) {
489  grpc_pollset_worker* worker = nullptr;
490  if (state.write_done &&
491  (!(grpc_event_engine_can_track_errors() && collect_timestamps) ||
492  gpr_atm_acq_load(&done_timestamps) == static_cast<gpr_atm>(1))) {
493  break;
494  }
496  "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
498  exec_ctx.Flush();
499  gpr_mu_lock(g_mu);
500  }
502 
505  gpr_free(slices);
507  static_cast<grpc_resource_quota*>(a[1].value.pointer.p));
508 }
509 
510 void on_fd_released(void* arg, grpc_error_handle /*errors*/) {
511  int* done = static_cast<int*>(arg);
512  *done = 1;
513  GPR_ASSERT(
514  GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
515 }
516 
517 /* Do a read_test, then release fd and try to read/write again. Verify that
518  grpc_tcp_fd() is available before the fd is released. */
519 static void release_fd_test(size_t num_bytes, size_t slice_size) {
520  int sv[2];
521  grpc_endpoint* ep;
522  struct read_socket_state state;
523  size_t written_bytes;
524  int fd;
528  grpc_closure fd_released_cb;
529  int fd_released_done = 0;
530  GRPC_CLOSURE_INIT(&fd_released_cb, &on_fd_released, &fd_released_done,
531  grpc_schedule_on_exec_ctx);
532 
534  "Release fd read_test of size %" PRIuPTR ", slice size %" PRIuPTR,
535  num_bytes, slice_size);
536 
537  create_sockets(sv);
538 
539  grpc_arg a[2];
540  a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
541  a[0].type = GRPC_ARG_INTEGER;
542  a[0].value.integer = static_cast<int>(slice_size);
543  a[1].key = const_cast<char*>(GRPC_ARG_RESOURCE_QUOTA);
544  a[1].type = GRPC_ARG_POINTER;
545  a[1].value.pointer.p = grpc_resource_quota_create("test");
546  a[1].value.pointer.vtable = grpc_resource_quota_arg_vtable();
548  ep =
549  grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test");
550  GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0);
552 
553  written_bytes = fill_socket_partial(sv[0], num_bytes);
554  gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
555 
556  state.ep = ep;
557  state.read_bytes = 0;
558  state.target_read_bytes = written_bytes;
559  state.min_progress_size = 1;
560  grpc_slice_buffer_init(&state.incoming);
561  GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
562 
563  grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false,
564  /*min_progress_size=*/state.min_progress_size);
565 
566  gpr_mu_lock(g_mu);
567  while (state.read_bytes < state.target_read_bytes) {
568  grpc_pollset_worker* worker = nullptr;
570  "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
571  gpr_log(GPR_DEBUG, "wakeup: read=%" PRIdPTR " target=%" PRIdPTR,
572  state.read_bytes, state.target_read_bytes);
575  gpr_mu_lock(g_mu);
576  }
577  GPR_ASSERT(state.read_bytes == state.target_read_bytes);
579 
581  grpc_tcp_destroy_and_release_fd(ep, &fd, &fd_released_cb);
583  gpr_mu_lock(g_mu);
584  while (!fd_released_done) {
585  grpc_pollset_worker* worker = nullptr;
587  "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
588  gpr_log(GPR_DEBUG, "wakeup: fd_released_done=%d", fd_released_done);
589  }
591  GPR_ASSERT(fd_released_done == 1);
592  GPR_ASSERT(fd == sv[1]);
593 
594  written_bytes = fill_socket_partial(sv[0], num_bytes);
595  drain_socket_blocking(fd, written_bytes, written_bytes);
596  written_bytes = fill_socket_partial(fd, num_bytes);
597  drain_socket_blocking(sv[0], written_bytes, written_bytes);
598  close(fd);
600  static_cast<grpc_resource_quota*>(a[1].value.pointer.p));
601 }
602 
603 void run_tests(void) {
604  size_t i = 0;
605  for (int i = 1; i <= 8192; i = i * 2) {
606  read_test(100, 8192, i);
607  read_test(10000, 8192, i);
608  read_test(10000, 137, i);
609  read_test(10000, 1, i);
610  large_read_test(8192, i);
611  large_read_test(1, i);
612  }
613  write_test(100, 8192, false);
614  write_test(100, 1, false);
615  write_test(100000, 8192, false);
616  write_test(100000, 1, false);
617  write_test(100000, 137, false);
618 
619  write_test(100, 8192, true);
620  write_test(100, 1, true);
621  write_test(100000, 8192, true);
622  write_test(100000, 1, true);
623  write_test(100, 137, true);
624 
625  for (i = 1; i < 1000; i = std::max(i + 1, i * 5 / 4)) {
626  write_test(40320, i, false);
627  write_test(40320, i, true);
628  }
629 
630  release_fd_test(100, 8192);
631 }
632 
633 static void clean_up(void) {}
634 
635 static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
636  size_t slice_size) {
637  int sv[2];
640 
641  create_sockets(sv);
642  grpc_arg a[2];
643  a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
644  a[0].type = GRPC_ARG_INTEGER;
645  a[0].value.integer = static_cast<int>(slice_size);
646  a[1].key = const_cast<char*>(GRPC_ARG_RESOURCE_QUOTA);
647  a[1].type = GRPC_ARG_POINTER;
648  a[1].value.pointer.p = grpc_resource_quota_create("test");
649  a[1].value.pointer.vtable = grpc_resource_quota_arg_vtable();
651  f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client", false),
652  &args, "test");
653  f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server", false),
654  &args, "test");
658  static_cast<grpc_resource_quota*>(a[1].value.pointer.p));
659 
660  return f;
661 }
662 
664  {"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up},
665 };
666 
667 static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
668  grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
669 }
670 
671 int main(int argc, char** argv) {
672  grpc_closure destroyed;
673  grpc::testing::TestEnvironment env(&argc, argv);
674  GPR_GLOBAL_CONFIG_SET(grpc_experimental_enable_tcp_frame_size_tuning, true);
675  grpc_init();
677  {
682  run_tests();
684  grpc_schedule_on_exec_ctx);
685  grpc_pollset_shutdown(g_pollset, &destroyed);
686 
688  }
689  grpc_shutdown();
691 
692  return 0;
693 }
694 
695 #else /* GRPC_POSIX_SOCKET_TCP */
696 
697 int main(int argc, char** argv) { return 1; }
698 
699 #endif /* GRPC_POSIX_SOCKET_TCP */
grpc_pollset_worker
struct grpc_pollset_worker grpc_pollset_worker
Definition: pollset.h:39
grpc_tcp_fd
int grpc_tcp_fd(grpc_endpoint *ep)
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
grpc_arg
Definition: grpc_types.h:103
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc_pollset_size
size_t grpc_pollset_size(void)
Definition: pollset.cc:56
gpr_mu_unlock
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
incoming
static uv_pipe_t incoming[4]
Definition: test-pipe-sendmsg.c:38
grpc_timeout_seconds_to_deadline
gpr_timespec grpc_timeout_seconds_to_deadline(int64_t time_s)
Definition: test/core/util/test_config.cc:81
log.h
grpc_slice_buffer_addn
GPRAPI void grpc_slice_buffer_addn(grpc_slice_buffer *sb, grpc_slice *slices, size_t n)
Definition: slice/slice_buffer.cc:224
absl::str_format_internal::LengthMod::j
@ j
grpc_endpoint_test_config
Definition: endpoint_tests.h:34
grpc_event_engine_can_track_errors
bool grpc_event_engine_can_track_errors()
read_cb
static void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
Definition: benchmark-pound.c:138
generate.env
env
Definition: generate.py:37
GRPC_ARG_INTEGER
@ GRPC_ARG_INTEGER
Definition: grpc_types.h:81
memset
return memset(p, 0, total)
grpc_tcp_destroy_and_release_fd
void grpc_tcp_destroy_and_release_fd(grpc_endpoint *ep, int *fd, grpc_closure *done)
grpc_resource_quota
struct grpc_resource_quota grpc_resource_quota
Definition: grpc_types.h:729
buffer_list.h
grpc_endpoint_tests
void grpc_endpoint_tests(grpc_endpoint_test_config config, grpc_pollset *pollset, gpr_mu *mu)
Definition: endpoint_tests.cc:345
write
#define write
Definition: test-fs.c:47
GRPC_ARG_RESOURCE_QUOTA
#define GRPC_ARG_RESOURCE_QUOTA
Definition: grpc_types.h:299
client
Definition: examples/python/async_streaming/client.py:1
string.h
buf
voidpf void * buf
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
gpr_free
GPRAPI void gpr_free(void *ptr)
Definition: alloc.cc:51
grpc_endpoint_read
void grpc_endpoint_read(grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb, bool urgent, int min_progress_size)
Definition: endpoint.cc:25
useful.h
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc_core::grpc_tcp_set_write_timestamps_callback
void grpc_tcp_set_write_timestamps_callback(void(*fn)(void *, Timestamps *, grpc_error_handle error))
Definition: buffer_list.cc:297
gpr_malloc
GPRAPI void * gpr_malloc(size_t size)
Definition: alloc.cc:29
u
OPENSSL_EXPORT pem_password_cb void * u
Definition: pem.h:351
grpc_resource_quota_create
GRPCAPI grpc_resource_quota * grpc_resource_quota_create(const char *trace_name)
Definition: api.cc:66
allocate_blocks
static grpc_slice * allocate_blocks(size_t num_bytes, size_t slice_size, size_t *num_blocks, uint8_t *current_data)
Definition: endpoint_tests.cc:79
grpc_pollset_work
grpc_error_handle grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker **worker, grpc_core::Timestamp deadline)
Definition: pollset.cc:45
GRPC_LOG_IF_ERROR
#define GRPC_LOG_IF_ERROR(what, error)
Definition: error.h:398
time.h
a
int a
Definition: abseil-cpp/absl/container/internal/hash_policy_traits_test.cc:88
xds_manager.p
p
Definition: xds_manager.py:60
uint8_t
unsigned char uint8_t
Definition: stdint-msvc2008.h:78
g_pollset
static grpc_pollset * g_pollset
Definition: endpoint_pair_test.cc:31
grpc_slice_malloc
GPRAPI grpc_slice grpc_slice_malloc(size_t length)
Definition: slice/slice.cc:227
grpc_channel_args
Definition: grpc_types.h:132
grpc_core::Timestamps
Definition: buffer_list.h:90
grpc_core::BufferTimestamp::time
gpr_timespec time
Definition: buffer_list.h:86
gen_build_yaml.struct
def struct(**kwargs)
Definition: test/core/end2end/gen_build_yaml.py:30
grpc_pollset_init
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu)
Definition: pollset.cc:33
gpr_zalloc
GPRAPI void * gpr_zalloc(size_t size)
Definition: alloc.cc:40
client
static uv_tcp_t client
Definition: test-callback-stack.c:33
run_tests
int run_tests(int benchmark_output)
Definition: runner.c:77
server
std::unique_ptr< Server > server
Definition: channelz_service_test.cc:330
autogen_x86imm.f
f
Definition: autogen_x86imm.py:9
ssize_t
intptr_t ssize_t
Definition: win.h:27
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
grpc_core::Timestamps::sendmsg_time
BufferTimestamp sendmsg_time
Definition: buffer_list.h:91
count_slices
size_t count_slices(grpc_slice *slices, size_t nslices, int *current_data)
Definition: endpoint_tests.cc:54
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc_core::Timestamps::acked_time
BufferTimestamp acked_time
Definition: buffer_list.h:94
ev_posix.h
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
grpc_core::ExecCtx::Flush
bool Flush()
Definition: exec_ctx.cc:69
grpc_resource_quota_arg_vtable
const GRPCAPI grpc_arg_pointer_vtable * grpc_resource_quota_arg_vtable(void)
Definition: api.cc:62
grpc_timeout_milliseconds_to_deadline
gpr_timespec grpc_timeout_milliseconds_to_deadline(int64_t time_ms)
Definition: test/core/util/test_config.cc:89
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
worker
Definition: worker.py:1
grpc_fd_create
grpc_fd * grpc_fd_create(int fd, const char *name, bool track_err)
tcp_posix.h
grpc.h
gpr_atm_acq_load
#define gpr_atm_acq_load(p)
Definition: impl/codegen/atm_gcc_atomic.h:52
bytes_read
static size_t bytes_read
Definition: test-ipc-heavy-traffic-deadlock-bug.c:47
grpc_tcp_create
grpc_endpoint * grpc_tcp_create(grpc_fd *fd, const grpc_channel_args *args, absl::string_view peer_string)
done
struct tab * done
Definition: bloaty/third_party/zlib/examples/enough.c:176
GRPC_SLICE_START_PTR
#define GRPC_SLICE_START_PTR(slice)
Definition: include/grpc/impl/codegen/slice.h:101
arg
Definition: cmdline.cc:40
time.h
gpr_atm_rel_store
#define gpr_atm_rel_store(p, value)
Definition: impl/codegen/atm_gcc_atomic.h:54
grpc_slice
Definition: include/grpc/impl/codegen/slice.h:65
configs
static grpc_end2end_test_config configs[]
Definition: h2_census.cc:111
close
#define close
Definition: test-fs.c:48
read_bytes
static int read_bytes(int fd, char *buf, size_t read_size, int spin)
Definition: low_level_ping_pong.cc:71
gpr_mu_lock
GPRAPI void gpr_mu_lock(gpr_mu *mu)
clean_up
static void clean_up(void)
Definition: endpoint_pair_test.cc:33
slice_internal.h
min
#define min(a, b)
Definition: qsort.h:83
grpc_endpoint_destroy
void grpc_endpoint_destroy(grpc_endpoint *ep)
Definition: endpoint.cc:53
main
int main(int argc, char **argv)
Definition: tcp_posix_test.cc:697
grpc_pollset_kick
grpc_error_handle grpc_pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker)
Definition: pollset.cc:51
grpc_core::ExecCtx
Definition: exec_ctx.h:97
GPR_GLOBAL_CONFIG_SET
#define GPR_GLOBAL_CONFIG_SET(name, value)
Definition: global_config_generic.h:26
grpc_slice_buffer_init
GPRAPI void grpc_slice_buffer_init(grpc_slice_buffer *sb)
Definition: slice/slice_buffer.cc:116
GRPC_SLICE_LENGTH
#define GRPC_SLICE_LENGTH(slice)
Definition: include/grpc/impl/codegen/slice.h:104
gpr_timespec::clock_type
gpr_clock_type clock_type
Definition: gpr_types.h:55
test_config.h
value
const char * value
Definition: hpack_parser_table.cc:165
gpr_atm
intptr_t gpr_atm
Definition: impl/codegen/atm_gcc_atomic.h:32
g_mu
static gpr_mu g_mu
Definition: iomgr.cc:55
GPR_ARRAY_SIZE
#define GPR_ARRAY_SIZE(array)
Definition: useful.h:129
read
int read(izstream &zs, T *x, Items items)
Definition: bloaty/third_party/zlib/contrib/iostream2/zstream.h:115
destroy_pollset
static void destroy_pollset(void *p, grpc_error_handle)
Definition: google_default_credentials.cc:199
bytes
uint8 bytes[10]
Definition: bloaty/third_party/protobuf/src/google/protobuf/io/coded_stream_unittest.cc:153
absl::flags_internal
Definition: abseil-cpp/absl/flags/commandlineflag.h:40
server
Definition: examples/python/async_streaming/server.py:1
gpr_mu
pthread_mutex_t gpr_mu
Definition: impl/codegen/sync_posix.h:47
port.h
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
http2_test_server.listen
def listen(endpoint, test_case)
Definition: http2_test_server.py:87
GPR_GLOBAL_CONFIG_DECLARE_BOOL
#define GPR_GLOBAL_CONFIG_DECLARE_BOOL(name)
Definition: global_config_generic.h:28
grpc_pollset_shutdown
void grpc_pollset_shutdown(grpc_pollset *pollset, grpc_closure *closure)
Definition: pollset.cc:37
grpc_resource_quota_unref
GRPCAPI void grpc_resource_quota_unref(grpc_resource_quota *resource_quota)
Definition: api.cc:79
ret
UniquePtr< SSL_SESSION > ret
Definition: ssl_x509.cc:1029
grpc_endpoint_test_fixture
Definition: endpoint_tests.h:29
alloc.h
grpc::testing::TestEnvironment
Definition: test/core/util/test_config.h:54
slices
SliceBuffer * slices
Definition: retry_filter.cc:631
arg
struct arg arg
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
endpoint_tests.h
grpc_slice_buffer_destroy_internal
void grpc_slice_buffer_destroy_internal(grpc_slice_buffer *sb)
Definition: slice/slice_buffer.cc:123
grpc_core::Timestamps::scheduled_time
BufferTimestamp scheduled_time
Definition: buffer_list.h:92
grpc_endpoint_write
void grpc_endpoint_write(grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb, void *arg, int max_frame_size)
Definition: endpoint.cc:30
GPR_DEBUG
#define GPR_DEBUG
Definition: include/grpc/impl/codegen/log.h:55
test_server.socket
socket
Definition: test_server.py:65
flags
uint32_t flags
Definition: retry_filter.cc:632
grpc_slice_buffer
Definition: include/grpc/impl/codegen/slice.h:83
GRPC_ARG_POINTER
@ GRPC_ARG_POINTER
Definition: grpc_types.h:82
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
grpc_init
GRPCAPI void grpc_init(void)
Definition: init.cc:146
grpc_error
Definition: error_internal.h:42
sockaddr_posix.h
grpc_core::Timestamp::FromTimespecRoundUp
static Timestamp FromTimespecRoundUp(gpr_timespec t)
Definition: src/core/lib/gprpp/time.cc:136
GRPC_ARG_TCP_READ_CHUNK_SIZE
#define GRPC_ARG_TCP_READ_CHUNK_SIZE
Definition: grpc_types.h:330
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
grpc_endpoint_add_to_pollset
void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset)
Definition: endpoint.cc:35
read_size
static int read_size
Definition: test-tcp-close-reset.c:48
grpc_pollset
Definition: bm_cq_multiple_threads.cc:37
grpc_pollset_destroy
void grpc_pollset_destroy(grpc_pollset *pollset)
Definition: pollset.cc:41
grpc_closure
Definition: closure.h:56
grpc_endpoint
Definition: endpoint.h:105
grpc_shutdown
GRPCAPI void grpc_shutdown(void)
Definition: init.cc:209
addr
struct sockaddr_in addr
Definition: libuv/docs/code/tcp-echo-server/main.c:10
errno.h
grpc_core::ExecCtx::Get
static ExecCtx * Get()
Definition: exec_ctx.h:205
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:29