transport.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 #include <string.h>
24 
25 #include <new>
26 
27 #include "src/core/lib/gpr/alloc.h"
32 
34  "stream_refcount");
35 
40  /* Ick.
41  The thread we're running on MAY be owned (indirectly) by a call-stack.
42  If that's the case, destroying the call-stack MAY try to destroy the
43  thread, which is a tangled mess that we just don't want to ever have to
44  cope with.
45  Throw this over to the executor (on a core-owned thread) and process it
46  there. */
48  } else {
51  }
52 }
53 
56 }
57 
58 #ifndef NDEBUG
60  grpc_iomgr_cb_func cb, void* cb_arg,
61  const char* object_type) {
62  refcount->object_type = object_type;
63 #else
64 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int /*initial_refs*/,
65  grpc_iomgr_cb_func cb, void* cb_arg) {
66 #endif
67  GRPC_CLOSURE_INIT(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);
68 
69  new (&refcount->refs) grpc_core::RefCount(
71  : nullptr);
72 }
73 
74 static void move64bits(uint64_t* from, uint64_t* to) {
75  *to += *from;
76  *from = 0;
77 }
78 
81  move64bits(&from->framing_bytes, &to->framing_bytes);
82  move64bits(&from->data_bytes, &to->data_bytes);
83  move64bits(&from->header_bytes, &to->header_bytes);
84 }
85 
88  grpc_transport_move_one_way_stats(&from->incoming, &to->incoming);
89  grpc_transport_move_one_way_stats(&from->outgoing, &to->outgoing);
90 }
91 
94 }
95 
98 }
99 
102  const void* server_data,
105  server_data, arena);
106 }
107 
112 }
113 
117 }
118 
120  grpc_polling_entity* pollent) {
121  grpc_pollset* pollset;
122  grpc_pollset_set* pollset_set;
123  if ((pollset = grpc_polling_entity_pollset(pollent)) != nullptr) {
125  } else if ((pollset_set = grpc_polling_entity_pollset_set(pollent)) !=
126  nullptr) {
128  } else {
129  // No-op for empty pollset. Empty pollset is possible when using
130  // non-fd-based event engines such as CFStream.
131  }
132 }
133 
136  grpc_closure* then_schedule_closure) {
137  transport->vtable->destroy_stream(transport, stream, then_schedule_closure);
138 }
139 
142 }
143 
144 // This comment should be sung to the tune of
145 // "Supercalifragilisticexpialidocious":
146 //
147 // grpc_transport_stream_op_batch_finish_with_failure
148 // is a function that must always unref cancel_error
149 // though it lives in lib, it handles transport stream ops sure
150 // it's grpc_transport_stream_op_batch_finish_with_failure
153  grpc_core::CallCombiner* call_combiner) {
156  &closures);
157  // Execute closures.
158  closures.RunClosures(call_combiner);
159 }
160 
164  if (batch->cancel_stream) {
166  }
167  // Construct a list of closures to execute.
169  closures->Add(
170  batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
171  GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
172  }
173  if (batch->recv_message) {
174  closures->Add(batch->payload->recv_message.recv_message_ready,
175  GRPC_ERROR_REF(error), "failing recv_message_ready");
176  }
178  closures->Add(
179  batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
180  GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready");
181  }
182  if (batch->on_complete != nullptr) {
184  "failing on_complete");
185  }
187 }
188 
195  }
196 };
197 
199  made_transport_op* op = static_cast<made_transport_op*>(arg);
200  grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->inner_on_complete,
202  delete op;
203 }
204 
207  GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_op, op,
208  grpc_schedule_on_exec_ctx);
209  op->inner_on_complete = on_complete;
210  op->op.on_consumed = &op->outer_on_complete;
211  return &op->op;
212 }
213 
219 };
223  grpc_closure* c = op->inner_on_complete;
224  delete op;
225  if (c != nullptr) {
227  }
228 }
229 
231  grpc_closure* on_complete) {
233  op->op.payload = &op->payload;
235  op, grpc_schedule_on_exec_ctx);
236  op->inner_on_complete = on_complete;
237  op->op.on_complete = &op->outer_on_complete;
238  return &op->op;
239 }
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
grpc_core::CallCombinerClosureList
Definition: call_combiner.h:144
grpc_transport_move_stats
void grpc_transport_move_stats(grpc_transport_stream_stats *from, grpc_transport_stream_stats *to)
Definition: transport.cc:86
iomgr.h
grpc_core::CallCombiner
Definition: call_combiner.h:50
grpc_polling_entity_pollset
grpc_pollset * grpc_polling_entity_pollset(grpc_polling_entity *pollent)
Definition: polling_entity.cc:42
move64bits
static void move64bits(uint64_t *from, uint64_t *to)
Definition: transport.cc:74
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
grpc_transport_vtable::perform_op
void(* perform_op)(grpc_transport *self, grpc_transport_op *op)
Definition: transport_impl.h:75
memset
return memset(p, 0, total)
destroy_made_transport_op
static void destroy_made_transport_op(void *arg, grpc_error_handle error)
Definition: transport.cc:198
grpc_transport_move_one_way_stats
void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats *from, grpc_transport_one_way_stats *to)
Definition: transport.cc:79
grpc_transport_stream_op_batch::recv_message
bool recv_message
Definition: transport.h:322
grpc_iomgr_is_any_background_poller_thread
bool grpc_iomgr_is_any_background_poller_thread()
Definition: iomgr.cc:175
made_transport_op::op
grpc_transport_op op
Definition: transport.cc:192
grpc_transport_stream_op_batch::on_complete
grpc_closure * on_complete
Definition: transport.h:304
grpc_pollset_set
struct grpc_pollset_set grpc_pollset_set
Definition: iomgr_fwd.h:23
string.h
grpc_transport_stream_op_batch_finish_with_failure
void grpc_transport_stream_op_batch_finish_with_failure(grpc_transport_stream_op_batch *batch, grpc_error_handle error, grpc_core::CallCombiner *call_combiner)
Definition: transport.cc:151
grpc_transport_perform_op
void grpc_transport_perform_op(grpc_transport *transport, grpc_transport_op *op)
Definition: transport.cc:114
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc_transport_stream_op_batch_queue_finish_with_failure
void grpc_transport_stream_op_batch_queue_finish_with_failure(grpc_transport_stream_op_batch *batch, grpc_error_handle error, grpc_core::CallCombinerClosureList *closures)
Definition: transport.cc:161
grpc_transport_vtable::set_pollset
void(* set_pollset)(grpc_transport *self, grpc_stream *stream, grpc_pollset *pollset)
Definition: transport_impl.h:63
made_transport_stream_op::op
grpc_transport_stream_op_batch op
Definition: transport.cc:217
to
size_t to
Definition: abseil-cpp/absl/container/internal/layout_test.cc:1385
made_transport_stream_op
Definition: transport.cc:214
grpc_transport_vtable::set_pollset_set
void(* set_pollset_set)(grpc_transport *self, grpc_stream *stream, grpc_pollset_set *pollset_set)
Definition: transport_impl.h:67
grpc_transport_vtable::perform_stream_op
void(* perform_stream_op)(grpc_transport *self, grpc_stream *stream, grpc_transport_stream_op_batch *op)
Definition: transport_impl.h:71
made_transport_op
Definition: transport.cc:189
grpc_core::Arena
Definition: src/core/lib/resource_quota/arena.h:45
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
grpc_transport_perform_stream_op
void grpc_transport_perform_stream_op(grpc_transport *transport, grpc_stream *stream, grpc_transport_stream_op_batch *op)
Definition: transport.cc:108
grpc_stream_destroy
void grpc_stream_destroy(grpc_stream_refcount *refcount)
Definition: transport.cc:36
grpc_core::Executor::Run
static void Run(grpc_closure *closure, grpc_error_handle error, ExecutorType executor_type=ExecutorType::DEFAULT, ExecutorJobType job_type=ExecutorJobType::SHORT)
Definition: executor.cc:398
grpc_transport_op
Definition: transport.h:452
arena
grpc_core::ScopedArenaPtr arena
Definition: binder_transport_test.cc:237
alloc.h
made_transport_op::made_transport_op
made_transport_op()
Definition: transport.cc:193
from
size_t from
Definition: abseil-cpp/absl/container/internal/layout_test.cc:1384
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
GPR_ROUND_UP_TO_ALIGNMENT_SIZE
#define GPR_ROUND_UP_TO_ALIGNMENT_SIZE(x)
Given a size, round up to the next multiple of sizeof(void*).
Definition: src/core/lib/gpr/alloc.h:25
refcount
size_t refcount
Definition: abseil-cpp/absl/strings/internal/cordz_info.cc:122
grpc_transport_stream_op_batch::cancel_stream
bool cancel_stream
Definition: transport.h:329
grpc_transport_get_endpoint
grpc_endpoint * grpc_transport_get_endpoint(grpc_transport *transport)
Definition: transport.cc:140
c
void c(T a)
Definition: miscompile_with_no_unique_address_test.cc:40
grpc_polling_entity_pollset_set
grpc_pollset_set * grpc_polling_entity_pollset_set(grpc_polling_entity *pollent)
Definition: polling_entity.cc:49
transport
grpc_transport transport
Definition: filter_fuzzer.cc:146
grpc_transport_destroy
void grpc_transport_destroy(grpc_transport *transport)
Definition: transport.cc:96
made_transport_op::outer_on_complete
grpc_closure outer_on_complete
Definition: transport.cc:190
grpc_transport_stream_op_batch_payload::cancel_error
grpc_error_handle cancel_error
Definition: transport.h:444
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
grpc_transport_vtable::sizeof_stream
size_t sizeof_stream
Definition: transport_impl.h:40
arg
Definition: cmdline.cc:40
grpc_transport_stream_op_batch::payload
grpc_transport_stream_op_batch_payload * payload
Definition: transport.h:307
grpc_transport_vtable::get_endpoint
grpc_endpoint *(* get_endpoint)(grpc_transport *self)
Definition: transport_impl.h:85
grpc_make_transport_op
grpc_transport_op * grpc_make_transport_op(grpc_closure *on_complete)
Definition: transport.cc:205
made_transport_stream_op::outer_on_complete
grpc_closure outer_on_complete
Definition: transport.cc:215
grpc_polling_entity
Definition: polling_entity.h:38
batch
grpc_transport_stream_op_batch * batch
Definition: retry_filter.cc:243
grpc_transport_one_way_stats
Definition: transport.h:243
executor.h
grpc_op::op
grpc_op_type op
Definition: grpc_types.h:642
grpc_transport_vtable::destroy_stream
void(* destroy_stream)(grpc_transport *self, grpc_stream *stream, grpc_closure *then_schedule_closure)
Definition: transport_impl.h:78
made_transport_op::inner_on_complete
grpc_closure * inner_on_complete
Definition: transport.cc:191
grpc_core::TraceFlag
Definition: debug/trace.h:63
grpc_make_transport_stream_op
grpc_transport_stream_op_batch * grpc_make_transport_stream_op(grpc_closure *on_complete)
Definition: transport.cc:230
grpc_core::CallCombinerClosureList::RunClosures
void RunClosures(CallCombiner *call_combiner)
Definition: call_combiner.h:161
grpc_transport_stream_op_batch::recv_initial_metadata
bool recv_initial_metadata
Definition: transport.h:319
grpc_transport_vtable::init_stream
int(* init_stream)(grpc_transport *self, grpc_stream *stream, grpc_stream_refcount *refcount, const void *server_data, grpc_core::Arena *arena)
Definition: transport_impl.h:46
grpc_transport_stream_op_batch_payload
Definition: transport.h:341
grpc_stream_ref_init
void grpc_stream_ref_init(grpc_stream_refcount *refcount, int, grpc_iomgr_cb_func cb, void *cb_arg, const char *object_type)
Definition: transport.cc:59
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
grpc_transport_stream_size
size_t grpc_transport_stream_size(grpc_transport *transport)
Definition: transport.cc:92
grpc_core::CallCombinerClosureList::Add
void Add(grpc_closure *closure, grpc_error_handle error, const char *reason)
Definition: call_combiner.h:150
grpc_transport_stream_stats
Definition: transport.h:249
grpc_transport::vtable
const grpc_transport_vtable * vtable
Definition: transport_impl.h:93
grpc_trace_stream_refcount
grpc_core::DebugOnlyTraceFlag grpc_trace_stream_refcount(false, "stream_refcount")
made_transport_stream_op::inner_on_complete
grpc_closure * inner_on_complete
Definition: transport.cc:216
grpc_iomgr_cb_func
void(* grpc_iomgr_cb_func)(void *arg, grpc_error_handle error)
Definition: closure.h:53
grpc_transport_stream_op_batch_payload::recv_initial_metadata
grpc_metadata_batch * recv_initial_metadata
Definition: transport.h:390
arg
struct arg arg
grpc_transport_stream_op_batch_payload::recv_message
absl::optional< grpc_core::SliceBuffer > * recv_message
Definition: transport.h:416
exec_ctx.h
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
grpc_transport
Definition: transport_impl.h:89
transport.h
grpc_stream
struct grpc_stream grpc_stream
Definition: transport.h:174
grpc_transport_stream_op_batch::recv_trailing_metadata
bool recv_trailing_metadata
Definition: transport.h:326
destroy_made_transport_stream_op
static void destroy_made_transport_stream_op(void *arg, grpc_error_handle error)
Definition: transport.cc:220
grpc_transport_destroy_stream
void grpc_transport_destroy_stream(grpc_transport *transport, grpc_stream *stream, grpc_closure *then_schedule_closure)
Definition: transport.cc:134
flags
uint32_t flags
Definition: retry_filter.cc:632
transport_impl.h
grpc_transport_stream_op_batch_payload::recv_trailing_metadata
grpc_metadata_batch * recv_trailing_metadata
Definition: transport.h:425
grpc_transport_set_pops
void grpc_transport_set_pops(grpc_transport *transport, grpc_stream *stream, grpc_polling_entity *pollent)
Definition: transport.cc:119
grpc_error
Definition: error_internal.h:42
slice_stream_destroy
void slice_stream_destroy(void *arg)
Definition: transport.cc:54
grpc_pollset
Definition: bm_cq_multiple_threads.cc:37
grpc_transport_stream_op_batch
Definition: transport.h:284
grpc_core::RefCount
Definition: ref_counted.h:44
grpc_closure
Definition: closure.h:56
grpc_transport_vtable::destroy
void(* destroy)(grpc_transport *self)
Definition: transport_impl.h:82
op
static grpc_op * op
Definition: test/core/fling/client.cc:47
GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP
#define GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP
Definition: exec_ctx.h:48
grpc_endpoint
Definition: endpoint.h:105
grpc_core::Closure::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: closure.h:250
grpc_transport_init_stream
int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, grpc_stream_refcount *refcount, const void *server_data, grpc_core::Arena *arena)
Definition: transport.cc:100
cb
OPENSSL_EXPORT pem_password_cb * cb
Definition: pem.h:351
grpc_core::ExecCtx::Get
static ExecCtx * Get()
Definition: exec_ctx.h:205
grpc_stream_refcount
Definition: transport.h:178
grpc_transport_stream_op_batch_payload::cancel_stream
struct grpc_transport_stream_op_batch_payload::@46 cancel_stream
made_transport_stream_op::payload
grpc_transport_stream_op_batch_payload payload
Definition: transport.cc:218
port_platform.h
stream
voidpf stream
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:40