Go to the documentation of this file.
26 #include "absl/strings/str_format.h"
49 std::vector<grpc_passthru_endpoint_channel_action>
actions;
80 m->parent->channel_effects->allowed_read_bytes);
81 if (
m->parent->shutdown) {
88 m->pending_read_op.is_armed =
false;
92 if (
m->bytes_read_so_far ==
m->parent->channel_effects->allowed_read_bytes) {
99 uint64_t readable_length = std::min<uint64_t>(
100 m->read_buffer.length,
101 m->parent->channel_effects->allowed_read_bytes -
m->bytes_read_so_far);
104 m->pending_read_op.slices);
106 if (
m->parent->simulate_channel_actions) {
107 m->bytes_read_so_far += readable_length;
109 m->pending_read_op.is_armed =
false;
117 if (
m->parent->shutdown) {
121 }
else if (
m->read_buffer.count > 0) {
124 m->pending_read_op.is_armed =
true;
125 m->pending_read_op.cb =
cb;
126 m->pending_read_op.ep = ep;
127 m->pending_read_op.slices =
slices;
154 if (h == &h->parent->client)
return &h->parent->server;
155 return &h->parent->client;
161 m->parent->channel_effects->allowed_write_bytes);
162 if (
m->parent->shutdown) {
166 m->pending_write_op.is_armed =
false;
170 if (
m->bytes_written_so_far ==
171 m->parent->channel_effects->allowed_write_bytes) {
178 std::min<uint64_t>(
m->pending_write_op.slices->length,
179 m->parent->channel_effects->allowed_write_bytes -
180 m->bytes_written_so_far);
184 other->
on_read !=
nullptr ? std::min<uint64_t>(max_readable, max_writable)
190 if (
m->parent->simulate_channel_actions) {
191 m->bytes_written_so_far += max_writable;
195 bool would_write_be_pending =
196 max_writable <
m->pending_write_op.slices->length;
197 if (!
m->parent->simulate_channel_actions) {
203 while (max_writable > 0) {
209 if (slice_length <= max_readable) {
210 split_length = std::min<uint64_t>(slice_length, max_writable);
211 }
else if (max_readable > 0) {
213 split_length = std::min<uint64_t>(max_readable, max_writable);
220 split_length = std::min<uint64_t>(slice_length, max_writable);
228 if (GPR_SLICE_LENGTH(split2) > 0) {
234 if (max_readable > 0) {
236 max_readable -= split_length;
240 max_writable -= split_length;
243 if (immediate_bytes_read > 0) {
245 if (
m->parent->simulate_channel_actions) {
252 if (!would_write_be_pending) {
254 GPR_ASSERT(
m->pending_write_op.slices->count == 0);
256 m->pending_write_op.is_armed =
false;
266 if (
m->parent->shutdown) {
273 m->pending_write_op.slices = &
m->write_buffer;
274 GPR_ASSERT(
m->pending_write_op.slices->count == 0);
275 for (
int i = 0; i < static_cast<int>(
slices->count);
i++) {
276 if (GPR_SLICE_LENGTH(
slices->slices[
i]) > 0) {
281 if (
m->pending_write_op.slices->count > 0) {
282 m->pending_write_op.is_armed =
true;
283 m->pending_write_op.cb =
cb;
284 m->pending_write_op.ep = ep;
295 if (
m->pending_read_op.is_armed) {
298 if (
m->pending_write_op.is_armed) {
313 m->parent->shutdown =
true;
319 m->on_read =
nullptr;
327 m->on_read =
nullptr;
342 delete p->channel_effects;
353 if (0 == --
p->halves &&
p->channel_effects->actions.empty()) {
358 if (
p->halves == 0 &&
p->simulate_channel_actions) {
367 return (
reinterpret_cast<half*
>(ep)) == &
p->client
368 ?
"fake:mock_client_endpoint"
369 :
"fake:mock_server_endpoint";
374 return (
reinterpret_cast<half*
>(ep)) == &
p->client
375 ?
"fake:mock_client_endpoint"
376 :
"fake:mock_server_endpoint";
398 const char* half_name) {
403 m->pending_write_op.slices =
nullptr;
404 m->on_read =
nullptr;
405 m->bytes_read_so_far = 0;
406 m->bytes_written_so_far = 0;
407 m->pending_write_op.is_armed =
false;
408 m->pending_read_op.is_armed =
false;
416 bool simulate_channel_actions) {
421 if (
stats ==
nullptr) {
428 m->simulate_channel_actions = simulate_channel_actions;
429 if (!simulate_channel_actions) {
430 m->channel_effects->allowed_read_bytes =
UINT64_MAX;
431 m->channel_effects->allowed_write_bytes =
UINT64_MAX;
460 GPR_ASSERT(!
m->parent->channel_effects->actions.empty());
461 if (
m->parent->halves == 0) {
466 auto curr_action =
m->parent->channel_effects->actions[0];
467 m->parent->channel_effects->actions.erase(
468 m->parent->channel_effects->actions.begin());
469 m->parent->channel_effects->allowed_read_bytes +=
470 curr_action.add_n_readable_bytes;
471 m->parent->channel_effects->allowed_write_bytes +=
472 curr_action.add_n_writable_bytes;
480 if (
m->parent->channel_effects->actions.empty()) {
489 m->parent->channel_effects->actions[0].wait_ms) +
492 grpc_schedule_on_exec_ctx));
497 const std::vector<grpc_passthru_endpoint_channel_action>& actions) {
500 if (!
m->parent->simulate_channel_actions ||
m->parent->shutdown) {
504 m->parent->channel_effects->actions = actions;
bool simulate_channel_actions
GPRAPI void grpc_slice_buffer_move_into(grpc_slice_buffer *src, grpc_slice_buffer *dst)
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
grpc_passthru_endpoint_stats * stats
return memset(p, 0, total)
ABSL_MUST_USE_RESULT std::string StrFormat(const FormatSpec< Args... > &format, const Args &... args)
grpc_slice_buffer * slices
struct grpc_pollset_set grpc_pollset_set
void flush_pending_ops_locked(half *m, grpc_error_handle error)
static void me_shutdown(grpc_endpoint *ep, grpc_error_handle why)
GPRAPI void gpr_free(void *ptr)
#define GRPC_SLICE_MALLOC(len)
passthru_endpoint * parent
uint64_t allowed_read_bytes
pending_op pending_read_op
GPRAPI void * gpr_malloc(size_t size)
#define GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(desc, errs, count)
static void half_init(half *m, passthru_endpoint *parent, const char *half_name)
void start_scheduling_grpc_passthru_endpoint_channel_effects(grpc_endpoint *ep, const std::vector< grpc_passthru_endpoint_channel_action > &actions)
grpc_passthru_endpoint_channel_effects * channel_effects
#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler)
grpc_passthru_endpoint_stats * grpc_passthru_endpoint_stats_create()
static void me_write(grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb, void *, int)
GPRAPI grpc_slice grpc_slice_buffer_take_first(grpc_slice_buffer *sb)
void grpc_slice_copy_split(grpc_slice src, uint64_t n, grpc_slice &split1, grpc_slice &split2)
static int me_get_fd(grpc_endpoint *)
static bool me_can_track_err(grpc_endpoint *)
std::vector< grpc_passthru_endpoint_channel_action > actions
uint64_t bytes_read_so_far
GPRAPI void gpr_mu_destroy(gpr_mu *mu)
std::function< void()> on_complete
static void me_read(grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb, bool, int)
void grpc_passthru_endpoint_create(grpc_endpoint **client, grpc_endpoint **server, grpc_passthru_endpoint_stats *stats, bool simulate_channel_actions)
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))
void grpc_passthru_endpoint_destroy(passthru_endpoint *p)
static absl::string_view me_get_local_address(grpc_endpoint *ep)
pending_op pending_write_op
static void me_destroy(grpc_endpoint *ep)
GPRAPI void gpr_mu_init(gpr_mu *mu)
GPRAPI void grpc_slice_buffer_reset_and_unref(grpc_slice_buffer *sb)
unsigned __int64 uint64_t
static void shutdown_locked(half *m, grpc_error_handle why)
static void do_pending_write_op_locked(half *m, grpc_error_handle error)
#define GRPC_SLICE_START_PTR(slice)
GPRAPI grpc_slice grpc_empty_slice(void)
#define gpr_atm_no_barrier_fetch_add(p, delta)
static const grpc_endpoint_vtable vtable
GPRAPI void gpr_mu_lock(gpr_mu *mu)
uint64_t bytes_written_so_far
static void do_pending_read_op_locked(half *m, grpc_error_handle error)
static void sched_next_channel_action_locked(half *m)
static void me_delete_from_pollset_set(grpc_endpoint *, grpc_pollset_set *)
static absl::string_view me_get_peer(grpc_endpoint *ep)
GPRAPI void grpc_slice_buffer_init(grpc_slice_buffer *sb)
static void me_add_to_pollset(grpc_endpoint *, grpc_pollset *)
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
grpc_slice_buffer * on_read_out
#define GRPC_SLICE_LENGTH(slice)
uint64_t allowed_write_bytes
void grpc_timer_cancel(grpc_timer *timer)
static constexpr Duration Milliseconds(int64_t millis)
static void do_next_sched_channel_action(void *arg, grpc_error_handle error)
grpc_slice_buffer write_buffer
GPRAPI void grpc_slice_buffer_undo_take_first(grpc_slice_buffer *sb, grpc_slice slice)
void grpc_timer_init(grpc_timer *timer, grpc_core::Timestamp deadline, grpc_closure *closure)
GPRAPI void grpc_slice_buffer_move_first(grpc_slice_buffer *src, size_t n, grpc_slice_buffer *dst)
#define GRPC_ERROR_UNREF(err)
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
void grpc_slice_buffer_destroy_internal(grpc_slice_buffer *sb)
grpc_slice_buffer read_buffer
GPRAPI void gpr_ref_init(gpr_refcount *r, int n)
GPRAPI int gpr_unref(gpr_refcount *r)
void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats *stats)
static void me_add_to_pollset_set(grpc_endpoint *, grpc_pollset_set *)
GPRAPI void gpr_ref(gpr_refcount *r)
static half * other_half(half *h)
OPENSSL_EXPORT pem_password_cb * cb
void grpc_slice_unref_internal(const grpc_slice &slice)
GPRAPI size_t grpc_slice_buffer_add_indexed(grpc_slice_buffer *sb, grpc_slice slice)
GPRAPI grpc_slice grpc_slice_copy(grpc_slice s)
grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:41