cronet_transport.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 #include <stdint.h>
24 #include <stdlib.h>
25 #include <string.h>
26 
27 #include <new>
28 #include <string>
29 #include <utility>
30 
31 #include "absl/strings/match.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/str_format.h"
34 #include "absl/strings/string_view.h"
35 #include "absl/types/optional.h"
37 
38 #include <grpc/slice.h>
39 #include <grpc/status.h>
40 #include <grpc/support/alloc.h>
41 #include <grpc/support/log.h>
42 #include <grpc/support/sync.h>
43 
63 
64 // IWYU pragma: no_include <type_traits>
65 
66 #define GRPC_HEADER_SIZE_IN_BYTES 5
67 #define GRPC_FLUSH_READ_SIZE 4096
68 
70 #define CRONET_LOG(...) \
71  do { \
72  if (grpc_cronet_trace.enabled()) gpr_log(__VA_ARGS__); \
73  } while (0)
74 
79 };
80 
81 enum e_op_id {
96 };
97 
98 /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
99 
101 static void on_response_headers_received(
103  const char*);
104 static void on_write_completed(bidirectional_stream*, const char*);
105 static void on_read_completed(bidirectional_stream*, char*, int);
108 static void on_succeeded(bidirectional_stream*);
109 static void on_failed(bidirectional_stream*, int);
110 static void on_canceled(bidirectional_stream*);
117  on_succeeded,
118  on_failed,
119  on_canceled};
120 
121 /* Cronet transport object */
123  grpc_transport base; /* must be first element in this structure */
125  char* host;
127 };
129 
130 /* TODO (makdharma): reorder structure for memory efficiency per
131  http://www.catb.org/esr/structure-packing/#_structure_reordering: */
132 struct read_state {
135 
136  /* vars to store data coming from server */
137  char* read_buffer = nullptr;
138  bool length_field_received = false;
139  int received_bytes = 0;
141  int length_field = 0;
142  bool compressed = false;
144  char* payload_field = nullptr;
145  bool read_stream_closed = false;
146 
147  /* vars for holding data destined for the application */
149 
150  /* vars for trailing metadata */
153 
154  /* vars for initial metadata */
156 };
157 
158 struct write_state {
159  char* write_buffer = nullptr;
160 };
161 
162 /* track state of one stream op */
163 struct op_state {
165 
168  /* A non-zero gRPC status code has been seen */
169  bool fail_state = false;
170  /* Transport is discarding all buffered messages */
171  bool flush_read = false;
174  bool pending_send_message = false;
175  /* User requested RECV_TRAILING_METADATA */
179  /* data structure for storing data coming from server */
180  struct read_state rs;
181  /* data structure for storing data going to the server */
182  struct write_state ws;
183 };
184 
185 struct stream_obj;
186 
187 struct op_and_state {
189 
191  struct op_state state;
192  bool done = false;
193  struct stream_obj* s; /* Pointer back to the stream object */
194  /* next op_and_state in the linked list */
195  struct op_and_state* next = nullptr;
196 };
197 
198 struct op_storage {
200  struct op_and_state* head = nullptr;
201 };
202 
203 struct stream_obj {
206  ~stream_obj();
207 
209  struct op_and_state* oas = nullptr;
215  bidirectional_stream_header_array(); // Zero-initialize the structure.
216 
217  /* Stream level state. Some state will be tracked both at stream and stream_op
218  * level */
219  struct op_state state;
220 
221  /* OP storage */
223 
224  /* Mutex to protect storage */
226 
227  /* Refcount object of the stream */
229 };
230 
231 #ifndef NDEBUG
232 #define GRPC_CRONET_STREAM_REF(stream, reason) \
233  grpc_cronet_stream_ref((stream), (reason))
234 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
235  grpc_cronet_stream_unref((stream), (reason))
236 void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
237  grpc_stream_ref(s->refcount, reason);
238 }
239 void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
240  grpc_stream_unref(s->refcount, reason);
241 }
242 #else
243 #define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
244 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
245  grpc_cronet_stream_unref((stream))
246 void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
248 #endif
249 
250 static enum e_op_result execute_stream_op(struct op_and_state* oas);
251 
252 /*
253  Utility function to translate enum into string for printing
254 */
255 static const char* op_result_string(enum e_op_result i) {
256  switch (i) {
258  return "ACTION_TAKEN_WITH_CALLBACK";
260  return "ACTION_TAKEN_NO_CALLBACK";
261  case NO_ACTION_POSSIBLE:
262  return "NO_ACTION_POSSIBLE";
263  }
264  GPR_UNREACHABLE_CODE(return "UNKNOWN");
265 }
266 
267 static const char* op_id_string(enum e_op_id i) {
268  switch (i) {
270  return "OP_SEND_INITIAL_METADATA";
271  case OP_SEND_MESSAGE:
272  return "OP_SEND_MESSAGE";
274  return "OP_SEND_TRAILING_METADATA";
275  case OP_RECV_MESSAGE:
276  return "OP_RECV_MESSAGE";
278  return "OP_RECV_INITIAL_METADATA";
280  return "OP_RECV_TRAILING_METADATA";
281  case OP_CANCEL_ERROR:
282  return "OP_CANCEL_ERROR";
283  case OP_ON_COMPLETE:
284  return "OP_ON_COMPLETE";
285  case OP_FAILED:
286  return "OP_FAILED";
287  case OP_SUCCEEDED:
288  return "OP_SUCCEEDED";
289  case OP_CANCELED:
290  return "OP_CANCELED";
292  return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
293  case OP_READ_REQ_MADE:
294  return "OP_READ_REQ_MADE";
295  case OP_NUM_OPS:
296  return "OP_NUM_OPS";
297  }
298  return "UNKNOWN";
299 }
300 
302  if (s->state.rs.read_buffer &&
303  s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
304  gpr_free(s->state.rs.read_buffer);
305  }
306  s->state.rs.read_buffer = nullptr;
307 }
308 
309 static void read_grpc_header(stream_obj* s) {
310  s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
311  s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
312  s->state.rs.received_bytes = 0;
313  s->state.rs.compressed = false;
314  CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
315  bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
316  s->state.rs.remaining_bytes);
317 }
318 
320  int cronet_internal_error_code,
321  const char* desc) {
323  "Cronet error code:%d, Cronet error detail:%s",
324  cronet_internal_error_code, desc)),
325  GRPC_ERROR_INT_GRPC_STATUS, error_code);
326 }
327 
330  : op(op), state(s->arena), s(s) {}
331 
332 /*
333  Add a new stream op to op storage.
334 */
335 static void add_to_storage(struct stream_obj* s,
337  struct op_storage* storage = &s->storage;
338  /* add new op at the beginning of the linked list. The memory is freed
339  in remove_from_storage */
340  op_and_state* new_op = new op_and_state(s, *op);
341  gpr_mu_lock(&s->mu);
342  new_op->next = storage->head;
343  storage->head = new_op;
344  storage->num_pending_ops++;
345  if (op->send_message) {
346  s->state.pending_send_message = true;
347  }
348  if (op->recv_trailing_metadata) {
349  s->state.pending_recv_trailing_metadata = true;
350  }
351  CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
352  storage->num_pending_ops);
353  gpr_mu_unlock(&s->mu);
354 }
355 
356 /*
357  Traverse the linked list and delete op and free memory
358 */
359 static void remove_from_storage(struct stream_obj* s,
360  struct op_and_state* oas) {
361  struct op_and_state* curr;
362  if (s->storage.head == nullptr || oas == nullptr) {
363  return;
364  }
365  if (s->storage.head == oas) {
366  s->storage.head = oas->next;
367  delete oas;
369  CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
371  } else {
372  for (curr = s->storage.head; curr != nullptr; curr = curr->next) {
373  if (curr->next == oas) {
374  curr->next = oas->next;
376  CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
378  delete oas;
379  break;
380  } else if (GPR_UNLIKELY(curr->next == nullptr)) {
381  CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
382  }
383  }
384  }
385 }
386 
387 /*
388  Cycle through ops and try to take next action. Break when either
389  an action with callback is taken, or no action is possible.
390  This can get executed from the Cronet network thread via cronet callback
391  or on the application supplied thread via the perform_stream_op function.
392 */
394  gpr_mu_lock(&s->mu);
395  for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
396  CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
397  GPR_ASSERT(!curr->done);
398  enum e_op_result result = execute_stream_op(curr);
399  CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
401  /* if this op is done, then remove it and free memory */
402  if (curr->done) {
403  struct op_and_state* next = curr->next;
404  remove_from_storage(s, curr);
405  curr = next;
406  } else if (result == NO_ACTION_POSSIBLE) {
407  curr = curr->next;
408  } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
409  /* wait for the callback */
410  break;
411  } /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
412  }
413  gpr_mu_unlock(&s->mu);
414 }
415 
417  const bidirectional_stream_header_array* header_array,
418  grpc_metadata_batch* mds) {
419  for (size_t i = 0; i < header_array->count; i++) {
420  CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s",
421  header_array->headers[i].key, header_array->headers[i].value);
423  if (absl::EndsWith(header_array->headers[i].key, "-bin")) {
427  } else {
429  }
430  mds->Append(header_array->headers[i].key, grpc_core::Slice(value),
432  gpr_log(GPR_DEBUG, "Failed to parse metadata: %s",
433  absl::StrCat("key=", header_array->headers[i].key,
434  " error=", error,
435  " value=", value.as_string_view())
436  .c_str());
437  });
438  }
439 }
440 
441 /*
442  Cronet callback
443 */
444 static void on_failed(bidirectional_stream* stream, int net_error) {
445  gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
446  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
448 
449  stream_obj* s = static_cast<stream_obj*>(stream->annotation);
450  gpr_mu_lock(&s->mu);
453  s->state.net_error = static_cast<cronet_net_error_code>(net_error);
454  s->cbs = nullptr;
455  if (s->header_array.headers) {
457  s->header_array.headers = nullptr;
458  }
459  if (s->state.ws.write_buffer) {
461  s->state.ws.write_buffer = nullptr;
462  }
464  gpr_mu_unlock(&s->mu);
466  GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
467 }
468 
469 /*
470  Cronet callback
471 */
473  CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
474  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
476 
477  stream_obj* s = static_cast<stream_obj*>(stream->annotation);
478  gpr_mu_lock(&s->mu);
481  s->cbs = nullptr;
482  if (s->header_array.headers) {
484  s->header_array.headers = nullptr;
485  }
486  if (s->state.ws.write_buffer) {
488  s->state.ws.write_buffer = nullptr;
489  }
491  gpr_mu_unlock(&s->mu);
493  GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
494 }
495 
496 /*
497  Cronet callback
498 */
500  CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
501  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
503 
504  stream_obj* s = static_cast<stream_obj*>(stream->annotation);
505  gpr_mu_lock(&s->mu);
508  s->cbs = nullptr;
510  gpr_mu_unlock(&s->mu);
512  GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
513 }
514 
515 /*
516  Cronet callback
517 */
519  CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
520  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
522  stream_obj* s = static_cast<stream_obj*>(stream->annotation);
524  gpr_mu_lock(&s->mu);
527  /* Free the memory allocated for headers */
528  if (s->header_array.headers) {
530  s->header_array.headers = nullptr;
531  }
532  /* Send the initial metadata on wire if there is no SEND_MESSAGE or
533  * SEND_TRAILING_METADATA ops pending */
534  if (t->use_packet_coalescing) {
536  CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
538  }
539  }
540  gpr_mu_unlock(&s->mu);
542 }
543 
544 /*
545  Cronet callback
546 */
549  const bidirectional_stream_header_array* headers,
550  const char* negotiated_protocol) {
551  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
553  CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
554  headers, negotiated_protocol);
555  stream_obj* s = static_cast<stream_obj*>(stream->annotation);
556 
557  /* Identify if this is a header or a trailer (in a trailer-only response case)
558  */
559  for (size_t i = 0; i < headers->count; i++) {
560  if (0 == strcmp("grpc-status", headers->headers[i].key)) {
562 
563  /* Do an extra read for a trailer-only stream to trigger on_succeeded()
564  * callback */
566  return;
567  }
568  }
569 
570  gpr_mu_lock(&s->mu);
575  /* Do an extra read to trigger on_succeeded() callback in case connection
576  is closed */
579  }
580  gpr_mu_unlock(&s->mu);
582 }
583 
584 /*
585  Cronet callback
586 */
588  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
590  stream_obj* s = static_cast<stream_obj*>(stream->annotation);
591  CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
592  gpr_mu_lock(&s->mu);
593  if (s->state.ws.write_buffer) {
595  s->state.ws.write_buffer = nullptr;
596  }
598  gpr_mu_unlock(&s->mu);
600 }
601 
602 /*
603  Cronet callback
604 */
606  int count) {
607  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
609  stream_obj* s = static_cast<stream_obj*>(stream->annotation);
610  CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
611  count);
612  gpr_mu_lock(&s->mu);
614  if (count > 0 && s->state.flush_read) {
615  CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
618  gpr_mu_unlock(&s->mu);
619  } else if (count > 0) {
622  if (s->state.rs.remaining_bytes > 0) {
623  CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
628  gpr_mu_unlock(&s->mu);
629  } else {
630  gpr_mu_unlock(&s->mu);
632  }
633  } else {
635  s->state.rs.read_stream_closed = true;
636  gpr_mu_unlock(&s->mu);
638  }
639 }
640 
641 /*
642  Cronet callback
643 */
646  const bidirectional_stream_header_array* trailers) {
647  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
649  CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
650  trailers);
651  stream_obj* s = static_cast<stream_obj*>(stream->annotation);
653  gpr_mu_lock(&s->mu);
656  if (trailers->count > 0) {
658  }
660  /* Send a EOS when server terminates the stream (testServerFinishesRequest) to
661  * trigger on_succeeded */
665  CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
667  bidirectional_stream_write(s->cbs, "", 0, true);
668  if (t->use_packet_coalescing) {
669  CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
671  }
673 
674  gpr_mu_unlock(&s->mu);
675  } else {
676  gpr_mu_unlock(&s->mu);
678  }
679 }
680 
681 /*
682  Utility function that takes the data from s->write_slice_buffer and assembles
683  into a contiguous byte stream with 5 byte gRPC header prepended.
684 */
685 static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
686  char** pp_write_buffer,
687  size_t* p_write_buffer_size, uint32_t flags) {
688  size_t length = write_slice_buffer->length;
689  *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
690  /* This is freed in the on_write_completed callback */
691  char* write_buffer =
692  static_cast<char*>(gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES));
693  *pp_write_buffer = write_buffer;
694  uint8_t* p = reinterpret_cast<uint8_t*>(write_buffer);
695  /* Append 5 byte header */
696  /* Compressed flag */
697  *p++ = static_cast<uint8_t>((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0);
698  /* Message length */
699  *p++ = static_cast<uint8_t>(length >> 24);
700  *p++ = static_cast<uint8_t>(length >> 16);
701  *p++ = static_cast<uint8_t>(length >> 8);
702  *p++ = static_cast<uint8_t>(length);
703  /* append actual data */
704  size_t offset = 0;
705  for (size_t i = 0; i < write_slice_buffer->count; ++i) {
706  memcpy(p + offset, GRPC_SLICE_START_PTR(write_slice_buffer->slices[i]),
707  GRPC_SLICE_LENGTH(write_slice_buffer->slices[i]));
708  offset += GRPC_SLICE_LENGTH(write_slice_buffer->slices[i]);
709  }
710 }
711 
712 namespace {
713 class CronetMetadataEncoder {
714  public:
715  explicit CronetMetadataEncoder(bidirectional_stream_header** pp_headers,
716  size_t* p_count, const char* host,
717  size_t capacity, const char** method,
718  std::string* url)
719  : host_(host),
721  count_(*p_count),
722  headers_(*pp_headers),
723  method_(method),
724  url_(url) {
725  count_ = 0;
726  headers_ = static_cast<bidirectional_stream_header*>(
728  }
729 
730  CronetMetadataEncoder(const CronetMetadataEncoder&) = delete;
731  CronetMetadataEncoder& operator=(const CronetMetadataEncoder&) = delete;
732 
733  template <class T, class V>
734  void Encode(T, const V& value) {
736  grpc_core::Slice(T::Encode(value)));
737  }
738 
739  void Encode(grpc_core::HttpSchemeMetadata,
741  /* Cronet populates these fields on its own */
742  }
745  /* Cronet populates these fields on its own */
746  }
747 
748  void Encode(grpc_core::HttpMethodMetadata,
750  switch (method) {
752  *method_ = "POST";
753  break;
757  abort();
758  }
759  }
760 
761  void Encode(grpc_core::HttpPathMetadata,
763  /* Create URL by appending :path value to the hostname */
764  *url_ = absl::StrCat("https://", host_, path.as_string_view());
765  }
766 
767  void Encode(const grpc_core::Slice& key_slice,
768  const grpc_core::Slice& value_slice) {
769  char* key = grpc_slice_to_c_string(key_slice.c_slice());
770  char* value;
771  if (grpc_is_binary_header_internal(key_slice.c_slice())) {
772  grpc_slice wire_value = grpc_chttp2_base64_encode(value_slice.c_slice());
773  value = grpc_slice_to_c_string(wire_value);
774  grpc_slice_unref_internal(wire_value);
775  } else {
776  value = grpc_slice_to_c_string(value_slice.c_slice());
777  }
778  CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
780  headers_[count_].key = key;
781  headers_[count_].value = value;
782  ++count_;
783  }
784 
785  private:
786  const char* host_;
787  size_t capacity_;
788  size_t& count_;
789  bidirectional_stream_header*& headers_;
790  const char** method_;
791  std::string* url_;
792 };
793 } // namespace
794 
795 /*
796  Convert metadata in a format that Cronet can consume
797 */
799  grpc_metadata_batch* metadata, const char* host, std::string* pp_url,
800  bidirectional_stream_header** pp_headers, size_t* p_num_headers,
801  const char** method) {
802  CronetMetadataEncoder encoder(pp_headers, p_num_headers, host,
803  metadata->count(), method, pp_url);
804  metadata->Encode(&encoder);
805 }
806 
807 static void parse_grpc_header(const uint8_t* data, int* length,
808  bool* compressed) {
809  const uint8_t c = *data;
810  const uint8_t* p = data + 1;
811  *compressed = ((c & 0x01) == 0x01);
812  *length = 0;
813  *length |= (*p++) << 24;
814  *length |= (*p++) << 16;
815  *length |= (*p++) << 8;
816  *length |= (*p++);
817 }
818 
820  return b->get_pointer(grpc_core::HttpAuthorityMetadata()) != nullptr;
821 }
822 
823 /*
824  Op Execution: Decide if one of the actions contained in the stream op can be
825  executed. This is the heart of the state machine.
826 */
828  struct stream_obj* s, struct op_state* op_state,
829  enum e_op_id op_id) {
830  struct op_state* stream_state = &s->state;
831  grpc_cronet_transport* t = s->curr_ct;
832  bool result = true;
833  /* When call is canceled, every op can be run, except under following
834  conditions
835  */
836  bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
837  stream_state->state_callback_received[OP_FAILED];
838  if (is_canceled_or_failed) {
839  if (op_id == OP_SEND_INITIAL_METADATA) {
840  CRONET_LOG(GPR_DEBUG, "Because");
841  result = false;
842  }
843  if (op_id == OP_SEND_MESSAGE) {
844  CRONET_LOG(GPR_DEBUG, "Because");
845  result = false;
846  }
847  if (op_id == OP_SEND_TRAILING_METADATA) {
848  CRONET_LOG(GPR_DEBUG, "Because");
849  result = false;
850  }
851  if (op_id == OP_CANCEL_ERROR) {
852  CRONET_LOG(GPR_DEBUG, "Because");
853  result = false;
854  }
855  /* already executed */
856  if (op_id == OP_RECV_INITIAL_METADATA &&
857  stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
858  CRONET_LOG(GPR_DEBUG, "Because");
859  result = false;
860  }
862  CRONET_LOG(GPR_DEBUG, "Because");
863  result = false;
864  }
865  if (op_id == OP_RECV_TRAILING_METADATA &&
866  stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
867  CRONET_LOG(GPR_DEBUG, "Because");
868  result = false;
869  }
870  /* ON_COMPLETE can be processed if one of the following conditions is met:
871  * 1. the stream failed
872  * 2. the stream is cancelled, and the callback is received
873  * 3. the stream succeeded before cancel is effective
874  * 4. the stream is cancelled, and the stream is never started */
875  if (op_id == OP_ON_COMPLETE &&
876  !(stream_state->state_callback_received[OP_FAILED] ||
877  stream_state->state_callback_received[OP_CANCELED] ||
878  stream_state->state_callback_received[OP_SUCCEEDED] ||
879  !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
880  CRONET_LOG(GPR_DEBUG, "Because");
881  result = false;
882  }
883  } else if (op_id == OP_SEND_INITIAL_METADATA) {
884  /* already executed */
885  if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
886  } else if (op_id == OP_RECV_INITIAL_METADATA) {
887  if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
888  /* already executed */
889  result = false;
890  } else if (!stream_state
892  /* we haven't sent headers yet. */
893  result = false;
894  } else if (!stream_state
896  !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
897  /* we haven't received headers yet. */
898  result = false;
899  }
900  } else if (op_id == OP_SEND_MESSAGE) {
902  /* already executed (note we're checking op specific state, not stream
903  state) */
904  result = false;
905  } else if (!stream_state
907  /* we haven't sent headers yet. */
908  result = false;
909  }
910  } else if (op_id == OP_RECV_MESSAGE) {
912  /* already executed */
913  result = false;
914  } else if (!stream_state
916  !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
917  /* we haven't received headers yet. */
918  result = false;
919  }
920  } else if (op_id == OP_RECV_TRAILING_METADATA) {
921  if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
922  /* already executed */
923  result = false;
924  } else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
925  !stream_state->state_op_done[OP_RECV_MESSAGE]) {
926  /* we have asked for but haven't received message yet. */
927  result = false;
928  } else if (!stream_state
930  /* we haven't received trailers yet. */
931  result = false;
932  } else if (!stream_state->state_callback_received[OP_SUCCEEDED]) {
933  /* we haven't received on_succeeded yet. */
934  result = false;
935  }
936  } else if (op_id == OP_SEND_TRAILING_METADATA) {
937  if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
938  /* already executed */
939  result = false;
940  } else if (!stream_state
942  /* we haven't sent initial metadata yet */
943  result = false;
944  } else if (stream_state->pending_send_message &&
945  !stream_state->state_op_done[OP_SEND_MESSAGE]) {
946  /* we haven't sent message yet */
947  result = false;
948  } else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
949  !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
950  !(t->use_packet_coalescing &&
951  stream_state->pending_write_for_trailer)) {
952  /* we haven't got on_write_completed for the send yet */
953  result = false;
954  }
955  } else if (op_id == OP_CANCEL_ERROR) {
956  /* already executed */
957  if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
958  } else if (op_id == OP_ON_COMPLETE) {
960  /* already executed (note we're checking op specific state, not stream
961  state) */
962  CRONET_LOG(GPR_DEBUG, "Because");
963  result = false;
964  }
965  /* Check if every op that was asked for is done. */
966  /* TODO(muxi): We should not consider the recv ops here, since they
967  * have their own callbacks. We should invoke a batch's on_complete
968  * as soon as all of the batch's send ops are complete, even if
969  * there are still recv ops pending. */
970  else if (curr_op->send_initial_metadata &&
972  CRONET_LOG(GPR_DEBUG, "Because");
973  result = false;
974  } else if (curr_op->send_message &&
976  CRONET_LOG(GPR_DEBUG, "Because");
977  result = false;
978  } else if (curr_op->send_message &&
979  !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
980  CRONET_LOG(GPR_DEBUG, "Because");
981  result = false;
982  } else if (curr_op->send_trailing_metadata &&
983  !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
984  CRONET_LOG(GPR_DEBUG, "Because");
985  result = false;
986  } else if (curr_op->recv_initial_metadata &&
987  !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
988  CRONET_LOG(GPR_DEBUG, "Because");
989  result = false;
990  } else if (curr_op->recv_message &&
992  CRONET_LOG(GPR_DEBUG, "Because");
993  result = false;
994  } else if (curr_op->cancel_stream &&
995  !stream_state->state_callback_received[OP_CANCELED]) {
996  CRONET_LOG(GPR_DEBUG, "Because");
997  result = false;
998  } else if (curr_op->recv_trailing_metadata) {
999  /* We aren't done with trailing metadata yet */
1000  if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
1001  CRONET_LOG(GPR_DEBUG, "Because");
1002  result = false;
1003  }
1004  /* We've asked for actual message in an earlier op, and it hasn't been
1005  delivered yet. */
1006  else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
1007  /* If this op is not the one asking for read, (which means some earlier
1008  op has asked), and the read hasn't been delivered. */
1009  if (!curr_op->recv_message &&
1010  !stream_state->state_callback_received[OP_SUCCEEDED]) {
1011  CRONET_LOG(GPR_DEBUG, "Because");
1012  result = false;
1013  }
1014  }
1015  }
1016  /* We should see at least one on_write_completed for the trailers that we
1017  sent */
1018  else if (curr_op->send_trailing_metadata &&
1019  !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
1020  result = false;
1021  }
1022  }
1023  CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
1024  result ? "YES" : "NO");
1025  return result;
1026 }
1027 
1028 /*
1029  TODO (makdharma): Break down this function in smaller chunks for readability.
1030 */
1031 static enum e_op_result execute_stream_op(struct op_and_state* oas) {
1032  grpc_transport_stream_op_batch* stream_op = &oas->op;
1033  struct stream_obj* s = oas->s;
1034  grpc_cronet_transport* t = s->curr_ct;
1035  struct op_state* stream_state = &s->state;
1037  if (stream_op->send_initial_metadata &&
1038  op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
1039  CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
1040  /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
1041  * on_failed */
1042  GPR_ASSERT(s->cbs == nullptr);
1044  s->cbs =
1045  bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
1046  CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
1047  if (t->use_packet_coalescing) {
1050  }
1051  std::string url;
1052  const char* method = "POST";
1053  s->header_array.headers = nullptr;
1055  stream_op->payload->send_initial_metadata.send_initial_metadata,
1056  t->host, &url, &s->header_array.headers, &s->header_array.count,
1057  &method);
1058  s->header_array.capacity = s->header_array.count;
1059  CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs,
1060  url.c_str());
1061  bidirectional_stream_start(s->cbs, url.c_str(), 0, method, &s->header_array,
1062  false);
1063  unsigned int header_index;
1064  for (header_index = 0; header_index < s->header_array.count;
1065  header_index++) {
1066  gpr_free(const_cast<char*>(s->header_array.headers[header_index].key));
1067  gpr_free(const_cast<char*>(s->header_array.headers[header_index].value));
1068  }
1069  stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
1070  if (t->use_packet_coalescing) {
1071  if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
1072  s->state.flush_cronet_when_ready = true;
1073  }
1074  }
1076  } else if (stream_op->send_message &&
1077  op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
1078  CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
1079  stream_state->pending_send_message = false;
1080  if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1081  stream_state->state_callback_received[OP_FAILED] ||
1082  stream_state->state_callback_received[OP_SUCCEEDED]) {
1084  CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1085  } else {
1086  size_t write_buffer_size;
1088  stream_op->payload->send_message.send_message->c_slice_buffer(),
1089  &stream_state->ws.write_buffer, &write_buffer_size,
1090  stream_op->payload->send_message.flags);
1091  if (write_buffer_size > 0) {
1092  CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
1093  stream_state->ws.write_buffer);
1094  stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1095  bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
1096  static_cast<int>(write_buffer_size), false);
1097  if (t->use_packet_coalescing) {
1098  if (!stream_op->send_trailing_metadata) {
1099  CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1102  } else {
1103  stream_state->pending_write_for_trailer = true;
1105  }
1106  } else {
1108  }
1109  } else {
1110  /* Should never reach here */
1111  GPR_ASSERT(false);
1112  }
1113  }
1114  stream_state->state_op_done[OP_SEND_MESSAGE] = true;
1115  oas->state.state_op_done[OP_SEND_MESSAGE] = true;
1116  } else if (stream_op->send_trailing_metadata &&
1117  op_can_be_run(stream_op, s, &oas->state,
1119  CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
1120  if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1121  stream_state->state_callback_received[OP_FAILED] ||
1122  stream_state->state_callback_received[OP_SUCCEEDED]) {
1124  CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1125  } else {
1126  CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
1127  stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1128  bidirectional_stream_write(s->cbs, "", 0, true);
1129  if (t->use_packet_coalescing) {
1130  CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1132  }
1134  }
1135  stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
1136  } else if (stream_op->recv_initial_metadata &&
1137  op_can_be_run(stream_op, s, &oas->state,
1139  CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
1140  if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1143  stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1144  GRPC_ERROR_NONE);
1145  } else if (stream_state->state_callback_received[OP_FAILED]) {
1148  stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1149  GRPC_ERROR_NONE);
1150  } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
1153  stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1154  GRPC_ERROR_NONE);
1155  } else {
1156  *stream_op->payload->recv_initial_metadata.recv_initial_metadata =
1160  stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1161  GRPC_ERROR_NONE);
1162  }
1163  stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
1165  } else if (stream_op->recv_message &&
1166  op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
1167  CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
1168  if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1169  CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
1171  DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1172  GRPC_ERROR_NONE);
1173  stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1174  oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1176  } else if (stream_state->state_callback_received[OP_FAILED]) {
1177  CRONET_LOG(GPR_DEBUG, "Stream failed.");
1179  DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1180  GRPC_ERROR_NONE);
1181  stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1182  oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1184  } else if (stream_state->rs.read_stream_closed) {
1185  /* No more data will be received */
1186  CRONET_LOG(GPR_DEBUG, "read stream closed");
1188  DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1189  GRPC_ERROR_NONE);
1190  stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1191  oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1193  } else if (stream_state->flush_read) {
1194  CRONET_LOG(GPR_DEBUG, "flush read");
1196  DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1197  GRPC_ERROR_NONE);
1198  stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1199  oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1201  } else if (!stream_state->rs.length_field_received) {
1202  if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
1203  stream_state->rs.remaining_bytes == 0) {
1204  /* Start a read operation for data */
1205  stream_state->rs.length_field_received = true;
1207  reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
1208  &stream_state->rs.length_field, &stream_state->rs.compressed);
1209  CRONET_LOG(GPR_DEBUG, "length field = %d",
1210  stream_state->rs.length_field);
1211  if (stream_state->rs.length_field > 0) {
1212  stream_state->rs.read_buffer = static_cast<char*>(
1213  gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
1214  GPR_ASSERT(stream_state->rs.read_buffer);
1215  stream_state->rs.remaining_bytes = stream_state->rs.length_field;
1216  stream_state->rs.received_bytes = 0;
1217  CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1218  stream_state->state_op_done[OP_READ_REQ_MADE] =
1219  true; /* Indicates that at least one read request has been made */
1220  bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1221  stream_state->rs.remaining_bytes);
1223  } else {
1224  stream_state->rs.remaining_bytes = 0;
1225  CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
1226  /* Clean up read_slice_buffer in case there is unread data. */
1227  stream_state->rs.read_slice_buffer.Clear();
1228  uint32_t flags = 0;
1229  if (stream_state->rs.compressed) {
1231  }
1232  *stream_op->payload->recv_message.flags = flags;
1233  *stream_op->payload->recv_message.recv_message =
1234  std::move(stream_state->rs.read_slice_buffer);
1237  stream_op->payload->recv_message.recv_message_ready,
1238  GRPC_ERROR_NONE);
1239  stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1240  oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1241 
1242  /* Extra read to trigger on_succeed */
1243  stream_state->rs.length_field_received = false;
1244  stream_state->state_op_done[OP_READ_REQ_MADE] =
1245  true; /* Indicates that at least one read request has been made */
1246  read_grpc_header(s);
1248  }
1249  } else if (stream_state->rs.remaining_bytes == 0) {
1250  /* Start a read operation for first 5 bytes (GRPC header) */
1251  stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1253  stream_state->rs.received_bytes = 0;
1254  stream_state->rs.compressed = false;
1255  CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1256  stream_state->state_op_done[OP_READ_REQ_MADE] =
1257  true; /* Indicates that at least one read request has been made */
1258  bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1259  stream_state->rs.remaining_bytes);
1261  } else {
1263  }
1264  } else if (stream_state->rs.remaining_bytes == 0) {
1265  CRONET_LOG(GPR_DEBUG, "read operation complete");
1266  grpc_slice read_data_slice =
1267  GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
1268  uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
1269  memcpy(dst_p, stream_state->rs.read_buffer,
1270  static_cast<size_t>(stream_state->rs.length_field));
1272  /* Clean up read_slice_buffer in case there is unread data. */
1273  stream_state->rs.read_slice_buffer.Clear();
1274  stream_state->rs.read_slice_buffer.Append(
1275  grpc_core::Slice(read_data_slice));
1276  uint32_t flags = 0;
1277  if (stream_state->rs.compressed) {
1279  }
1280  *stream_op->payload->recv_message.flags = flags;
1281  *stream_op->payload->recv_message.recv_message =
1282  std::move(stream_state->rs.read_slice_buffer);
1284  DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1285  GRPC_ERROR_NONE);
1286  stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1287  oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1288  /* Do an extra read to trigger on_succeeded() callback in case connection
1289  is closed */
1290  stream_state->rs.length_field_received = false;
1291  read_grpc_header(s);
1293  }
1294  } else if (stream_op->recv_trailing_metadata &&
1295  op_can_be_run(stream_op, s, &oas->state,
1297  CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
1299  if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1300  error = GRPC_ERROR_REF(stream_state->cancel_error);
1301  } else if (stream_state->state_callback_received[OP_FAILED]) {
1302  grpc_status_code grpc_error_code =
1304  const char* desc = cronet_net_error_as_string(stream_state->net_error);
1305  error =
1306  make_error_with_desc(grpc_error_code, stream_state->net_error, desc);
1307  } else if (oas->s->state.rs.trailing_metadata_valid) {
1308  *stream_op->payload->recv_trailing_metadata.recv_trailing_metadata =
1310  stream_state->rs.trailing_metadata_valid = false;
1311  }
1314  stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1315  error);
1316  stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
1318  } else if (stream_op->cancel_stream &&
1319  op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
1320  CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
1321  if (s->cbs) {
1322  CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
1325  } else {
1327  }
1328  stream_state->state_op_done[OP_CANCEL_ERROR] = true;
1329  if (GRPC_ERROR_IS_NONE(stream_state->cancel_error)) {
1330  stream_state->cancel_error =
1332  }
1333  } else if (op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
1334  CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
1335  if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1336  if (stream_op->on_complete) {
1338  GRPC_ERROR_REF(stream_state->cancel_error));
1339  }
1340  } else if (stream_state->state_callback_received[OP_FAILED]) {
1341  if (stream_op->on_complete) {
1342  const char* error_message =
1343  cronet_net_error_as_string(stream_state->net_error);
1344  grpc_status_code grpc_error_code =
1347  DEBUG_LOCATION, stream_op->on_complete,
1348  make_error_with_desc(grpc_error_code, stream_state->net_error,
1349  error_message));
1350  }
1351  } else {
1352  /* All actions in this stream_op are complete. Call the on_complete
1353  * callback
1354  */
1355  if (stream_op->on_complete) {
1357  GRPC_ERROR_NONE);
1358  }
1359  }
1360  oas->state.state_op_done[OP_ON_COMPLETE] = true;
1361  oas->done = true;
1362  /* reset any send message state, only if this ON_COMPLETE is about a send.
1363  */
1364  if (stream_op->send_message) {
1365  stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1366  stream_state->state_op_done[OP_SEND_MESSAGE] = false;
1367  }
1369  /* If this is the on_complete callback being called for a received message -
1370  make a note */
1371  if (stream_op->recv_message) {
1372  stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
1373  }
1374  } else {
1376  }
1377  return result;
1378 }
1379 
1380 /*
1381  Functions used by upper layers to access transport functionality.
1382 */
1383 
1387  : arena(arena),
1388  curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)),
1389  curr_gs(gs),
1390  state(arena),
1391  refcount(refcount) {
1392  GRPC_CRONET_STREAM_REF(this, "cronet transport");
1393  gpr_mu_init(&mu);
1394 }
1395 
1398  GRPC_ERROR_UNREF(state.cancel_error);
1399 }
1400 
1403  const void* /*server_data*/, grpc_core::Arena* arena) {
1404  new (gs) stream_obj(gt, gs, refcount, arena);
1405  return 0;
1406 }
1407 
1409  grpc_pollset* /*pollset*/) {}
1410 
1412  grpc_stream* /*gs*/,
1413  grpc_pollset_set* /*pollset_set*/) {}
1414 
1417  CRONET_LOG(GPR_DEBUG, "perform_stream_op");
1418  if (op->send_initial_metadata &&
1420  op->payload->send_initial_metadata.send_initial_metadata)) {
1421  /* Cronet does not support :authority header field. We cancel the call when
1422  this field is present in metadata */
1423  if (op->recv_initial_metadata) {
1426  op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1428  }
1429  if (op->recv_message) {
1431  op->payload->recv_message.recv_message_ready,
1433  }
1434  if (op->recv_trailing_metadata) {
1437  op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1439  }
1442  return;
1443  }
1444  stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1445  add_to_storage(s, op);
1447 }
1448 
1449 static void destroy_stream(grpc_transport* /*gt*/, grpc_stream* gs,
1450  grpc_closure* then_schedule_closure) {
1451  stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1452  s->~stream_obj();
1453  grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
1454  GRPC_ERROR_NONE);
1455 }
1456 
1457 static void destroy_transport(grpc_transport* /*gt*/) {}
1458 
1459 static grpc_endpoint* get_endpoint(grpc_transport* /*gt*/) { return nullptr; }
1460 
1461 static void perform_op(grpc_transport* /*gt*/, grpc_transport_op* /*op*/) {}
1462 
1464  sizeof(stream_obj),
1465  "cronet_http",
1466  init_stream,
1467  nullptr,
1471  perform_op,
1474  get_endpoint};
1475 
1477  const grpc_channel_args* args,
1478  void* /*reserved*/) {
1479  grpc_cronet_transport* ct = static_cast<grpc_cronet_transport*>(
1481  if (!ct) {
1482  goto error;
1483  }
1485  ct->engine = static_cast<stream_engine*>(engine);
1486  ct->host = static_cast<char*>(gpr_malloc(strlen(target) + 1));
1487  if (!ct->host) {
1488  goto error;
1489  }
1490  strcpy(ct->host, target);
1491 
1492  ct->use_packet_coalescing = true;
1493  if (args) {
1494  for (size_t i = 0; i < args->num_args; i++) {
1495  if (0 ==
1496  strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
1497  if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) {
1498  gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
1500  } else {
1501  ct->use_packet_coalescing = (args->args[i].value.integer != 0);
1502  }
1503  }
1504  }
1505  }
1506 
1507  return &ct->base;
1508 
1509 error:
1510  if (ct) {
1511  if (ct->host) {
1512  gpr_free(ct->host);
1513  }
1514  gpr_free(ct);
1515  }
1516 
1517  return nullptr;
1518 }
write_state
Definition: cronet_transport.cc:158
GRPC_CRONET_STREAM_UNREF
#define GRPC_CRONET_STREAM_UNREF(stream, reason)
Definition: cronet_transport.cc:234
trace.h
op_state::flush_read
bool flush_read
Definition: cronet_transport.cc:171
on_failed
static void on_failed(bidirectional_stream *, int)
Definition: cronet_transport.cc:444
slice.h
grpc_chttp2_base64_infer_length_after_decode
size_t grpc_chttp2_base64_infer_length_after_decode(const grpc_slice &slice)
Definition: bin_decoder.cc:85
_gevent_test_main.result
result
Definition: _gevent_test_main.py:96
grpc_cronet_vtable
static const grpc_transport_vtable grpc_cronet_vtable
Definition: cronet_transport.cc:1463
gpr_mu_unlock
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
set_pollset_set_do_nothing
static void set_pollset_set_do_nothing(grpc_transport *, grpc_stream *, grpc_pollset_set *)
Definition: cronet_transport.cc:1411
pollset.h
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
op_and_state
Definition: cronet_transport.cc:187
metadata_batch.h
grpc_core::MetadataMap::Append
void Append(absl::string_view key, Slice value, MetadataParseErrorFn on_error)
Definition: metadata_batch.h:1156
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
read_state::received_bytes
int received_bytes
Definition: cronet_transport.cc:139
metadata
Definition: cq_verifier.cc:48
GRPC_ARG_INTEGER
@ GRPC_ARG_INTEGER
Definition: grpc_types.h:81
absl::StrFormat
ABSL_MUST_USE_RESULT std::string StrFormat(const FormatSpec< Args... > &format, const Args &... args)
Definition: abseil-cpp/absl/strings/str_format.h:338
grpc_transport_vtable
Definition: transport_impl.h:37
grpc_transport_stream_op_batch::recv_message
bool recv_message
Definition: transport.h:322
read_state::length_field
int length_field
Definition: cronet_transport.cc:141
cronet_status.h
slice.h
bidirectional_stream_cancel
void bidirectional_stream_cancel(bidirectional_stream *)
Definition: cronet_api_phony.cc:66
op_and_state::s
struct stream_obj * s
Definition: cronet_transport.cc:193
capacity
uint16_t capacity
Definition: protobuf/src/google/protobuf/descriptor.cc:948
grpc_core::HttpMethodMetadata
Definition: metadata_batch.h:136
get_endpoint
static grpc_endpoint * get_endpoint(grpc_transport *)
Definition: cronet_transport.cc:1459
grpc_transport_stream_op_batch::on_complete
grpc_closure * on_complete
Definition: transport.h:304
grpc_core::Slice
Definition: src/core/lib/slice/slice.h:282
create_grpc_frame
static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer, char **pp_write_buffer, size_t *p_write_buffer_size, uint32_t flags)
Definition: cronet_transport.cc:685
op_state::ws
struct write_state ws
Definition: cronet_transport.cc:182
grpc_core::HttpPathMetadata
Definition: metadata_batch.h:262
grpc_pollset_set
struct grpc_pollset_set grpc_pollset_set
Definition: iomgr_fwd.h:23
string.h
grpc_core::slice_detail::StaticConstructors< Slice >::FromStaticString
static Slice FromStaticString(const char *s)
Definition: src/core/lib/slice/slice.h:201
op_state::state_op_done
bool state_op_done[OP_NUM_OPS]
Definition: cronet_transport.cc:166
absl::string_view
Definition: abseil-cpp/absl/strings/string_view.h:167
grpc_slice_buffer::slices
grpc_slice * slices
Definition: include/grpc/impl/codegen/slice.h:89
op_state::rs
struct read_state rs
Definition: cronet_transport.cc:180
grpc_core::HttpSchemeMetadata::ValueType
ValueType
Definition: metadata_batch.h:116
gpr_free
GPRAPI void gpr_free(void *ptr)
Definition: alloc.cc:51
GRPC_SLICE_MALLOC
#define GRPC_SLICE_MALLOC(len)
Definition: include/grpc/slice.h:70
bidirectional_stream_header
Definition: bidirectional_stream_c.h:39
cronet_transport.h
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
error
grpc_error_handle error
Definition: retry_filter.cc:499
bidirectional_stream
Definition: bidirectional_stream_c.h:33
grpc_status_code
grpc_status_code
Definition: include/grpc/impl/codegen/status.h:28
grpc_core::HttpMethodMetadata::kPut
@ kPut
Definition: metadata_batch.h:141
op_id_string
static const char * op_id_string(enum e_op_id i)
Definition: cronet_transport.cc:267
destroy_stream
static void destroy_stream(grpc_transport *, grpc_stream *gs, grpc_closure *then_schedule_closure)
Definition: cronet_transport.cc:1449
grpc_cronet_transport::use_packet_coalescing
bool use_packet_coalescing
Definition: cronet_transport.cc:126
arena.h
init_stream
static int init_stream(grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, const void *, grpc_core::Arena *arena)
Definition: cronet_transport.cc:1401
gpr_malloc
GPRAPI void * gpr_malloc(size_t size)
Definition: alloc.cc:29
closure.h
bidirectional_stream_header::value
const char * value
Definition: bidirectional_stream_c.h:41
cronet_callbacks
static bidirectional_stream_callback cronet_callbacks
Definition: cronet_transport.cc:111
OK
@ OK
Definition: cronet_status.h:43
grpc_core::ApplicationCallbackExecCtx
Definition: exec_ctx.h:283
OP_SEND_MESSAGE
@ OP_SEND_MESSAGE
Definition: cronet_transport.cc:83
grpc_transport_stream_op_batch_payload::send_initial_metadata
grpc_metadata_batch * send_initial_metadata
Definition: transport.h:346
absl::FormatConversionChar::s
@ s
check_documentation.path
path
Definition: check_documentation.py:57
grpc_cronet_trace
grpc_core::TraceFlag grpc_cronet_trace(false, "cronet")
xds_manager.p
p
Definition: xds_manager.py:60
op_and_state::op
grpc_transport_stream_op_batch op
Definition: cronet_transport.cc:190
GRPC_ERROR_CANCELLED
#define GRPC_ERROR_CANCELLED
Definition: error.h:238
bidirectional_stream_c.h
grpc_is_binary_header_internal
int grpc_is_binary_header_internal(const grpc_slice &slice)
Definition: validate_metadata.cc:126
on_write_completed
static void on_write_completed(bidirectional_stream *, const char *)
Definition: cronet_transport.cc:587
add_to_storage
static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op_batch *op)
Definition: cronet_transport.cc:335
parse_grpc_header
static void parse_grpc_header(const uint8_t *data, int *length, bool *compressed)
Definition: cronet_transport.cc:807
op_state::net_error
cronet_net_error_code net_error
Definition: cronet_transport.cc:177
read_state::read_buffer
char * read_buffer
Definition: cronet_transport.cc:137
uint8_t
unsigned char uint8_t
Definition: stdint-msvc2008.h:78
OP_RECV_INITIAL_METADATA
@ OP_RECV_INITIAL_METADATA
Definition: cronet_transport.cc:86
grpc_create_cronet_transport
grpc_transport * grpc_create_cronet_transport(void *engine, const char *target, const grpc_channel_args *args, void *)
Definition: cronet_transport.cc:1476
read_grpc_header
static void read_grpc_header(stream_obj *s)
Definition: cronet_transport.cc:309
read_state::remaining_bytes
int remaining_bytes
Definition: cronet_transport.cc:140
write_buffer
static char write_buffer[WRITE_BUFFER_SIZE]
Definition: benchmark-pump.c:69
grpc_core::Arena
Definition: src/core/lib/resource_quota/arena.h:45
grpc_channel_args
Definition: grpc_types.h:132
on_succeeded
static void on_succeeded(bidirectional_stream *)
Definition: cronet_transport.cc:499
T
#define T(upbtypeconst, upbtype, ctype, default_value)
grpc_transport_op
Definition: transport.h:452
arena
grpc_core::ScopedArenaPtr arena
Definition: binder_transport_test.cc:237
read_state::read_state
read_state(grpc_core::Arena *arena)
Definition: cronet_transport.cc:133
OP_SEND_INITIAL_METADATA
@ OP_SEND_INITIAL_METADATA
Definition: cronet_transport.cc:82
op_can_be_run
static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op, struct stream_obj *s, struct op_state *op_state, enum e_op_id op_id)
Definition: cronet_transport.cc:827
cronet_net_error_code
cronet_net_error_code
Definition: cronet_status.h:26
status.h
OP_RECV_TRAILING_METADATA
@ OP_RECV_TRAILING_METADATA
Definition: cronet_transport.cc:87
grpc_cronet_transport::base
grpc_transport base
Definition: cronet_transport.cc:123
read_state::trailing_metadata_valid
bool trailing_metadata_valid
Definition: cronet_transport.cc:152
bidirectional_stream_start
int bidirectional_stream_start(bidirectional_stream *, const char *, int, const char *, const bidirectional_stream_header_array *, bool)
Definition: cronet_api_phony.cc:44
grpc_cronet_transport
Definition: cronet_transport.cc:122
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
setup.url
url
Definition: setup.py:547
refcount
size_t refcount
Definition: abseil-cpp/absl/strings/internal/cordz_info.cc:122
memcpy
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))
grpc_transport_stream_op_batch::cancel_stream
bool cancel_stream
Definition: transport.h:329
grpc_cronet_stream_unref
void grpc_cronet_stream_unref(stream_obj *s, const char *reason)
Definition: cronet_transport.cc:239
read_state::grpc_header_bytes
char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES]
Definition: cronet_transport.cc:143
c
void c(T a)
Definition: miscompile_with_no_unique_address_test.cc:40
on_stream_ready
static void on_stream_ready(bidirectional_stream *)
Definition: cronet_transport.cc:518
stream_obj::header_array
bidirectional_stream_header_array header_array
Definition: cronet_transport.cc:214
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
grpc_core::SliceBuffer::Clear
void Clear()
Removes and unrefs all slices in the SliceBuffer.
Definition: src/core/lib/slice/slice_buffer.h:84
OP_FAILED
@ OP_FAILED
Definition: cronet_transport.cc:90
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
read_state::payload_field
char * payload_field
Definition: cronet_transport.cc:144
capacity_
uint32_t capacity_
Definition: abseil-cpp/absl/synchronization/internal/graphcycles.cc:132
convert_cronet_array_to_metadata
static void convert_cronet_array_to_metadata(const bidirectional_stream_header_array *header_array, grpc_metadata_batch *mds)
Definition: cronet_transport.cc:416
grpc_core::SliceBuffer::c_slice_buffer
grpc_slice_buffer * c_slice_buffer()
Return a pointer to the back raw grpc_slice_buffer.
Definition: src/core/lib/slice/slice_buffer.h:117
grpc_slice_buffer::count
size_t count
Definition: include/grpc/impl/codegen/slice.h:91
grpc_core::HttpAuthorityMetadata
Definition: metadata_batch.h:256
perform_op
static void perform_op(grpc_transport *, grpc_transport_op *)
Definition: cronet_transport.cc:1461
op_and_state::state
struct op_state state
Definition: cronet_transport.cc:191
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
gpr_mu_init
GPRAPI void gpr_mu_init(gpr_mu *mu)
execute_stream_op
static enum e_op_result execute_stream_op(struct op_and_state *oas)
Definition: cronet_transport.cc:1031
GPR_UNLIKELY
#define GPR_UNLIKELY(x)
Definition: impl/codegen/port_platform.h:770
read_state::read_stream_closed
bool read_stream_closed
Definition: cronet_transport.cc:145
destroy_transport
static void destroy_transport(grpc_transport *)
Definition: cronet_transport.cc:1457
grpc_core::slice_detail::BaseSlice::c_slice
const grpc_slice & c_slice() const
Definition: src/core/lib/slice/slice.h:78
op_result_string
static const char * op_result_string(enum e_op_result i)
Definition: cronet_transport.cc:255
OP_RECV_MESSAGE
@ OP_RECV_MESSAGE
Definition: cronet_transport.cc:85
bidirectional_stream_disable_auto_flush
void bidirectional_stream_disable_auto_flush(bidirectional_stream *, bool)
Definition: cronet_api_phony.cc:70
OP_RECV_MESSAGE_AND_ON_COMPLETE
@ OP_RECV_MESSAGE_AND_ON_COMPLETE
Definition: cronet_transport.cc:93
grpc_transport_stream_op_batch_payload::cancel_error
grpc_error_handle cancel_error
Definition: transport.h:444
OP_NUM_OPS
@ OP_NUM_OPS
Definition: cronet_transport.cc:95
grpc_core::HttpMethodMetadata::ValueType
ValueType
Definition: metadata_batch.h:138
op_state::fail_state
bool fail_state
Definition: cronet_transport.cc:169
grpc_slice_buffer::length
size_t length
Definition: include/grpc/impl/codegen/slice.h:96
GRPC_SLICE_START_PTR
#define GRPC_SLICE_START_PTR(slice)
Definition: include/grpc/impl/codegen/slice.h:101
write_state::write_buffer
char * write_buffer
Definition: cronet_transport.cc:159
on_response_headers_received
static void on_response_headers_received(bidirectional_stream *, const bidirectional_stream_header_array *, const char *)
Definition: cronet_transport.cc:547
grpc_slice_from_static_string
GPRAPI grpc_slice grpc_slice_from_static_string(const char *source)
Definition: slice/slice.cc:89
grpc_transport_stream_op_batch::payload
grpc_transport_stream_op_batch_payload * payload
Definition: transport.h:307
OP_CANCEL_ERROR
@ OP_CANCEL_ERROR
Definition: cronet_transport.cc:88
grpc_slice
Definition: include/grpc/impl/codegen/slice.h:65
cronet_net_error_as_string
const char * cronet_net_error_as_string(cronet_net_error_code net_error)
Definition: cronet_status.cc:23
grpc_stream_ref
void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason)
Definition: transport.h:203
gpr_mu_lock
GPRAPI void gpr_mu_lock(gpr_mu *mu)
read_state::compressed
bool compressed
Definition: cronet_transport.cc:142
bidirectional_stream_header::key
const char * key
Definition: bidirectional_stream_c.h:40
op_state::cancel_error
grpc_error_handle cancel_error
Definition: cronet_transport.cc:178
metadata::count
size_t count
Definition: cq_verifier.cc:49
error.h
op_storage::num_pending_ops
int num_pending_ops
Definition: cronet_transport.cc:199
data
char data[kBufferLength]
Definition: abseil-cpp/absl/strings/internal/str_format/float_conversion.cc:1006
CRONET_LOG
#define CRONET_LOG(...)
Definition: cronet_transport.cc:70
GRPC_ARG_USE_CRONET_PACKET_COALESCING
#define GRPC_ARG_USE_CRONET_PACKET_COALESCING
Definition: grpc_types.h:326
execute_from_storage
static void execute_from_storage(stream_obj *s)
Definition: cronet_transport.cc:393
stream_obj::oas
struct op_and_state * oas
Definition: cronet_transport.cc:209
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
GPR_UNREACHABLE_CODE
#define GPR_UNREACHABLE_CODE(STATEMENT)
Definition: impl/codegen/port_platform.h:652
op_and_state::next
struct op_and_state * next
Definition: cronet_transport.cc:195
b
uint64_t b
Definition: abseil-cpp/absl/container/internal/layout_test.cc:53
bidirectional_stream_create
bidirectional_stream * bidirectional_stream_create(stream_engine *, void *, bidirectional_stream_callback *)
Definition: cronet_api_phony.cc:32
on_canceled
static void on_canceled(bidirectional_stream *)
Definition: cronet_transport.cc:472
stream_obj::storage
struct op_storage storage
Definition: cronet_transport.cc:222
grpc_core::SliceBuffer::Append
void Append(Slice slice)
Definition: slice/slice_buffer.cc:38
OP_SEND_TRAILING_METADATA
@ OP_SEND_TRAILING_METADATA
Definition: cronet_transport.cc:84
grpc_core::ExecCtx
Definition: exec_ctx.h:97
on_read_completed
static void on_read_completed(bidirectional_stream *, char *, int)
Definition: cronet_transport.cc:605
make_error_with_desc
static grpc_error_handle make_error_with_desc(int error_code, int cronet_internal_error_code, const char *desc)
Definition: cronet_transport.cc:319
stdint.h
remove_from_storage
static void remove_from_storage(struct stream_obj *s, struct op_and_state *oas)
Definition: cronet_transport.cc:359
bidirectional_stream_delay_request_headers_until_flush
void bidirectional_stream_delay_request_headers_until_flush(bidirectional_stream *, bool)
Definition: cronet_api_phony.cc:75
GRPC_SLICE_LENGTH
#define GRPC_SLICE_LENGTH(slice)
Definition: include/grpc/impl/codegen/slice.h:104
grpc_cronet_transport::host
char * host
Definition: cronet_transport.cc:125
grpc_core::TraceFlag
Definition: debug/trace.h:63
stream_engine
Definition: bidirectional_stream_c.h:25
e_op_result
e_op_result
Definition: cronet_transport.cc:75
value
const char * value
Definition: hpack_parser_table.cc:165
grpc_slice_to_c_string
GPRAPI char * grpc_slice_to_c_string(grpc_slice s)
Definition: slice/slice.cc:35
stream_obj::cbs
bidirectional_stream * cbs
Definition: cronet_transport.cc:213
grpc_transport_stream_op_batch::recv_initial_metadata
bool recv_initial_metadata
Definition: transport.h:319
count_
int * count_
Definition: connectivity_state_test.cc:65
stream_obj::state
struct op_state state
Definition: cronet_transport.cc:219
bidirectional_stream_destroy
int bidirectional_stream_destroy(bidirectional_stream *)
Definition: cronet_api_phony.cc:39
bidirectional_stream_header_array::headers
bidirectional_stream_header * headers
Definition: bidirectional_stream_c.h:48
GRPC_HEADER_SIZE_IN_BYTES
#define GRPC_HEADER_SIZE_IN_BYTES
Definition: cronet_transport.cc:66
stream_obj::refcount
grpc_stream_refcount * refcount
Definition: cronet_transport.cc:228
op_state::pending_write_for_trailer
bool pending_write_for_trailer
Definition: cronet_transport.cc:173
read_state::initial_metadata
grpc_metadata_batch initial_metadata
Definition: cronet_transport.cc:155
grpc_transport_stream_op_batch::send_trailing_metadata
bool send_trailing_metadata
Definition: transport.h:313
grpc_core::SliceBuffer
Definition: src/core/lib/slice/slice_buffer.h:44
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
GRPC_WRITE_INTERNAL_COMPRESS
#define GRPC_WRITE_INTERNAL_COMPRESS
Definition: transport.h:75
debug_location.h
GRPC_CRONET_STREAM_REF
#define GRPC_CRONET_STREAM_REF(stream, reason)
Definition: cronet_transport.cc:232
key
const char * key
Definition: hpack_parser_table.cc:164
grpc_chttp2_base64_encode
grpc_slice grpc_chttp2_base64_encode(const grpc_slice &input)
Definition: bin_encoder.cc:52
grpc_transport_stream_op_batch::send_message
bool send_message
Definition: transport.h:316
grpc_error_set_int
grpc_error_handle grpc_error_set_int(grpc_error_handle src, grpc_error_ints which, intptr_t value)
Definition: error.cc:613
bidirectional_stream_header_array
struct bidirectional_stream_header_array bidirectional_stream_header_array
perform_stream_op
static void perform_stream_op(grpc_transport *, grpc_stream *gs, grpc_transport_stream_op_batch *op)
Definition: cronet_transport.cc:1415
absl::flags_internal
Definition: abseil-cpp/absl/flags/commandlineflag.h:40
on_response_trailers_received
static void on_response_trailers_received(bidirectional_stream *, const bidirectional_stream_header_array *)
Definition: cronet_transport.cc:644
gpr_mu
pthread_mutex_t gpr_mu
Definition: impl/codegen/sync_posix.h:47
header_has_authority
static bool header_has_authority(const grpc_metadata_batch *b)
Definition: cronet_transport.cc:819
count
int * count
Definition: bloaty/third_party/googletest/googlemock/test/gmock_stress_test.cc:96
grpc_core::HttpSchemeMetadata
Definition: metadata_batch.h:114
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
grpc_core::HttpMethodMetadata::kPost
@ kPost
Definition: metadata_batch.h:139
grpc_transport::vtable
const grpc_transport_vtable * vtable
Definition: transport_impl.h:93
validate_metadata.h
grpc_core::HttpMethodMetadata::kGet
@ kGet
Definition: metadata_batch.h:140
bidirectional_stream_header_array
Definition: bidirectional_stream_c.h:45
read_state::trailing_metadata
grpc_metadata_batch trailing_metadata
Definition: cronet_transport.cc:151
bidirectional_stream_write
int bidirectional_stream_write(bidirectional_stream *, const char *, int, bool)
Definition: cronet_api_phony.cc:59
NO_ACTION_POSSIBLE
@ NO_ACTION_POSSIBLE
Definition: cronet_transport.cc:78
OP_CANCELED
@ OP_CANCELED
Definition: cronet_transport.cc:92
GRPC_ERROR_CREATE_FROM_CPP_STRING
#define GRPC_ERROR_CREATE_FROM_CPP_STRING(desc)
Definition: error.h:297
stream_obj::curr_op
grpc_transport_stream_op_batch * curr_op
Definition: cronet_transport.cc:210
alloc.h
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
grpc_chttp2_base64_decode_with_length
grpc_slice grpc_chttp2_base64_decode_with_length(const grpc_slice &input, size_t output_length)
Definition: bin_decoder.cc:207
grpc_transport_stream_op_batch_payload::send_message
grpc_core::SliceBuffer * send_message
Definition: transport.h:373
op_state::op_state
op_state(grpc_core::Arena *arena)
Definition: cronet_transport.cc:164
grpc_transport_stream_op_batch_payload::recv_initial_metadata
grpc_metadata_batch * recv_initial_metadata
Definition: transport.h:390
stream_obj::curr_gs
grpc_stream * curr_gs
Definition: cronet_transport.cc:212
cronet_net_error_to_grpc_error
grpc_status_code cronet_net_error_to_grpc_error(cronet_net_error_code net_error)
Definition: cronet_status.cc:491
op_storage::head
struct op_and_state * head
Definition: cronet_transport.cc:200
bidirectional_stream_read
int bidirectional_stream_read(bidirectional_stream *, char *, int)
Definition: cronet_api_phony.cc:53
grpc_transport_stream_op_batch::send_initial_metadata
bool send_initial_metadata
Definition: transport.h:310
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
grpc_transport_stream_op_batch_payload::recv_message
absl::optional< grpc_core::SliceBuffer > * recv_message
Definition: transport.h:416
exec_ctx.h
bidirectional_stream_flush
void bidirectional_stream_flush(bidirectional_stream *)
Definition: cronet_api_phony.cc:80
desc
#define desc
Definition: bloaty/third_party/protobuf/src/google/protobuf/extension_set.h:338
null_and_maybe_free_read_buffer
static void null_and_maybe_free_read_buffer(stream_obj *s)
Definition: cronet_transport.cc:301
slice_refcount.h
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
OP_READ_REQ_MADE
@ OP_READ_REQ_MADE
Definition: cronet_transport.cc:94
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
stream_obj
Definition: cronet_transport.cc:203
op_state::pending_recv_trailing_metadata
bool pending_recv_trailing_metadata
Definition: cronet_transport.cc:176
grpc_transport
Definition: transport_impl.h:89
transport.h
stream_obj::~stream_obj
~stream_obj()
Definition: cronet_transport.cc:1396
op_state::state_callback_received
bool state_callback_received[OP_NUM_OPS]
Definition: cronet_transport.cc:167
grpc_stream
struct grpc_stream grpc_stream
Definition: transport.h:174
OP_ON_COMPLETE
@ OP_ON_COMPLETE
Definition: cronet_transport.cc:89
grpc_transport_stream_op_batch::recv_trailing_metadata
bool recv_trailing_metadata
Definition: transport.h:326
grpc_cronet_transport::engine
stream_engine * engine
Definition: cronet_transport.cc:124
GPR_DEBUG
#define GPR_DEBUG
Definition: include/grpc/impl/codegen/log.h:55
op_state
Definition: cronet_transport.cc:163
op_storage
Definition: cronet_transport.cc:198
stream_obj::arena
grpc_core::Arena * arena
Definition: cronet_transport.cc:208
grpc_cronet_stream_ref
void grpc_cronet_stream_ref(stream_obj *s, const char *reason)
Definition: cronet_transport.cc:236
bidirectional_stream_callback
Definition: bidirectional_stream_c.h:52
flags
uint32_t flags
Definition: retry_filter.cc:632
grpc_slice_buffer
Definition: include/grpc/impl/codegen/slice.h:83
transport_impl.h
grpc_transport_stream_op_batch_payload::recv_trailing_metadata
grpc_metadata_batch * recv_trailing_metadata
Definition: transport.h:425
ACTION_TAKEN_NO_CALLBACK
@ ACTION_TAKEN_NO_CALLBACK
Definition: cronet_transport.cc:77
read_state::read_slice_buffer
grpc_core::SliceBuffer read_slice_buffer
Definition: cronet_transport.cc:148
bidirectional_stream_header_array::count
size_t count
Definition: bidirectional_stream_c.h:46
iomgr_fwd.h
endpoint.h
op_state::flush_cronet_when_ready
bool flush_cronet_when_ready
Definition: cronet_transport.cc:172
grpc_error
Definition: error_internal.h:42
length
std::size_t length
Definition: abseil-cpp/absl/time/internal/test_util.cc:57
read_state::length_field_received
bool length_field_received
Definition: cronet_transport.cc:138
op_and_state::op_and_state
op_and_state(stream_obj *s, const grpc_transport_stream_op_batch &op)
Definition: cronet_transport.cc:328
op_state::pending_send_message
bool pending_send_message
Definition: cronet_transport.cc:174
ACTION_TAKEN_WITH_CALLBACK
@ ACTION_TAKEN_WITH_CALLBACK
Definition: cronet_transport.cc:76
method
NSString * method
Definition: ProtoMethod.h:28
grpc_metadata_batch
Definition: metadata_batch.h:1259
grpc_pollset
Definition: bm_cq_multiple_threads.cc:37
convert_metadata_to_cronet_headers
static void convert_metadata_to_cronet_headers(grpc_metadata_batch *metadata, const char *host, std::string *pp_url, bidirectional_stream_header **pp_headers, size_t *p_num_headers, const char **method)
Definition: cronet_transport.cc:798
grpc_transport_stream_op_batch
Definition: transport.h:284
sync.h
grpc_closure
Definition: closure.h:56
bin_decoder.h
op
static grpc_op * op
Definition: test/core/fling/client.cc:47
slice_buffer.h
grpc_stream_unref
void grpc_stream_unref(grpc_stream_refcount *refcount, const char *reason)
Definition: transport.h:220
absl::status_internal::storage
static ABSL_INTERNAL_ATOMIC_HOOK_ATTRIBUTES absl::base_internal::AtomicHook< StatusPayloadPrinter > storage
Definition: abseil-cpp/absl/status/status_payload_printer.cc:26
stream_obj::mu
gpr_mu mu
Definition: cronet_transport.cc:225
grpc_endpoint
Definition: endpoint.h:105
setup.target
target
Definition: third_party/bloaty/third_party/protobuf/python/setup.py:179
bin_encoder.h
e_op_id
e_op_id
Definition: cronet_transport.cc:81
grpc_core::HttpMethodMetadata::kInvalid
@ kInvalid
Definition: metadata_batch.h:142
absl::EndsWith
bool EndsWith(absl::string_view text, absl::string_view suffix) noexcept
Definition: third_party/abseil-cpp/absl/strings/match.h:68
read_state
Definition: cronet_transport.cc:132
grpc_stream_refcount
Definition: transport.h:178
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
GRPC_ERROR_INT_GRPC_STATUS
@ GRPC_ERROR_INT_GRPC_STATUS
grpc status code representing this error
Definition: error.h:66
offset
voidpf uLong offset
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:142
grpc_slice_unref_internal
void grpc_slice_unref_internal(const grpc_slice &slice)
Definition: slice_refcount.h:39
set_pollset_do_nothing
static void set_pollset_do_nothing(grpc_transport *, grpc_stream *, grpc_pollset *)
Definition: cronet_transport.cc:1408
stream_obj::stream_obj
stream_obj(grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, grpc_core::Arena *arena)
Definition: cronet_transport.cc:1384
grpc_transport_stream_op_batch_payload::cancel_stream
struct grpc_transport_stream_op_batch_payload::@46 cancel_stream
op_and_state::done
bool done
Definition: cronet_transport.cc:192
stream_obj::curr_ct
grpc_cronet_transport * curr_ct
Definition: cronet_transport.cc:211
GRPC_FLUSH_READ_SIZE
#define GRPC_FLUSH_READ_SIZE
Definition: cronet_transport.cc:67
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
OP_SUCCEEDED
@ OP_SUCCEEDED
Definition: cronet_transport.cc:91
port_platform.h
stream
voidpf stream
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136


grpc
Author(s):
autogenerated on Fri May 16 2025 02:58:06