connected_channel.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 
29 #include "src/core/lib/gpr/alloc.h"
36 
37 #define MAX_BUFFER_LENGTH 8192
38 
41 } channel_data;
42 
47  const char* reason;
48 };
51  // Closures used for returning results on the call combiner.
52  callback_state on_complete[6]; // Max number of pending batches.
56 } call_data;
57 
59  callback_state* state = static_cast<callback_state*>(arg);
60  GRPC_CALL_COMBINER_START(state->call_combiner, state->original_closure,
61  GRPC_ERROR_REF(error), state->reason);
62 }
63 
66  gpr_free(arg);
67 }
68 
70  bool free_when_done, const char* reason,
71  grpc_closure** original_closure) {
72  state->original_closure = *original_closure;
73  state->call_combiner = calld->call_combiner;
74  state->reason = reason;
75  *original_closure = GRPC_CLOSURE_INIT(
76  &state->closure,
78  state, grpc_schedule_on_exec_ctx);
79 }
80 
83  if (batch->send_initial_metadata) return &calld->on_complete[0];
84  if (batch->send_message) return &calld->on_complete[1];
85  if (batch->send_trailing_metadata) return &calld->on_complete[2];
86  if (batch->recv_initial_metadata) return &calld->on_complete[3];
87  if (batch->recv_message) return &calld->on_complete[4];
88  if (batch->recv_trailing_metadata) return &calld->on_complete[5];
89  GPR_UNREACHABLE_CODE(return nullptr);
90 }
91 
92 /* Small hack: the transport stream lives directly after this filter's call_data
93  (rounded up to alignment) in the same call allocation, so everything can be
94  pulled in with minimal cache line requests; these macros map between the two */
95 #define TRANSPORT_STREAM_FROM_CALL_DATA(calld) \
96  ((grpc_stream*)(((char*)(calld)) + \
97  GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(call_data))))
98 #define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
99  ((call_data*)(((char*)(transport_stream)) - \
100  GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(call_data))))
101 
102 /* Intercept a call operation: wrap the batch's callbacks so that they are run
103  via the call combiner, then pass the batch to the transport as a stream op */
106  call_data* calld = static_cast<call_data*>(elem->call_data);
107  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
109  callback_state* state = &calld->recv_initial_metadata_ready;
111  calld, state, false, "recv_initial_metadata_ready",
112  &batch->payload->recv_initial_metadata.recv_initial_metadata_ready);
113  }
114  if (batch->recv_message) {
115  callback_state* state = &calld->recv_message_ready;
116  intercept_callback(calld, state, false, "recv_message_ready",
117  &batch->payload->recv_message.recv_message_ready);
118  }
120  callback_state* state = &calld->recv_trailing_metadata_ready;
122  calld, state, false, "recv_trailing_metadata_ready",
123  &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
124  }
125  if (batch->cancel_stream) {
126  // There can be more than one cancellation batch in flight at any
127  // given time, so we can't just pick out a fixed index into
128  // calld->on_complete like we can for the other ops. However,
129  // cancellation isn't in the fast path, so we just allocate a new
130  // closure for each one.
132  static_cast<callback_state*>(gpr_malloc(sizeof(*state)));
133  intercept_callback(calld, state, true, "on_complete (cancel_stream)",
134  &batch->on_complete);
135  } else if (batch->on_complete != nullptr) {
137  intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
138  }
140  chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), batch);
141  GRPC_CALL_COMBINER_STOP(calld->call_combiner, "passed batch to transport");
142 }
143 
146  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
147  grpc_transport_perform_op(chand->transport, op);
148 }
149 
150 /* Constructor for call_data */
153  call_data* calld = static_cast<call_data*>(elem->call_data);
154  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
155  calld->call_combiner = args->call_combiner;
157  chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
158  &args->call_stack->refcount, args->server_transport_data, args->arena);
159  return r == 0 ? GRPC_ERROR_NONE
161  "transport stream initialization failed");
162 }
163 
165  grpc_polling_entity* pollent) {
166  call_data* calld = static_cast<call_data*>(elem->call_data);
167  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
168  grpc_transport_set_pops(chand->transport,
169  TRANSPORT_STREAM_FROM_CALL_DATA(calld), pollent);
170 }
171 
172 /* Destructor for call_data */
174  grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
175  grpc_closure* then_schedule_closure) {
176  call_data* calld = static_cast<call_data*>(elem->call_data);
177  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
178  grpc_transport_destroy_stream(chand->transport,
180  then_schedule_closure);
181 }
182 
183 /* Constructor for channel_data */
186  channel_data* cd = static_cast<channel_data*>(elem->channel_data);
187  GPR_ASSERT(args->is_last);
188  cd->transport = grpc_channel_args_find_pointer<grpc_transport>(
189  args->channel_args, GRPC_ARG_TRANSPORT);
190  return GRPC_ERROR_NONE;
191 }
192 
193 /* Destructor for channel_data */
195  channel_data* cd = static_cast<channel_data*>(elem->channel_data);
196  if (cd->transport) {
197  grpc_transport_destroy(cd->transport);
198  }
199 }
200 
201 /* No-op. */
203  grpc_channel_element* /*elem*/, const grpc_channel_info* /*channel_info*/) {
204 }
205 
208  nullptr,
210  sizeof(call_data),
214  sizeof(channel_data),
216  [](grpc_channel_stack* channel_stack, grpc_channel_element* elem) {
217  /* HACK(ctiller): increase call stack size for the channel to make space
218  for the transport's stream data. We need a cleaner (but performant) way to
219  do this, and I'm not sure what that is yet.
220  This is only "safe" because call stacks place no additional data after
221  the last call element, and the last call element MUST be the connected
222  channel. */
223  channel_stack->call_stack_size += grpc_transport_stream_size(
224  static_cast<channel_data*>(elem->channel_data)->transport);
225  },
228  "connected",
229 };
230 
232  grpc_transport* t = builder->transport();
233  GPR_ASSERT(t != nullptr);
234  builder->AppendFilter(&grpc_connected_filter);
235  return true;
236 }
237 
239  call_data* calld = static_cast<call_data*>(elem->call_data);
240  return TRANSPORT_STREAM_FROM_CALL_DATA(calld);
241 }
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
get_state_for_batch
static callback_state * get_state_for_batch(call_data *calld, grpc_transport_stream_op_batch *batch)
Definition: connected_channel.cc:81
connected_channel_init_channel_elem
static grpc_error_handle connected_channel_init_channel_elem(grpc_channel_element *elem, grpc_channel_element_args *args)
Definition: connected_channel.cc:184
grpc_core::CallCombiner
Definition: call_combiner.h:50
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
set_pollset_or_pollset_set
static void set_pollset_or_pollset_set(grpc_call_element *elem, grpc_polling_entity *pollent)
Definition: connected_channel.cc:164
connected_channel_call_data::recv_initial_metadata_ready
callback_state recv_initial_metadata_ready
Definition: connected_channel.cc:53
connected_channel_call_data::on_complete
callback_state on_complete[6]
Definition: connected_channel.cc:52
grpc_channel_stack
Definition: channel_stack.h:202
connected_channel_get_channel_info
static void connected_channel_get_channel_info(grpc_channel_element *, const grpc_channel_info *)
Definition: connected_channel.cc:202
polling_entity.h
grpc_transport_stream_op_batch::recv_message
bool recv_message
Definition: transport.h:322
connected_channel_destroy_channel_elem
static void connected_channel_destroy_channel_elem(grpc_channel_element *elem)
Definition: connected_channel.cc:194
call_data
struct connected_channel_call_data call_data
grpc_transport_stream_op_batch::on_complete
grpc_closure * on_complete
Definition: transport.h:304
gpr_free
GPRAPI void gpr_free(void *ptr)
Definition: alloc.cc:51
grpc_transport_perform_op
void grpc_transport_perform_op(grpc_transport *transport, grpc_transport_op *op)
Definition: transport.cc:114
grpc_channel_element
Definition: channel_stack.h:186
elem
Timer elem
Definition: event_engine/iomgr_event_engine/timer_heap_test.cc:109
error
grpc_error_handle error
Definition: retry_filter.cc:499
connected_channel_init_call_elem
static grpc_error_handle connected_channel_init_call_elem(grpc_call_element *elem, const grpc_call_element_args *args)
Definition: connected_channel.cc:151
gpr_malloc
GPRAPI void * gpr_malloc(size_t size)
Definition: alloc.cc:29
closure.h
intercept_callback
static void intercept_callback(call_data *calld, callback_state *state, bool free_when_done, const char *reason, grpc_closure **original_closure)
Definition: connected_channel.cc:69
grpc_call_element
Definition: channel_stack.h:194
callback_state::call_combiner
grpc_core::CallCombiner * call_combiner
Definition: connected_channel.cc:46
grpc_transport_perform_stream_op
void grpc_transport_perform_stream_op(grpc_transport *transport, grpc_stream *stream, grpc_transport_stream_op_batch *op)
Definition: transport.cc:108
connected_channel_start_transport_op
static void connected_channel_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op)
Definition: connected_channel.cc:144
grpc_transport_op
Definition: transport.h:452
alloc.h
grpc_core::ChannelStackBuilder
Definition: channel_stack_builder.h:41
grpc_connected_channel_get_stream
grpc_stream * grpc_connected_channel_get_stream(grpc_call_element *elem)
Definition: connected_channel.cc:238
grpc_types.h
callback_state::reason
const char * reason
Definition: connected_channel.cc:47
grpc_transport_stream_op_batch::cancel_stream
bool cancel_stream
Definition: transport.h:329
grpc_connected_filter
const grpc_channel_filter grpc_connected_filter
Definition: connected_channel.cc:206
profile_analyzer.builder
builder
Definition: profile_analyzer.py:159
grpc_add_connected_filter
bool grpc_add_connected_filter(grpc_core::ChannelStackBuilder *builder)
Definition: connected_channel.cc:231
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
callback_state
Definition: connected_channel.cc:43
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
callback_state::closure
grpc_closure closure
Definition: connected_channel.cc:44
connected_channel_call_data::call_combiner
grpc_core::CallCombiner * call_combiner
Definition: connected_channel.cc:50
channel_stack.h
connected_channel_call_data
Definition: connected_channel.cc:49
grpc_transport_destroy
void grpc_transport_destroy(grpc_transport *transport)
Definition: transport.cc:96
connected_channel_channel_data
Definition: connected_channel.cc:39
call_combiner.h
GRPC_CALL_COMBINER_STOP
#define GRPC_CALL_COMBINER_STOP(call_combiner, reason)
Definition: call_combiner.h:58
run_in_call_combiner
static void run_in_call_combiner(void *arg, grpc_error_handle error)
Definition: connected_channel.cc:58
arg
Definition: cmdline.cc:40
grpc_transport_stream_op_batch::payload
grpc_transport_stream_op_batch_payload * payload
Definition: transport.h:307
connected_channel_call_data::recv_trailing_metadata_ready
callback_state recv_trailing_metadata_ready
Definition: connected_channel.cc:55
error.h
grpc_polling_entity
Definition: polling_entity.h:38
grpc_call_element_args
Definition: channel_stack.h:80
batch
grpc_transport_stream_op_batch * batch
Definition: retry_filter.cc:243
GPR_UNREACHABLE_CODE
#define GPR_UNREACHABLE_CODE(STATEMENT)
Definition: impl/codegen/port_platform.h:652
GRPC_CALL_COMBINER_START
#define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason)
Definition: call_combiner.h:56
run_cancel_in_call_combiner
static void run_cancel_in_call_combiner(void *arg, grpc_error_handle error)
Definition: connected_channel.cc:64
transport_fwd.h
grpc_channel_filter
Definition: channel_stack.h:111
GRPC_ERROR_CREATE_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
Definition: error.h:291
grpc_transport_stream_op_batch::recv_initial_metadata
bool recv_initial_metadata
Definition: transport.h:319
grpc_transport_stream_op_batch::send_trailing_metadata
bool send_trailing_metadata
Definition: transport.h:313
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
grpc_transport_stream_size
size_t grpc_transport_stream_size(grpc_transport *transport)
Definition: transport.cc:92
grpc_transport_stream_op_batch::send_message
bool send_message
Definition: transport.h:316
connected_channel_channel_data::transport
grpc_transport * transport
Definition: connected_channel.cc:40
channel_data
struct connected_channel_channel_data channel_data
alloc.h
fix_build_deps.r
r
Definition: fix_build_deps.py:491
grpc_transport_stream_op_batch_payload::recv_initial_metadata
grpc_metadata_batch * recv_initial_metadata
Definition: transport.h:390
grpc_transport_stream_op_batch::send_initial_metadata
bool send_initial_metadata
Definition: transport.h:310
arg
struct arg arg
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
grpc_transport_stream_op_batch_payload::recv_message
absl::optional< grpc_core::SliceBuffer > * recv_message
Definition: transport.h:416
grpc_transport
Definition: transport_impl.h:89
transport.h
grpc_stream
struct grpc_stream grpc_stream
Definition: transport.h:174
channel_args.h
connected_channel_destroy_call_elem
static void connected_channel_destroy_call_elem(grpc_call_element *elem, const grpc_call_final_info *, grpc_closure *then_schedule_closure)
Definition: connected_channel.cc:173
grpc_transport_stream_op_batch::recv_trailing_metadata
bool recv_trailing_metadata
Definition: transport.h:326
grpc_channel_element_args
Definition: channel_stack.h:74
grpc_transport_destroy_stream
void grpc_transport_destroy_stream(grpc_transport *transport, grpc_stream *stream, grpc_closure *then_schedule_closure)
Definition: transport.cc:134
callback_state::original_closure
grpc_closure * original_closure
Definition: connected_channel.cc:45
connected_channel.h
grpc_call_final_info
Definition: channel_stack.h:95
grpc_transport_stream_op_batch_payload::recv_trailing_metadata
grpc_metadata_batch * recv_trailing_metadata
Definition: transport.h:425
TRANSPORT_STREAM_FROM_CALL_DATA
#define TRANSPORT_STREAM_FROM_CALL_DATA(calld)
Definition: connected_channel.cc:95
connected_channel_start_transport_stream_op_batch
static void connected_channel_start_transport_stream_op_batch(grpc_call_element *elem, grpc_transport_stream_op_batch *batch)
Definition: connected_channel.cc:104
grpc_transport_set_pops
void grpc_transport_set_pops(grpc_transport *transport, grpc_stream *stream, grpc_polling_entity *pollent)
Definition: transport.cc:119
grpc_error
Definition: error_internal.h:42
grpc_channel_info
Definition: grpc_types.h:720
grpc_transport_stream_op_batch
Definition: transport.h:284
grpc_closure
Definition: closure.h:56
op
static grpc_op * op
Definition: test/core/fling/client.cc:47
connected_channel_call_data::recv_message_ready
callback_state recv_message_ready
Definition: connected_channel.cc:54
grpc_transport_init_stream
int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, grpc_stream_refcount *refcount, const void *server_data, grpc_core::Arena *arena)
Definition: transport.cc:100
GRPC_ARG_TRANSPORT
#define GRPC_ARG_TRANSPORT
Definition: transport.h:71
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 02:58:01