endpoint_tests.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
21 #include <limits.h>
22 #include <stdbool.h>
23 #include <sys/types.h>
24 
25 #include <grpc/slice.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/time.h>
29 
34 
35 /*
36  General test notes:
37 
38  All tests which write data into an endpoint write i%256 into byte i, which
39  is verified by readers.
40 
41  In general there are a few interesting things to vary which may lead to
42  exercising different codepaths in an implementation:
43  1. Total amount of data written to the endpoint
44  2. Size of slice allocations
45  3. Amount of data we read from or write to the endpoint at once
46 
47  The tests here tend to parameterize these where applicable.
48 
49 */
50 
51 static gpr_mu* g_mu;
53 
54 size_t count_slices(grpc_slice* slices, size_t nslices, int* current_data) {
55  size_t num_bytes = 0;
56  size_t i;
57  size_t j;
58  unsigned char* buf;
59  for (i = 0; i < nslices; ++i) {
61  for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
62  GPR_ASSERT(buf[j] == *current_data);
63  *current_data = (*current_data + 1) % 256;
64  }
65  num_bytes += GRPC_SLICE_LENGTH(slices[i]);
66  }
67  return num_bytes;
68 }
69 
71  const char* test_name,
72  size_t slice_size) {
73  gpr_log(GPR_INFO, "%s/%s", test_name, config.name);
74  return config.create_fixture(slice_size);
75 }
76 
77 static void end_test(grpc_endpoint_test_config config) { config.clean_up(); }
78 
79 static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
80  size_t* num_blocks, uint8_t* current_data) {
81  size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1 : 0);
83  static_cast<grpc_slice*>(gpr_malloc(sizeof(grpc_slice) * nslices));
84  size_t num_bytes_left = num_bytes;
85  size_t i;
86  size_t j;
87  unsigned char* buf;
88  *num_blocks = nslices;
89 
90  for (i = 0; i < nslices; ++i) {
91  slices[i] = grpc_slice_malloc(slice_size > num_bytes_left ? num_bytes_left
92  : slice_size);
93  num_bytes_left -= GRPC_SLICE_LENGTH(slices[i]);
95  for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
96  buf[j] = *current_data;
97  (*current_data)++;
98  }
99  }
100  GPR_ASSERT(num_bytes_left == 0);
101  return slices;
102 }
103 
107  size_t target_bytes;
108  size_t bytes_read;
122 };
123 
124 static void read_scheduler(void* data, grpc_error_handle /* error */) {
126  static_cast<struct read_and_write_test_state*>(data);
127  grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read,
128  /*urgent=*/false, /*min_progress_size=*/1);
129 }
130 
134  static_cast<struct read_and_write_test_state*>(data);
135 
136  state->bytes_read += count_slices(
137  state->incoming.slices, state->incoming.count, &state->current_read_data);
138  if (state->bytes_read == state->target_bytes || !GRPC_ERROR_IS_NONE(error)) {
139  gpr_log(GPR_INFO, "Read handler done");
140  gpr_mu_lock(g_mu);
141  state->read_done = 1 + (GRPC_ERROR_IS_NONE(error));
142  GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
144  } else if (GRPC_ERROR_IS_NONE(error)) {
145  /* We perform many reads one after another. If grpc_endpoint_read and the
146  * read_handler are both run inline, we might end up growing the stack
147  * beyond the limit. Schedule the read on ExecCtx to avoid this. */
148  grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->read_scheduler,
150  }
151 }
152 
153 static void write_scheduler(void* data, grpc_error_handle /* error */) {
155  static_cast<struct read_and_write_test_state*>(data);
156  grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write,
157  nullptr, /*max_frame_size=*/state->max_write_frame_size);
158 }
159 
163  static_cast<struct read_and_write_test_state*>(data);
164  grpc_slice* slices = nullptr;
165  size_t nslices;
166 
167  if (GRPC_ERROR_IS_NONE(error)) {
168  state->bytes_written += state->current_write_size;
169  if (state->target_bytes - state->bytes_written <
170  state->current_write_size) {
171  state->current_write_size = state->target_bytes - state->bytes_written;
172  }
173  if (state->current_write_size != 0) {
174  slices = allocate_blocks(state->current_write_size, 8192, &nslices,
175  &state->current_write_data);
177  grpc_slice_buffer_addn(&state->outgoing, slices, nslices);
178  /* We perform many writes one after another. If grpc_endpoint_write and
179  * the write_handler are both run inline, we might end up growing the
180  * stack beyond the limit. Schedule the write on ExecCtx to avoid this. */
181  grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->write_scheduler,
183  gpr_free(slices);
184  return;
185  }
186  }
187 
188  gpr_log(GPR_INFO, "Write handler done");
189  gpr_mu_lock(g_mu);
190  state->write_done = 1 + (GRPC_ERROR_IS_NONE(error));
191  GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
193 }
194 
195 /* Do both reading and writing using the grpc_endpoint API.
196 
197  This also includes a test of the shutdown behavior.
198  */
200  size_t num_bytes, size_t write_size,
201  size_t slice_size, int max_write_frame_size,
202  bool shutdown) {
205  begin_test(config, "read_and_write_test", slice_size);
210  "num_bytes=%" PRIuPTR " write_size=%" PRIuPTR " slice_size=%" PRIuPTR
211  " shutdown=%d",
212  num_bytes, write_size, slice_size, shutdown);
213 
214  if (shutdown) {
215  gpr_log(GPR_INFO, "Start read and write shutdown test");
216  } else {
218  "Start read and write test with %" PRIuPTR
219  " bytes, slice size %" PRIuPTR,
220  num_bytes, slice_size);
221  }
222 
223  state.read_ep = f.client_ep;
224  state.write_ep = f.server_ep;
225  state.target_bytes = num_bytes;
226  state.bytes_read = 0;
227  state.current_write_size = write_size;
228  state.max_write_frame_size = max_write_frame_size;
229  state.bytes_written = 0;
230  state.read_done = 0;
231  state.write_done = 0;
232  state.current_read_data = 0;
233  state.current_write_data = 0;
234  GRPC_CLOSURE_INIT(&state.read_scheduler, read_scheduler, &state,
235  grpc_schedule_on_exec_ctx);
237  grpc_schedule_on_exec_ctx);
238  GRPC_CLOSURE_INIT(&state.write_scheduler, write_scheduler, &state,
239  grpc_schedule_on_exec_ctx);
241  &state, grpc_schedule_on_exec_ctx);
242  grpc_slice_buffer_init(&state.outgoing);
243  grpc_slice_buffer_init(&state.incoming);
244 
245  /* Get started by pretending an initial write completed */
246  /* NOTE: Sets up initial conditions so we can have the same write handler
247  for the first iteration as for later iterations. It does the right thing
248  even when bytes_written is unsigned. */
249  state.bytes_written -= state.current_write_size;
252 
253  grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read,
254  /*urgent=*/false, /*min_progress_size=*/1);
255  if (shutdown) {
256  gpr_log(GPR_DEBUG, "shutdown read");
258  state.read_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
259  gpr_log(GPR_DEBUG, "shutdown write");
261  state.write_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
262  }
264 
265  gpr_mu_lock(g_mu);
266  while (!state.read_done || !state.write_done) {
267  grpc_pollset_worker* worker = nullptr;
268  GPR_ASSERT(grpc_core::ExecCtx::Get()->Now() < deadline);
270  "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
271  }
274 
275  end_test(config);
278  grpc_endpoint_destroy(state.read_ep);
279  grpc_endpoint_destroy(state.write_ep);
280 }
281 
283  gpr_mu_lock(g_mu);
284  *static_cast<int*>(arg) += (!GRPC_ERROR_IS_NONE(error));
287 }
288 
289 static void wait_for_fail_count(int* fail_count, int want_fail_count) {
291  gpr_mu_lock(g_mu);
294  while (grpc_core::ExecCtx::Get()->Now() < deadline &&
295  *fail_count < want_fail_count) {
296  grpc_pollset_worker* worker = nullptr;
298  "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
301  gpr_mu_lock(g_mu);
302  }
303  GPR_ASSERT(*fail_count == want_fail_count);
305 }
306 
309  begin_test(config, "multiple_shutdown_test", 128);
310  int fail_count = 0;
311  grpc_slice_buffer slice_buffer;
312  grpc_slice_buffer_init(&slice_buffer);
313 
316  grpc_endpoint_read(f.client_ep, &slice_buffer,
317  GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
318  grpc_schedule_on_exec_ctx),
319  /*urgent=*/false, /*min_progress_size=*/1);
320  wait_for_fail_count(&fail_count, 0);
321  grpc_endpoint_shutdown(f.client_ep,
322  GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
323  wait_for_fail_count(&fail_count, 1);
324  grpc_endpoint_read(f.client_ep, &slice_buffer,
325  GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
326  grpc_schedule_on_exec_ctx),
327  /*urgent=*/false, /*min_progress_size=*/1);
328  wait_for_fail_count(&fail_count, 2);
330  grpc_endpoint_write(f.client_ep, &slice_buffer,
331  GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
332  grpc_schedule_on_exec_ctx),
333  nullptr, /*max_frame_size=*/INT_MAX);
334  wait_for_fail_count(&fail_count, 3);
335  grpc_endpoint_shutdown(f.client_ep,
336  GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
337  wait_for_fail_count(&fail_count, 3);
338 
339  grpc_slice_buffer_destroy_internal(&slice_buffer);
340 
341  grpc_endpoint_destroy(f.client_ep);
342  grpc_endpoint_destroy(f.server_ep);
343 }
344 
346  grpc_pollset* pollset, gpr_mu* mu) {
347  size_t i;
348  g_pollset = pollset;
349  g_mu = mu;
351  for (int i = 1; i <= 8192; i = i * 2) {
352  read_and_write_test(config, 10000000, 100000, 8192, i, false);
353  read_and_write_test(config, 1000000, 100000, 1, i, false);
354  read_and_write_test(config, 100000000, 100000, 1, i, true);
355  }
356  for (i = 1; i < 1000; i = std::max(i + 1, i * 5 / 4)) {
357  read_and_write_test(config, 40320, i, i, i, false);
358  }
359  g_pollset = nullptr;
360  g_mu = nullptr;
361 }
grpc_pollset_worker
struct grpc_pollset_worker grpc_pollset_worker
Definition: pollset.h:39
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
read_and_write_test_state::max_write_frame_size
int max_write_frame_size
Definition: endpoint_tests.cc:115
gpr_mu_unlock
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
read_and_write_test_read_handler
static void read_and_write_test_read_handler(void *data, grpc_error_handle error)
Definition: endpoint_tests.cc:131
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
grpc_timeout_seconds_to_deadline
gpr_timespec grpc_timeout_seconds_to_deadline(int64_t time_s)
Definition: test/core/util/test_config.cc:81
log.h
read_and_write_test_state
Definition: endpoint_tests.cc:104
grpc_slice_buffer_addn
GPRAPI void grpc_slice_buffer_addn(grpc_slice_buffer *sb, grpc_slice *slices, size_t n)
Definition: slice/slice_buffer.cc:224
grpc_endpoint_test_config
Definition: endpoint_tests.h:34
read_and_write_test_state::current_read_data
int current_read_data
Definition: endpoint_tests.cc:111
grpc_slice_from_copied_string
GPRAPI grpc_slice grpc_slice_from_copied_string(const char *source)
Definition: slice/slice.cc:177
slice.h
grpc_endpoint_tests
void grpc_endpoint_tests(grpc_endpoint_test_config config, grpc_pollset *pollset, gpr_mu *mu)
Definition: endpoint_tests.cc:345
read_and_write_test_state::bytes_written
size_t bytes_written
Definition: endpoint_tests.cc:110
inc_on_failure
static void inc_on_failure(void *arg, grpc_error_handle error)
Definition: endpoint_tests.cc:282
end_test
static void end_test(grpc_endpoint_test_config config)
Definition: endpoint_tests.cc:77
buf
voidpf void * buf
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
gpr_free
GPRAPI void gpr_free(void *ptr)
Definition: alloc.cc:51
grpc_endpoint_read
void grpc_endpoint_read(grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb, bool urgent, int min_progress_size)
Definition: endpoint.cc:25
useful.h
error
grpc_error_handle error
Definition: retry_filter.cc:499
gpr_malloc
GPRAPI void * gpr_malloc(size_t size)
Definition: alloc.cc:29
read_and_write_test_state::incoming
grpc_slice_buffer incoming
Definition: endpoint_tests.cc:116
allocate_blocks
static grpc_slice * allocate_blocks(size_t num_bytes, size_t slice_size, size_t *num_blocks, uint8_t *current_data)
Definition: endpoint_tests.cc:79
grpc_pollset_work
grpc_error_handle grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker **worker, grpc_core::Timestamp deadline)
Definition: pollset.cc:45
GRPC_LOG_IF_ERROR
#define GRPC_LOG_IF_ERROR(what, error)
Definition: error.h:398
time.h
read_and_write_test_write_handler
static void read_and_write_test_write_handler(void *data, grpc_error_handle error)
Definition: endpoint_tests.cc:160
GRPC_CLOSURE_CREATE
#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler)
Definition: closure.h:160
read_and_write_test_state::write_scheduler
grpc_closure write_scheduler
Definition: endpoint_tests.cc:121
uint8_t
unsigned char uint8_t
Definition: stdint-msvc2008.h:78
grpc_slice_malloc
GPRAPI grpc_slice grpc_slice_malloc(size_t length)
Definition: slice/slice.cc:227
gen_build_yaml.struct
def struct(**kwargs)
Definition: test/core/end2end/gen_build_yaml.py:30
g_mu
static gpr_mu * g_mu
Definition: endpoint_tests.cc:51
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
read_and_write_test_state::read_scheduler
grpc_closure read_scheduler
Definition: endpoint_tests.cc:120
autogen_x86imm.f
f
Definition: autogen_x86imm.py:9
count_slices
size_t count_slices(grpc_slice *slices, size_t nslices, int *current_data)
Definition: endpoint_tests.cc:54
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
grpc_core::ExecCtx::Flush
bool Flush()
Definition: exec_ctx.cc:69
mu
Mutex mu
Definition: server_config_selector_filter.cc:74
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
read_and_write_test_state::bytes_read
size_t bytes_read
Definition: endpoint_tests.cc:108
worker
Definition: worker.py:1
grpc_slice_buffer_reset_and_unref
GPRAPI void grpc_slice_buffer_reset_and_unref(grpc_slice_buffer *sb)
Definition: slice_buffer_api.cc:32
GRPC_SLICE_START_PTR
#define GRPC_SLICE_START_PTR(slice)
Definition: include/grpc/impl/codegen/slice.h:101
read_and_write_test_state::outgoing
grpc_slice_buffer outgoing
Definition: endpoint_tests.cc:117
arg
Definition: cmdline.cc:40
read_and_write_test_state::write_done
int write_done
Definition: endpoint_tests.cc:114
time.h
grpc_slice
Definition: include/grpc/impl/codegen/slice.h:65
read_and_write_test_state::read_done
int read_done
Definition: endpoint_tests.cc:113
gpr_mu_lock
GPRAPI void gpr_mu_lock(gpr_mu *mu)
grpc_endpoint_shutdown
void grpc_endpoint_shutdown(grpc_endpoint *ep, grpc_error_handle why)
Definition: endpoint.cc:49
data
char data[kBufferLength]
Definition: abseil-cpp/absl/strings/internal/str_format/float_conversion.cc:1006
read_and_write_test_state::target_bytes
size_t target_bytes
Definition: endpoint_tests.cc:107
slice_internal.h
grpc_endpoint_destroy
void grpc_endpoint_destroy(grpc_endpoint *ep)
Definition: endpoint.cc:53
grpc_pollset_kick
grpc_error_handle grpc_pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker)
Definition: pollset.cc:51
grpc_core::ExecCtx
Definition: exec_ctx.h:97
grpc_slice_buffer_init
GPRAPI void grpc_slice_buffer_init(grpc_slice_buffer *sb)
Definition: slice/slice_buffer.cc:116
GRPC_ERROR_CREATE_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
Definition: error.h:291
read_and_write_test_state::read_ep
grpc_endpoint * read_ep
Definition: endpoint_tests.cc:105
GRPC_SLICE_LENGTH
#define GRPC_SLICE_LENGTH(slice)
Definition: include/grpc/impl/codegen/slice.h:104
wait_for_fail_count
static void wait_for_fail_count(int *fail_count, int want_fail_count)
Definition: endpoint_tests.cc:289
test_config.h
absl::Now
ABSL_NAMESPACE_BEGIN Time Now()
Definition: abseil-cpp/absl/time/clock.cc:39
grpc_slice_buffer_add
GPRAPI void grpc_slice_buffer_add(grpc_slice_buffer *sb, grpc_slice slice)
Definition: slice/slice_buffer.cc:170
g_pollset
static grpc_pollset * g_pollset
Definition: endpoint_tests.cc:52
write_scheduler
static void write_scheduler(void *data, grpc_error_handle)
Definition: endpoint_tests.cc:153
gpr_mu
pthread_mutex_t gpr_mu
Definition: impl/codegen/sync_posix.h:47
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
read_and_write_test_state::done_write
grpc_closure done_write
Definition: endpoint_tests.cc:119
grpc_endpoint_test_fixture
Definition: endpoint_tests.h:29
alloc.h
read_and_write_test
static void read_and_write_test(grpc_endpoint_test_config config, size_t num_bytes, size_t write_size, size_t slice_size, int max_write_frame_size, bool shutdown)
Definition: endpoint_tests.cc:199
read_and_write_test_state::write_ep
grpc_endpoint * write_ep
Definition: endpoint_tests.cc:106
slices
SliceBuffer * slices
Definition: retry_filter.cc:631
arg
struct arg arg
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
endpoint_tests.h
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
config_s
Definition: bloaty/third_party/zlib/deflate.c:120
grpc_slice_buffer_destroy_internal
void grpc_slice_buffer_destroy_internal(grpc_slice_buffer *sb)
Definition: slice/slice_buffer.cc:123
grpc_endpoint_write
void grpc_endpoint_write(grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb, void *arg, int max_frame_size)
Definition: endpoint.cc:30
multiple_shutdown_test
static void multiple_shutdown_test(grpc_endpoint_test_config config)
Definition: endpoint_tests.cc:307
GPR_DEBUG
#define GPR_DEBUG
Definition: include/grpc/impl/codegen/log.h:55
grpc_slice_buffer
Definition: include/grpc/impl/codegen/slice.h:83
begin_test
static grpc_endpoint_test_fixture begin_test(grpc_endpoint_test_config config, const char *test_name, size_t slice_size)
Definition: endpoint_tests.cc:70
grpc_error
Definition: error_internal.h:42
grpc_core::Timestamp::FromTimespecRoundUp
static Timestamp FromTimespecRoundUp(gpr_timespec t)
Definition: src/core/lib/gprpp/time.cc:136
grpc_endpoint_add_to_pollset
void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset)
Definition: endpoint.cc:35
grpc_pollset
Definition: bm_cq_multiple_threads.cc:37
grpc_closure
Definition: closure.h:56
grpc_endpoint
Definition: endpoint.h:105
read_and_write_test_state::current_write_data
uint8_t current_write_data
Definition: endpoint_tests.cc:112
grpc_core::ExecCtx::Get
static ExecCtx * Get()
Definition: exec_ctx.h:205
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
read_and_write_test_state::done_read
grpc_closure done_read
Definition: endpoint_tests.cc:118
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
read_and_write_test_state::current_write_size
size_t current_write_size
Definition: endpoint_tests.cc:109
read_scheduler
static void read_scheduler(void *data, grpc_error_handle)
Definition: endpoint_tests.cc:124


grpc
Author(s):
autogenerated on Fri May 16 2025 02:58:19