passthru_endpoint.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
21 #include <inttypes.h>
22 #include <string.h>
23 
24 #include <string>
25 
26 #include "absl/strings/str_format.h"
27 
28 #include <grpc/support/alloc.h>
30 
35 
37 
38 typedef struct {
39  bool is_armed;
43 } pending_op;
44 
45 typedef struct {
49  std::vector<grpc_passthru_endpoint_channel_action> actions;
52 
53 typedef struct {
64 } half;
65 
68  int halves;
72  bool shutdown;
75 };
76 
78  GPR_ASSERT(m->pending_read_op.is_armed);
79  GPR_ASSERT(m->bytes_read_so_far <=
80  m->parent->channel_effects->allowed_read_bytes);
81  if (m->parent->shutdown) {
83  DEBUG_LOCATION, m->pending_read_op.cb,
84  GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
85  // Move any pending data into pending_read_op.slices so that it may be
86  // free'ed by the executing callback.
87  grpc_slice_buffer_move_into(&m->read_buffer, m->pending_read_op.slices);
88  m->pending_read_op.is_armed = false;
89  return;
90  }
91 
92  if (m->bytes_read_so_far == m->parent->channel_effects->allowed_read_bytes) {
93  // Keep it in pending state.
94  return;
95  }
96  // This delayed processing should only be invoked when read_buffer has
97  // something in it.
98  GPR_ASSERT(m->read_buffer.count > 0);
99  uint64_t readable_length = std::min<uint64_t>(
100  m->read_buffer.length,
101  m->parent->channel_effects->allowed_read_bytes - m->bytes_read_so_far);
102  GPR_ASSERT(readable_length > 0);
103  grpc_slice_buffer_move_first(&m->read_buffer, readable_length,
104  m->pending_read_op.slices);
105  grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->pending_read_op.cb, error);
106  if (m->parent->simulate_channel_actions) {
107  m->bytes_read_so_far += readable_length;
108  }
109  m->pending_read_op.is_armed = false;
110 }
111 
113  grpc_closure* cb, bool /*urgent*/,
114  int /*min_progress_size*/) {
115  half* m = reinterpret_cast<half*>(ep);
116  gpr_mu_lock(&m->parent->mu);
117  if (m->parent->shutdown) {
120  GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
121  } else if (m->read_buffer.count > 0) {
122  GPR_ASSERT(!m->pending_read_op.is_armed);
123  GPR_ASSERT(!m->on_read);
124  m->pending_read_op.is_armed = true;
125  m->pending_read_op.cb = cb;
126  m->pending_read_op.ep = ep;
127  m->pending_read_op.slices = slices;
129  } else {
130  GPR_ASSERT(!m->pending_read_op.is_armed);
131  m->on_read = cb;
132  m->on_read_out = slices;
133  }
134  gpr_mu_unlock(&m->parent->mu);
135 }
136 
137 // Copy src slice and split the copy at n bytes into two separate slices
139  grpc_slice& split2) {
140  GPR_ASSERT(n <= GRPC_SLICE_LENGTH(src));
141  if (n == GRPC_SLICE_LENGTH(src)) {
142  split1 = grpc_slice_copy(src);
143  split2 = grpc_empty_slice();
144  return;
145  }
146  split1 = GRPC_SLICE_MALLOC(n);
148  split2 = GRPC_SLICE_MALLOC(GRPC_SLICE_LENGTH(src) - n);
150  GRPC_SLICE_LENGTH(src) - n);
151 }
152 
153 static half* other_half(half* h) {
154  if (h == &h->parent->client) return &h->parent->server;
155  return &h->parent->client;
156 }
157 
159  GPR_ASSERT(m->pending_write_op.is_armed);
160  GPR_ASSERT(m->bytes_written_so_far <=
161  m->parent->channel_effects->allowed_write_bytes);
162  if (m->parent->shutdown) {
164  DEBUG_LOCATION, m->pending_write_op.cb,
165  GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
166  m->pending_write_op.is_armed = false;
167  grpc_slice_buffer_reset_and_unref(m->pending_write_op.slices);
168  return;
169  }
170  if (m->bytes_written_so_far ==
171  m->parent->channel_effects->allowed_write_bytes) {
172  // Keep it in pending state.
173  return;
174  }
175 
176  half* other = other_half(m);
177  uint64_t max_writable =
178  std::min<uint64_t>(m->pending_write_op.slices->length,
179  m->parent->channel_effects->allowed_write_bytes -
180  m->bytes_written_so_far);
181  uint64_t max_readable = other->parent->channel_effects->allowed_read_bytes -
182  other->bytes_read_so_far;
183  uint64_t immediate_bytes_read =
184  other->on_read != nullptr ? std::min<uint64_t>(max_readable, max_writable)
185  : 0;
186 
187  GPR_ASSERT(max_writable > 0);
188  GPR_ASSERT(max_readable >= 0);
189  // At the end of this process, we should have written max_writable bytes;
190  if (m->parent->simulate_channel_actions) {
191  m->bytes_written_so_far += max_writable;
192  }
193  // Estimate if the original write would still be pending at the end of this
194  // process
195  bool would_write_be_pending =
196  max_writable < m->pending_write_op.slices->length;
197  if (!m->parent->simulate_channel_actions) {
198  GPR_ASSERT(!would_write_be_pending);
199  }
200  grpc_slice_buffer* slices = m->pending_write_op.slices;
202  other->on_read != nullptr ? other->on_read_out : &other->read_buffer;
203  while (max_writable > 0) {
205  uint64_t slice_length = GPR_SLICE_LENGTH(slice);
206  GPR_ASSERT(slice_length > 0);
207  grpc_slice split1, split2;
208  uint64_t split_length = 0;
209  if (slice_length <= max_readable) {
210  split_length = std::min<uint64_t>(slice_length, max_writable);
211  } else if (max_readable > 0) {
212  // slice_length > max_readable
213  split_length = std::min<uint64_t>(max_readable, max_writable);
214  } else {
215  // slice_length still > max_readable but max_readable is 0.
216  // In this case put the bytes into other->read_buffer. During a future
217  // read if max_readable still remains zero at the time of read, the
218  // pending read logic will kick in.
219  dest = &other->read_buffer;
220  split_length = std::min<uint64_t>(slice_length, max_writable);
221  }
222 
223  grpc_slice_copy_split(slice, split_length, split1, split2);
225  // Write a copy of the slice to the destination to be read
227  // Re-insert split2 into source for next iteration.
228  if (GPR_SLICE_LENGTH(split2) > 0) {
230  } else {
232  }
233 
234  if (max_readable > 0) {
235  GPR_ASSERT(max_readable >= static_cast<uint64_t>(split_length));
236  max_readable -= split_length;
237  }
238 
239  GPR_ASSERT(max_writable >= static_cast<uint64_t>(split_length));
240  max_writable -= split_length;
241  }
242 
243  if (immediate_bytes_read > 0) {
245  if (m->parent->simulate_channel_actions) {
246  other->bytes_read_so_far += immediate_bytes_read;
247  }
249  other->on_read = nullptr;
250  }
251 
252  if (!would_write_be_pending) {
253  // No slices should be left
254  GPR_ASSERT(m->pending_write_op.slices->count == 0);
255  grpc_slice_buffer_reset_and_unref(m->pending_write_op.slices);
256  m->pending_write_op.is_armed = false;
257  grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->pending_write_op.cb, error);
258  }
259 }
260 
262  grpc_closure* cb, void* /*arg*/, int /*max_frame_size*/) {
263  half* m = reinterpret_cast<half*>(ep);
264  gpr_mu_lock(&m->parent->mu);
265  gpr_atm_no_barrier_fetch_add(&m->parent->stats->num_writes, (gpr_atm)1);
266  if (m->parent->shutdown) {
269  GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown"));
270  } else {
271  GPR_ASSERT(!m->pending_write_op.is_armed);
272  // Copy slices into m->pending_write_op.slices
273  m->pending_write_op.slices = &m->write_buffer;
274  GPR_ASSERT(m->pending_write_op.slices->count == 0);
275  for (int i = 0; i < static_cast<int>(slices->count); i++) {
276  if (GPR_SLICE_LENGTH(slices->slices[i]) > 0) {
277  grpc_slice_buffer_add_indexed(m->pending_write_op.slices,
278  grpc_slice_copy(slices->slices[i]));
279  }
280  }
281  if (m->pending_write_op.slices->count > 0) {
282  m->pending_write_op.is_armed = true;
283  m->pending_write_op.cb = cb;
284  m->pending_write_op.ep = ep;
286  } else {
287  // There is nothing to write. Schedule callback to be run right away.
289  }
290  }
291  gpr_mu_unlock(&m->parent->mu);
292 }
293 
295  if (m->pending_read_op.is_armed) {
297  }
298  if (m->pending_write_op.is_armed) {
300  }
301 }
302 
303 static void me_add_to_pollset(grpc_endpoint* /*ep*/,
304  grpc_pollset* /*pollset*/) {}
305 
307  grpc_pollset_set* /*pollset*/) {}
308 
310  grpc_pollset_set* /*pollset*/) {}
311 
313  m->parent->shutdown = true;
315  if (m->on_read) {
317  DEBUG_LOCATION, m->on_read,
319  m->on_read = nullptr;
320  }
321  m = other_half(m);
323  if (m->on_read) {
325  DEBUG_LOCATION, m->on_read,
327  m->on_read = nullptr;
328  }
329 }
330 
332  half* m = reinterpret_cast<half*>(ep);
333  gpr_mu_lock(&m->parent->mu);
334  shutdown_locked(m, why);
335  gpr_mu_unlock(&m->parent->mu);
336  GRPC_ERROR_UNREF(why);
337 }
338 
340  gpr_mu_destroy(&p->mu);
342  delete p->channel_effects;
343  grpc_slice_buffer_destroy_internal(&p->client.read_buffer);
344  grpc_slice_buffer_destroy_internal(&p->server.read_buffer);
345  grpc_slice_buffer_destroy_internal(&p->client.write_buffer);
346  grpc_slice_buffer_destroy_internal(&p->server.write_buffer);
347  gpr_free(p);
348 }
349 
350 static void me_destroy(grpc_endpoint* ep) {
351  passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
352  gpr_mu_lock(&p->mu);
353  if (0 == --p->halves && p->channel_effects->actions.empty()) {
354  // no pending channel actions exist
355  gpr_mu_unlock(&p->mu);
357  } else {
358  if (p->halves == 0 && p->simulate_channel_actions) {
359  grpc_timer_cancel(&p->channel_effects->timer);
360  }
361  gpr_mu_unlock(&p->mu);
362  }
363 }
364 
366  passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
367  return (reinterpret_cast<half*>(ep)) == &p->client
368  ? "fake:mock_client_endpoint"
369  : "fake:mock_server_endpoint";
370 }
371 
373  passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
374  return (reinterpret_cast<half*>(ep)) == &p->client
375  ? "fake:mock_client_endpoint"
376  : "fake:mock_server_endpoint";
377 }
378 
379 static int me_get_fd(grpc_endpoint* /*ep*/) { return -1; }
380 
381 static bool me_can_track_err(grpc_endpoint* /*ep*/) { return false; }
382 
383 static const grpc_endpoint_vtable vtable = {
384  me_read,
385  me_write,
389  me_shutdown,
390  me_destroy,
391  me_get_peer,
393  me_get_fd,
395 };
396 
397 static void half_init(half* m, passthru_endpoint* parent,
398  const char* half_name) {
399  m->base.vtable = &vtable;
400  m->parent = parent;
401  grpc_slice_buffer_init(&m->read_buffer);
402  grpc_slice_buffer_init(&m->write_buffer);
403  m->pending_write_op.slices = nullptr;
404  m->on_read = nullptr;
405  m->bytes_read_so_far = 0;
406  m->bytes_written_so_far = 0;
407  m->pending_write_op.is_armed = false;
408  m->pending_read_op.is_armed = false;
409  std::string name =
410  absl::StrFormat("passthru_endpoint_%s_%p", half_name, parent);
411 }
412 
416  bool simulate_channel_actions) {
418  static_cast<passthru_endpoint*>(gpr_malloc(sizeof(*m)));
419  m->halves = 2;
420  m->shutdown = false;
421  if (stats == nullptr) {
423  } else {
424  gpr_ref(&stats->refs);
425  m->stats = stats;
426  }
427  m->channel_effects = new grpc_passthru_endpoint_channel_effects();
428  m->simulate_channel_actions = simulate_channel_actions;
429  if (!simulate_channel_actions) {
430  m->channel_effects->allowed_read_bytes = UINT64_MAX;
431  m->channel_effects->allowed_write_bytes = UINT64_MAX;
432  }
433  half_init(&m->client, m, "client");
434  half_init(&m->server, m, "server");
435  gpr_mu_init(&m->mu);
436  *client = &m->client.base;
437  *server = &m->server.base;
438 }
439 
442  static_cast<grpc_passthru_endpoint_stats*>(
444  memset(stats, 0, sizeof(*stats));
445  gpr_ref_init(&stats->refs, 1);
446  return stats;
447 }
448 
450  if (gpr_unref(&stats->refs)) {
451  gpr_free(stats);
452  }
453 }
454 
456 
458  half* m = reinterpret_cast<half*>(arg);
459  gpr_mu_lock(&m->parent->mu);
460  GPR_ASSERT(!m->parent->channel_effects->actions.empty());
461  if (m->parent->halves == 0) {
462  gpr_mu_unlock(&m->parent->mu);
464  return;
465  }
466  auto curr_action = m->parent->channel_effects->actions[0];
467  m->parent->channel_effects->actions.erase(
468  m->parent->channel_effects->actions.begin());
469  m->parent->channel_effects->allowed_read_bytes +=
470  curr_action.add_n_readable_bytes;
471  m->parent->channel_effects->allowed_write_bytes +=
472  curr_action.add_n_writable_bytes;
476  gpr_mu_unlock(&m->parent->mu);
477 }
478 
480  if (m->parent->channel_effects->actions.empty()) {
482  GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel actions complete");
485  return;
486  }
487  grpc_timer_init(&m->parent->channel_effects->timer,
489  m->parent->channel_effects->actions[0].wait_ms) +
492  grpc_schedule_on_exec_ctx));
493 }
494 
496  grpc_endpoint* ep,
497  const std::vector<grpc_passthru_endpoint_channel_action>& actions) {
498  half* m = reinterpret_cast<half*>(ep);
499  gpr_mu_lock(&m->parent->mu);
500  if (!m->parent->simulate_channel_actions || m->parent->shutdown) {
501  gpr_mu_unlock(&m->parent->mu);
502  return;
503  }
504  m->parent->channel_effects->actions = actions;
506  gpr_mu_unlock(&m->parent->mu);
507 }
passthru_endpoint::simulate_channel_actions
bool simulate_channel_actions
Definition: passthru_endpoint.cc:71
grpc_slice_buffer_move_into
GPRAPI void grpc_slice_buffer_move_into(grpc_slice_buffer *src, grpc_slice_buffer *dst)
Definition: slice/slice_buffer.cc:291
grpc_endpoint_vtable
Definition: endpoint.h:39
gpr_mu_unlock
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
passthru_endpoint::stats
grpc_passthru_endpoint_stats * stats
Definition: passthru_endpoint.cc:69
memset
return memset(p, 0, total)
absl::StrFormat
ABSL_MUST_USE_RESULT std::string StrFormat(const FormatSpec< Args... > &format, const Args &... args)
Definition: abseil-cpp/absl/strings/str_format.h:338
pending_op::slices
grpc_slice_buffer * slices
Definition: passthru_endpoint.cc:41
client
Definition: examples/python/async_streaming/client.py:1
grpc_pollset_set
struct grpc_pollset_set grpc_pollset_set
Definition: iomgr_fwd.h:23
flush_pending_ops_locked
void flush_pending_ops_locked(half *m, grpc_error_handle error)
Definition: passthru_endpoint.cc:294
string.h
absl::string_view
Definition: abseil-cpp/absl/strings/string_view.h:167
me_shutdown
static void me_shutdown(grpc_endpoint *ep, grpc_error_handle why)
Definition: passthru_endpoint.cc:331
gpr_free
GPRAPI void gpr_free(void *ptr)
Definition: alloc.cc:51
GRPC_SLICE_MALLOC
#define GRPC_SLICE_MALLOC(len)
Definition: include/grpc/slice.h:70
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
error
grpc_error_handle error
Definition: retry_filter.cc:499
half::parent
passthru_endpoint * parent
Definition: passthru_endpoint.cc:55
UINT64_MAX
#define UINT64_MAX
Definition: stdint-msvc2008.h:143
error_ref_leak.err
err
Definition: error_ref_leak.py:35
half::base
grpc_endpoint base
Definition: passthru_endpoint.cc:54
grpc_passthru_endpoint_channel_effects::allowed_read_bytes
uint64_t allowed_read_bytes
Definition: passthru_endpoint.cc:48
half::pending_read_op
pending_op pending_read_op
Definition: passthru_endpoint.cc:60
gpr_malloc
GPRAPI void * gpr_malloc(size_t size)
Definition: alloc.cc:29
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(desc, errs, count)
Definition: error.h:307
half_init
static void half_init(half *m, passthru_endpoint *parent, const char *half_name)
Definition: passthru_endpoint.cc:397
setup.name
name
Definition: setup.py:542
start_scheduling_grpc_passthru_endpoint_channel_effects
void start_scheduling_grpc_passthru_endpoint_channel_effects(grpc_endpoint *ep, const std::vector< grpc_passthru_endpoint_channel_action > &actions)
Definition: passthru_endpoint.cc:495
passthru_endpoint::channel_effects
grpc_passthru_endpoint_channel_effects * channel_effects
Definition: passthru_endpoint.cc:70
xds_manager.p
p
Definition: xds_manager.py:60
GRPC_CLOSURE_CREATE
#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler)
Definition: closure.h:160
grpc_passthru_endpoint_stats_create
grpc_passthru_endpoint_stats * grpc_passthru_endpoint_stats_create()
Definition: passthru_endpoint.cc:440
me_write
static void me_write(grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb, void *, int)
Definition: passthru_endpoint.cc:261
grpc_slice_buffer_take_first
GPRAPI grpc_slice grpc_slice_buffer_take_first(grpc_slice_buffer *sb)
Definition: slice/slice_buffer.cc:438
grpc_timer
Definition: iomgr/timer.h:33
sockaddr.h
grpc_slice_copy_split
void grpc_slice_copy_split(grpc_slice src, uint64_t n, grpc_slice &split1, grpc_slice &split2)
Definition: passthru_endpoint.cc:138
me_get_fd
static int me_get_fd(grpc_endpoint *)
Definition: passthru_endpoint.cc:379
me_can_track_err
static bool me_can_track_err(grpc_endpoint *)
Definition: passthru_endpoint.cc:381
grpc_passthru_endpoint_channel_effects
Definition: passthru_endpoint.cc:45
grpc_passthru_endpoint_channel_effects::actions
std::vector< grpc_passthru_endpoint_channel_action > actions
Definition: passthru_endpoint.cc:49
half::bytes_read_so_far
uint64_t bytes_read_so_far
Definition: passthru_endpoint.cc:62
gpr_mu_destroy
GPRAPI void gpr_mu_destroy(gpr_mu *mu)
grpc_passthru_endpoint_channel_effects::on_complete
std::function< void()> on_complete
Definition: passthru_endpoint.cc:50
me_read
static void me_read(grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb, bool, int)
Definition: passthru_endpoint.cc:112
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
string_util.h
grpc_passthru_endpoint_create
void grpc_passthru_endpoint_create(grpc_endpoint **client, grpc_endpoint **server, grpc_passthru_endpoint_stats *stats, bool simulate_channel_actions)
Definition: passthru_endpoint.cc:413
memcpy
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))
passthru_endpoint::server
half server
Definition: passthru_endpoint.cc:74
grpc_passthru_endpoint_destroy
void grpc_passthru_endpoint_destroy(passthru_endpoint *p)
Definition: passthru_endpoint.cc:339
pending_op::ep
grpc_endpoint * ep
Definition: passthru_endpoint.cc:40
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
me_get_local_address
static absl::string_view me_get_local_address(grpc_endpoint *ep)
Definition: passthru_endpoint.cc:372
passthru_endpoint::mu
gpr_mu mu
Definition: passthru_endpoint.cc:67
half::pending_write_op
pending_op pending_write_op
Definition: passthru_endpoint.cc:61
me_destroy
static void me_destroy(grpc_endpoint *ep)
Definition: passthru_endpoint.cc:350
slice
grpc_slice slice
Definition: src/core/lib/surface/server.cc:467
gpr_mu_init
GPRAPI void gpr_mu_init(gpr_mu *mu)
passthru_endpoint::client
half client
Definition: passthru_endpoint.cc:73
grpc_slice_buffer_reset_and_unref
GPRAPI void grpc_slice_buffer_reset_and_unref(grpc_slice_buffer *sb)
Definition: slice_buffer_api.cc:32
gen_stats_data.stats
list stats
Definition: gen_stats_data.py:58
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
passthru_endpoint::shutdown
bool shutdown
Definition: passthru_endpoint.cc:72
shutdown_locked
static void shutdown_locked(half *m, grpc_error_handle why)
Definition: passthru_endpoint.cc:312
do_pending_write_op_locked
static void do_pending_write_op_locked(half *m, grpc_error_handle error)
Definition: passthru_endpoint.cc:158
half
Definition: passthru_endpoint.cc:53
GRPC_SLICE_START_PTR
#define GRPC_SLICE_START_PTR(slice)
Definition: include/grpc/impl/codegen/slice.h:101
passthru_endpoint::halves
int halves
Definition: passthru_endpoint.cc:68
arg
Definition: cmdline.cc:40
time.h
grpc_empty_slice
GPRAPI grpc_slice grpc_empty_slice(void)
Definition: slice/slice.cc:42
gpr_atm_no_barrier_fetch_add
#define gpr_atm_no_barrier_fetch_add(p, delta)
Definition: impl/codegen/atm_gcc_atomic.h:59
grpc_slice
Definition: include/grpc/impl/codegen/slice.h:65
vtable
static const grpc_endpoint_vtable vtable
Definition: passthru_endpoint.cc:383
gpr_mu_lock
GPRAPI void gpr_mu_lock(gpr_mu *mu)
half::bytes_written_so_far
uint64_t bytes_written_so_far
Definition: passthru_endpoint.cc:63
half::on_read
grpc_closure * on_read
Definition: passthru_endpoint.cc:59
do_pending_read_op_locked
static void do_pending_read_op_locked(half *m, grpc_error_handle error)
Definition: passthru_endpoint.cc:77
sched_next_channel_action_locked
static void sched_next_channel_action_locked(half *m)
Definition: passthru_endpoint.cc:479
slice_internal.h
me_delete_from_pollset_set
static void me_delete_from_pollset_set(grpc_endpoint *, grpc_pollset_set *)
Definition: passthru_endpoint.cc:309
me_get_peer
static absl::string_view me_get_peer(grpc_endpoint *ep)
Definition: passthru_endpoint.cc:365
grpc_passthru_endpoint_stats
Definition: passthru_endpoint.h:29
pending_op
Definition: passthru_endpoint.cc:38
n
int n
Definition: abseil-cpp/absl/container/btree_test.cc:1080
tests.qps.qps_worker.dest
dest
Definition: qps_worker.py:45
grpc_slice_buffer_init
GPRAPI void grpc_slice_buffer_init(grpc_slice_buffer *sb)
Definition: slice/slice_buffer.cc:116
me_add_to_pollset
static void me_add_to_pollset(grpc_endpoint *, grpc_pollset *)
Definition: passthru_endpoint.cc:303
GRPC_ERROR_CREATE_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
Definition: error.h:291
half::on_read_out
grpc_slice_buffer * on_read_out
Definition: passthru_endpoint.cc:58
GRPC_SLICE_LENGTH
#define GRPC_SLICE_LENGTH(slice)
Definition: include/grpc/impl/codegen/slice.h:104
grpc_passthru_endpoint_channel_effects::allowed_write_bytes
uint64_t allowed_write_bytes
Definition: passthru_endpoint.cc:47
grpc_passthru_endpoint_channel_effects::timer
grpc_timer timer
Definition: passthru_endpoint.cc:46
gpr_atm
intptr_t gpr_atm
Definition: impl/codegen/atm_gcc_atomic.h:32
grpc_timer_cancel
void grpc_timer_cancel(grpc_timer *timer)
Definition: iomgr/timer.cc:36
grpc_core::Duration::Milliseconds
static constexpr Duration Milliseconds(int64_t millis)
Definition: src/core/lib/gprpp/time.h:155
do_next_sched_channel_action
static void do_next_sched_channel_action(void *arg, grpc_error_handle error)
Definition: passthru_endpoint.cc:457
passthru_endpoint
Definition: passthru_endpoint.cc:66
half::write_buffer
grpc_slice_buffer write_buffer
Definition: passthru_endpoint.cc:57
server
Definition: examples/python/async_streaming/server.py:1
gpr_mu
pthread_mutex_t gpr_mu
Definition: impl/codegen/sync_posix.h:47
grpc_slice_buffer_undo_take_first
GPRAPI void grpc_slice_buffer_undo_take_first(grpc_slice_buffer *sb, grpc_slice slice)
Definition: slice/slice_buffer.cc:467
passthru_endpoint.h
alloc.h
slices
SliceBuffer * slices
Definition: retry_filter.cc:631
arg
struct arg arg
grpc_timer_init
void grpc_timer_init(grpc_timer *timer, grpc_core::Timestamp deadline, grpc_closure *closure)
Definition: iomgr/timer.cc:31
grpc_slice_buffer_move_first
GPRAPI void grpc_slice_buffer_move_first(grpc_slice_buffer *src, size_t n, grpc_slice_buffer *dst)
Definition: slice/slice_buffer.cc:348
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
timer.h
grpc_slice_buffer_destroy_internal
void grpc_slice_buffer_destroy_internal(grpc_slice_buffer *sb)
Definition: slice/slice_buffer.cc:123
half::read_buffer
grpc_slice_buffer read_buffer
Definition: passthru_endpoint.cc:56
grpc_slice_buffer
Definition: include/grpc/impl/codegen/slice.h:83
gpr_ref_init
GPRAPI void gpr_ref_init(gpr_refcount *r, int n)
Definition: sync.cc:86
grpc_core::ExecCtx::Now
Timestamp Now()
Definition: exec_ctx.cc:90
gpr_unref
GPRAPI int gpr_unref(gpr_refcount *r)
Definition: sync.cc:103
grpc_error
Definition: error_internal.h:42
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
regress.m
m
Definition: regress/regress.py:25
grpc_pollset
Definition: bm_cq_multiple_threads.cc:37
grpc_closure
Definition: closure.h:56
grpc_passthru_endpoint_stats_destroy
void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats *stats)
Definition: passthru_endpoint.cc:449
grpc_endpoint
Definition: endpoint.h:105
me_add_to_pollset_set
static void me_add_to_pollset_set(grpc_endpoint *, grpc_pollset_set *)
Definition: passthru_endpoint.cc:306
gpr_ref
GPRAPI void gpr_ref(gpr_refcount *r)
Definition: sync.cc:88
other_half
static half * other_half(half *h)
Definition: passthru_endpoint.cc:153
pending_op::cb
grpc_closure * cb
Definition: passthru_endpoint.cc:42
cb
OPENSSL_EXPORT pem_password_cb * cb
Definition: pem.h:351
grpc_core::ExecCtx::Get
static ExecCtx * Get()
Definition: exec_ctx.h:205
pending_op::is_armed
bool is_armed
Definition: passthru_endpoint.cc:39
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
grpc_slice_unref_internal
void grpc_slice_unref_internal(const grpc_slice &slice)
Definition: slice_refcount.h:39
grpc_slice_buffer_add_indexed
GPRAPI size_t grpc_slice_buffer_add_indexed(grpc_slice_buffer *sb, grpc_slice slice)
Definition: slice/slice_buffer.cc:161
grpc_slice_copy
GPRAPI grpc_slice grpc_slice_copy(grpc_slice s)
Definition: slice/slice.cc:46


grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:41