Go to the documentation of this file.
31 #include "absl/strings/match.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/str_format.h"
34 #include "absl/strings/string_view.h"
35 #include "absl/types/optional.h"
66 #define GRPC_HEADER_SIZE_IN_BYTES 5
67 #define GRPC_FLUSH_READ_SIZE 4096
70 #define CRONET_LOG(...) \
72 if (grpc_cronet_trace.enabled()) gpr_log(__VA_ARGS__); \
232 #define GRPC_CRONET_STREAM_REF(stream, reason) \
233 grpc_cronet_stream_ref((stream), (reason))
234 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
235 grpc_cronet_stream_unref((stream), (reason))
243 #define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
244 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
245 grpc_cronet_stream_unref((stream))
258 return "ACTION_TAKEN_WITH_CALLBACK";
260 return "ACTION_TAKEN_NO_CALLBACK";
262 return "NO_ACTION_POSSIBLE";
270 return "OP_SEND_INITIAL_METADATA";
272 return "OP_SEND_MESSAGE";
274 return "OP_SEND_TRAILING_METADATA";
276 return "OP_RECV_MESSAGE";
278 return "OP_RECV_INITIAL_METADATA";
280 return "OP_RECV_TRAILING_METADATA";
282 return "OP_CANCEL_ERROR";
284 return "OP_ON_COMPLETE";
288 return "OP_SUCCEEDED";
290 return "OP_CANCELED";
292 return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
294 return "OP_READ_REQ_MADE";
302 if (s->state.rs.read_buffer &&
303 s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
306 s->state.rs.read_buffer =
nullptr;
310 s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
312 s->state.rs.received_bytes = 0;
313 s->state.rs.compressed =
false;
316 s->state.rs.remaining_bytes);
320 int cronet_internal_error_code,
323 "Cronet error code:%d, Cronet error detail:%s",
324 cronet_internal_error_code,
desc)),
345 if (
op->send_message) {
346 s->state.pending_send_message =
true;
348 if (
op->recv_trailing_metadata) {
349 s->state.pending_recv_trailing_metadata =
true;
373 if (curr->
next == oas) {
419 for (
size_t i = 0;
i < header_array->
count;
i++) {
432 gpr_log(GPR_DEBUG,
"Failed to parse metadata: %s",
433 absl::StrCat(
"key=", header_array->headers[i].key,
435 " value=", value.as_string_view())
534 if (t->use_packet_coalescing) {
550 const char* negotiated_protocol) {
554 headers, negotiated_protocol);
559 for (
size_t i = 0;
i < headers->
count;
i++) {
560 if (0 == strcmp(
"grpc-status", headers->
headers[
i].
key)) {
619 }
else if (
count > 0) {
656 if (trailers->
count > 0) {
668 if (t->use_packet_coalescing) {
686 char** pp_write_buffer,
705 for (
size_t i = 0;
i < write_slice_buffer->
count; ++
i) {
713 class CronetMetadataEncoder {
716 size_t* p_count,
const char* host,
722 headers_(*pp_headers),
730 CronetMetadataEncoder(
const CronetMetadataEncoder&) =
delete;
731 CronetMetadataEncoder& operator=(
const CronetMetadataEncoder&) =
delete;
733 template <
class T,
class V>
734 void Encode(
T,
const V&
value) {
790 const char** method_;
802 CronetMetadataEncoder encoder(pp_headers, p_num_headers, host,
811 *compressed = ((
c & 0x01) == 0x01);
830 struct op_state* stream_state = &s->state;
838 if (is_canceled_or_failed) {
890 }
else if (!stream_state
894 }
else if (!stream_state
905 }
else if (!stream_state
914 }
else if (!stream_state
928 }
else if (!stream_state
940 }
else if (!stream_state
950 !(t->use_packet_coalescing &&
1035 struct op_state* stream_state = &s->state;
1047 if (t->use_packet_coalescing) {
1052 const char*
method =
"POST";
1053 s->header_array.headers =
nullptr;
1056 t->host, &
url, &s->header_array.headers, &s->header_array.count,
1058 s->header_array.capacity = s->header_array.count;
1063 unsigned int header_index;
1064 for (header_index = 0; header_index < s->header_array.count;
1066 gpr_free(
const_cast<char*
>(s->header_array.headers[header_index].key));
1067 gpr_free(
const_cast<char*
>(s->header_array.headers[header_index].value));
1070 if (t->use_packet_coalescing) {
1072 s->state.flush_cronet_when_ready =
true;
1086 size_t write_buffer_size;
1091 if (write_buffer_size > 0) {
1096 static_cast<int>(write_buffer_size),
false);
1097 if (t->use_packet_coalescing) {
1129 if (t->use_packet_coalescing) {
1342 const char* error_message =
1418 if (
op->send_initial_metadata &&
1420 op->payload->send_initial_metadata.send_initial_metadata)) {
1423 if (
op->recv_initial_metadata) {
1426 op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1429 if (
op->recv_message) {
1431 op->payload->recv_message.recv_message_ready,
1434 if (
op->recv_trailing_metadata) {
1437 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1494 for (
size_t i = 0;
i <
args->num_args;
i++) {
#define GRPC_CRONET_STREAM_UNREF(stream, reason)
static void on_failed(bidirectional_stream *, int)
size_t grpc_chttp2_base64_infer_length_after_decode(const grpc_slice &slice)
static const grpc_transport_vtable grpc_cronet_vtable
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
static void set_pollset_set_do_nothing(grpc_transport *, grpc_stream *, grpc_pollset_set *)
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
ABSL_MUST_USE_RESULT std::string StrFormat(const FormatSpec< Args... > &format, const Args &... args)
void bidirectional_stream_cancel(bidirectional_stream *)
static grpc_endpoint * get_endpoint(grpc_transport *)
grpc_closure * on_complete
static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer, char **pp_write_buffer, size_t *p_write_buffer_size, uint32_t flags)
struct grpc_pollset_set grpc_pollset_set
static Slice FromStaticString(const char *s)
bool state_op_done[OP_NUM_OPS]
GPRAPI void gpr_free(void *ptr)
#define GRPC_SLICE_MALLOC(len)
static const char * op_id_string(enum e_op_id i)
static void destroy_stream(grpc_transport *, grpc_stream *gs, grpc_closure *then_schedule_closure)
bool use_packet_coalescing
static int init_stream(grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, const void *, grpc_core::Arena *arena)
GPRAPI void * gpr_malloc(size_t size)
static bidirectional_stream_callback cronet_callbacks
grpc_metadata_batch * send_initial_metadata
grpc_core::TraceFlag grpc_cronet_trace(false, "cronet")
grpc_transport_stream_op_batch op
#define GRPC_ERROR_CANCELLED
static void on_write_completed(bidirectional_stream *, const char *)
static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op_batch *op)
static void parse_grpc_header(const uint8_t *data, int *length, bool *compressed)
cronet_net_error_code net_error
@ OP_RECV_INITIAL_METADATA
grpc_transport * grpc_create_cronet_transport(void *engine, const char *target, const grpc_channel_args *args, void *)
static void read_grpc_header(stream_obj *s)
static char write_buffer[WRITE_BUFFER_SIZE]
static void on_succeeded(bidirectional_stream *)
#define T(upbtypeconst, upbtype, ctype, default_value)
grpc_core::ScopedArenaPtr arena
read_state(grpc_core::Arena *arena)
@ OP_SEND_INITIAL_METADATA
static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op, struct stream_obj *s, struct op_state *op_state, enum e_op_id op_id)
@ OP_RECV_TRAILING_METADATA
bool trailing_metadata_valid
int bidirectional_stream_start(bidirectional_stream *, const char *, int, const char *, const bidirectional_stream_header_array *, bool)
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))
void grpc_cronet_stream_unref(stream_obj *s, const char *reason)
char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES]
static void on_stream_ready(bidirectional_stream *)
bidirectional_stream_header_array header_array
void Clear()
Removes and unrefs all slices in the SliceBuffer.
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
static void convert_cronet_array_to_metadata(const bidirectional_stream_header_array *header_array, grpc_metadata_batch *mds)
grpc_slice_buffer * c_slice_buffer()
Return a pointer to the back raw grpc_slice_buffer.
static void perform_op(grpc_transport *, grpc_transport_op *)
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
GPRAPI void gpr_mu_init(gpr_mu *mu)
static enum e_op_result execute_stream_op(struct op_and_state *oas)
static void destroy_transport(grpc_transport *)
const grpc_slice & c_slice() const
static const char * op_result_string(enum e_op_result i)
void bidirectional_stream_disable_auto_flush(bidirectional_stream *, bool)
@ OP_RECV_MESSAGE_AND_ON_COMPLETE
grpc_error_handle cancel_error
#define GRPC_SLICE_START_PTR(slice)
static void on_response_headers_received(bidirectional_stream *, const bidirectional_stream_header_array *, const char *)
GPRAPI grpc_slice grpc_slice_from_static_string(const char *source)
grpc_transport_stream_op_batch_payload * payload
const char * cronet_net_error_as_string(cronet_net_error_code net_error)
void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason)
GPRAPI void gpr_mu_lock(gpr_mu *mu)
grpc_error_handle cancel_error
#define GRPC_ARG_USE_CRONET_PACKET_COALESCING
static void execute_from_storage(stream_obj *s)
struct op_and_state * oas
struct op_and_state * next
bidirectional_stream * bidirectional_stream_create(stream_engine *, void *, bidirectional_stream_callback *)
static void on_canceled(bidirectional_stream *)
struct op_storage storage
@ OP_SEND_TRAILING_METADATA
static void on_read_completed(bidirectional_stream *, char *, int)
static grpc_error_handle make_error_with_desc(int error_code, int cronet_internal_error_code, const char *desc)
static void remove_from_storage(struct stream_obj *s, struct op_and_state *oas)
void bidirectional_stream_delay_request_headers_until_flush(bidirectional_stream *, bool)
#define GRPC_SLICE_LENGTH(slice)
GPRAPI char * grpc_slice_to_c_string(grpc_slice s)
bidirectional_stream * cbs
bool recv_initial_metadata
int bidirectional_stream_destroy(bidirectional_stream *)
#define GRPC_HEADER_SIZE_IN_BYTES
grpc_stream_refcount * refcount
bool pending_write_for_trailer
grpc_metadata_batch initial_metadata
bool send_trailing_metadata
#define GRPC_ERROR_REF(err)
#define GRPC_WRITE_INTERNAL_COMPRESS
#define GRPC_CRONET_STREAM_REF(stream, reason)
grpc_slice grpc_chttp2_base64_encode(const grpc_slice &input)
grpc_error_handle grpc_error_set_int(grpc_error_handle src, grpc_error_ints which, intptr_t value)
struct bidirectional_stream_header_array bidirectional_stream_header_array
static void perform_stream_op(grpc_transport *, grpc_stream *gs, grpc_transport_stream_op_batch *op)
static void on_response_trailers_received(bidirectional_stream *, const bidirectional_stream_header_array *)
static bool header_has_authority(const grpc_metadata_batch *b)
grpc_core::ExecCtx exec_ctx
const grpc_transport_vtable * vtable
grpc_metadata_batch trailing_metadata
int bidirectional_stream_write(bidirectional_stream *, const char *, int, bool)
#define GRPC_ERROR_CREATE_FROM_CPP_STRING(desc)
grpc_transport_stream_op_batch * curr_op
AllocList * next[kMaxLevel]
grpc_slice grpc_chttp2_base64_decode_with_length(const grpc_slice &input, size_t output_length)
grpc_core::SliceBuffer * send_message
op_state(grpc_core::Arena *arena)
grpc_metadata_batch * recv_initial_metadata
grpc_status_code cronet_net_error_to_grpc_error(cronet_net_error_code net_error)
struct op_and_state * head
int bidirectional_stream_read(bidirectional_stream *, char *, int)
bool send_initial_metadata
absl::optional< grpc_core::SliceBuffer > * recv_message
void bidirectional_stream_flush(bidirectional_stream *)
static void null_and_maybe_free_read_buffer(stream_obj *s)
#define GRPC_ERROR_UNREF(err)
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
bool pending_recv_trailing_metadata
bool state_callback_received[OP_NUM_OPS]
struct grpc_stream grpc_stream
bool recv_trailing_metadata
void grpc_cronet_stream_ref(stream_obj *s, const char *reason)
grpc_metadata_batch * recv_trailing_metadata
@ ACTION_TAKEN_NO_CALLBACK
grpc_core::SliceBuffer read_slice_buffer
bool flush_cronet_when_ready
bool length_field_received
op_and_state(stream_obj *s, const grpc_transport_stream_op_batch &op)
bool pending_send_message
@ ACTION_TAKEN_WITH_CALLBACK
static void convert_metadata_to_cronet_headers(grpc_metadata_batch *metadata, const char *host, std::string *pp_url, bidirectional_stream_header **pp_headers, size_t *p_num_headers, const char **method)
void grpc_stream_unref(grpc_stream_refcount *refcount, const char *reason)
static ABSL_INTERNAL_ATOMIC_HOOK_ATTRIBUTES absl::base_internal::AtomicHook< StatusPayloadPrinter > storage
bool EndsWith(absl::string_view text, absl::string_view suffix) noexcept
@ GRPC_ERROR_INT_GRPC_STATUS
grpc status code representing this error
void grpc_slice_unref_internal(const grpc_slice &slice)
static void set_pollset_do_nothing(grpc_transport *, grpc_stream *, grpc_pollset *)
stream_obj(grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, grpc_core::Arena *arena)
struct grpc_transport_stream_op_batch_payload::@46 cancel_stream
grpc_cronet_transport * curr_ct
#define GRPC_FLUSH_READ_SIZE
#define GRPC_ERROR_IS_NONE(err)
grpc
Author(s):
autogenerated on Fri May 16 2025 02:58:06