binder_transport.cc
Go to the documentation of this file.
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
18 
19 #ifndef GRPC_NO_BINDER
20 
21 #include <cstdint>
22 #include <memory>
23 #include <string>
24 #include <utility>
25 
26 #include "absl/memory/memory.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/substitute.h"
29 
30 #include <grpc/support/log.h>
31 
43 
44 #ifndef NDEBUG
45 static void grpc_binder_stream_ref(grpc_binder_stream* s, const char* reason) {
46  grpc_stream_ref(s->refcount, reason);
47 }
49  const char* reason) {
50  grpc_stream_unref(s->refcount, reason);
51 }
53  const char* reason, const char* file,
54  int line) {
55  t->refs.Ref(grpc_core::DebugLocation(file, line), reason);
56 }
58  const char* reason, const char* file,
59  int line) {
60  if (t->refs.Unref(grpc_core::DebugLocation(file, line), reason)) {
61  delete t;
62  }
63 }
64 #else
66  grpc_stream_ref(s->refcount);
67 }
69  grpc_stream_unref(s->refcount);
70 }
72  t->refs.Ref();
73 }
75  if (t->refs.Unref()) {
76  delete t;
77  }
78 }
79 #endif
80 
81 #ifndef NDEBUG
82 #define GRPC_BINDER_STREAM_REF(stream, reason) \
83  grpc_binder_stream_ref(stream, reason)
84 #define GRPC_BINDER_STREAM_UNREF(stream, reason) \
85  grpc_binder_stream_unref(stream, reason)
86 #define GRPC_BINDER_REF_TRANSPORT(t, r) \
87  grpc_binder_ref_transport(t, r, __FILE__, __LINE__)
88 #define GRPC_BINDER_UNREF_TRANSPORT(t, r) \
89  grpc_binder_unref_transport(t, r, __FILE__, __LINE__)
90 #else
91 #define GRPC_BINDER_STREAM_REF(stream, reason) grpc_binder_stream_ref(stream)
92 #define GRPC_BINDER_STREAM_UNREF(stream, reason) \
93  grpc_binder_stream_unref(stream)
94 #define GRPC_BINDER_REF_TRANSPORT(t, r) grpc_binder_ref_transport(t)
95 #define GRPC_BINDER_UNREF_TRANSPORT(t, r) grpc_binder_unref_transport(t)
96 #endif
97 
98 static void register_stream_locked(void* arg, grpc_error_handle /*error*/) {
100  args->gbt->registered_stream[args->gbs->GetTxCode()] = args->gbs;
101 }
102 
104  grpc_stream_refcount* refcount, const void* server_data,
106  GPR_TIMER_SCOPE("init_stream", 0);
107  gpr_log(GPR_INFO, "%s = %p %p %p %p %p", __func__, gt, gs, refcount,
108  server_data, arena);
109  // Note that this function is not locked and may be invoked concurrently
110  grpc_binder_transport* t = reinterpret_cast<grpc_binder_transport*>(gt);
111  new (gs) grpc_binder_stream(t, refcount, server_data, arena,
112  t->NewStreamTxCode(), t->is_client);
113 
114  // `grpc_binder_transport::registered_stream` should only be updated in
115  // combiner
116  grpc_binder_stream* gbs = reinterpret_cast<grpc_binder_stream*>(gs);
117  gbs->register_stream_args.gbs = gbs;
118  gbs->register_stream_args.gbt = t;
120  t->combiner->Run(
122  &gbs->register_stream_args, nullptr),
124 
125  return 0;
126 }
127 
129  gpr_log(GPR_INFO, "%s = %p %p %p", __func__, gt, gs, gp);
130 }
131 
133  gpr_log(GPR_INFO, __func__);
134 }
135 
137  const grpc_binder::Metadata& md) {
138  mb->Clear();
139  for (auto& p : md) {
140  mb->Append(p.first, grpc_core::Slice::FromCopiedString(p.second),
142  gpr_log(
143  GPR_DEBUG, "Failed to parse metadata: %s",
144  absl::StrCat("key=", p.first, " error=", error).c_str());
145  });
146  }
147 }
148 
150  grpc_binder_stream* gbs,
152  gpr_log(GPR_INFO, "cancel_stream_locked");
153  if (!gbs->is_closed) {
155  gbs->is_closed = true;
157  gbt->transport_stream_receiver->CancelStream(gbs->tx_code);
158  gbt->registered_stream.erase(gbs->tx_code);
159  if (gbs->recv_initial_metadata_ready != nullptr) {
162  gbs->recv_initial_metadata_ready = nullptr;
163  gbs->recv_initial_metadata = nullptr;
164  gbs->trailing_metadata_available = nullptr;
165  }
166  if (gbs->recv_message_ready != nullptr) {
169  gbs->recv_message_ready = nullptr;
170  gbs->recv_message->reset();
171  gbs->recv_message = nullptr;
172  gbs->call_failed_before_recv_message = nullptr;
173  }
174  if (gbs->recv_trailing_metadata_finished != nullptr) {
178  gbs->recv_trailing_metadata_finished = nullptr;
179  gbs->recv_trailing_metadata = nullptr;
180  }
181  }
183 }
184 
186  bool has_authority = false;
187  bool has_path = false;
188  for (const auto& kv : metadata) {
189  if (kv.first == ":authority") {
190  has_authority = true;
191  }
192  if (kv.first == ":path") {
193  has_path = true;
194  }
195  }
196  return has_authority && has_path;
197 }
198 
200  grpc_error_handle /*error*/) {
202  grpc_binder_stream* gbs = args->gbs;
203 
205  "recv_initial_metadata_locked is_client = %d is_closed = %d",
206  gbs->is_client, gbs->is_closed);
207 
208  if (!gbs->is_closed) {
209  grpc_error_handle error = [&] {
212  if (!args->initial_metadata.ok()) {
213  gpr_log(GPR_ERROR, "Failed to parse initial metadata");
214  return absl_status_to_grpc_error(args->initial_metadata.status());
215  }
216  if (!gbs->is_client) {
217  // For server, we expect :authority and :path in initial metadata.
218  if (!ContainsAuthorityAndPath(*args->initial_metadata)) {
220  "Missing :authority or :path in initial metadata");
221  }
222  }
223  AssignMetadata(gbs->recv_initial_metadata, *args->initial_metadata);
224  return GRPC_ERROR_NONE;
225  }();
226 
228  gbs->recv_initial_metadata_ready = nullptr;
229  gbs->recv_initial_metadata = nullptr;
231  }
232  GRPC_BINDER_STREAM_UNREF(gbs, "recv_initial_metadata");
233 }
234 
235 static void recv_message_locked(void* arg, grpc_error_handle /*error*/) {
236  RecvMessageArgs* args = static_cast<RecvMessageArgs*>(arg);
237  grpc_binder_stream* gbs = args->gbs;
238 
239  gpr_log(GPR_INFO, "recv_message_locked is_client = %d is_closed = %d",
240  gbs->is_client, gbs->is_closed);
241 
242  if (!gbs->is_closed) {
243  grpc_error_handle error = [&] {
244  GPR_ASSERT(gbs->recv_message);
246  if (!args->message.ok()) {
247  gpr_log(GPR_ERROR, "Failed to receive message");
248  if (args->message.status().message() ==
250  kGrpcBinderTransportCancelledGracefully) {
251  gpr_log(GPR_ERROR, "message cancelled gracefully");
252  // Cancelled because we've already received trailing metadata.
253  // It's not an error in this case.
254  return GRPC_ERROR_NONE;
255  } else {
256  return absl_status_to_grpc_error(args->message.status());
257  }
258  }
261  *gbs->recv_message = std::move(buf);
262  return GRPC_ERROR_NONE;
263  }();
264 
265  if (!GRPC_ERROR_IS_NONE(error) &&
266  gbs->call_failed_before_recv_message != nullptr) {
267  *gbs->call_failed_before_recv_message = true;
268  }
270  gbs->recv_message_ready = nullptr;
271  gbs->recv_message = nullptr;
273  }
274 
275  GRPC_BINDER_STREAM_UNREF(gbs, "recv_message");
276 }
277 
279  grpc_error_handle /*error*/) {
281  grpc_binder_stream* gbs = args->gbs;
282 
284  "recv_trailing_metadata_locked is_client = %d is_closed = %d",
285  gbs->is_client, gbs->is_closed);
286 
287  if (!gbs->is_closed) {
288  grpc_error_handle error = [&] {
291  if (!args->trailing_metadata.ok()) {
292  gpr_log(GPR_ERROR, "Failed to receive trailing metadata");
293  return absl_status_to_grpc_error(args->trailing_metadata.status());
294  }
295  if (!gbs->is_client) {
296  // Client will not send non-empty trailing metadata.
297  if (!args->trailing_metadata.value().empty()) {
298  gpr_log(GPR_ERROR, "Server receives non-empty trailing metadata.");
299  return GRPC_ERROR_CANCELLED;
300  }
301  } else {
302  AssignMetadata(gbs->recv_trailing_metadata, *args->trailing_metadata);
303  // Append status to metadata
304  // TODO(b/192208695): See if we can avoid to manually put status
305  // code into the header
306  gpr_log(GPR_INFO, "status = %d", args->status);
309  static_cast<grpc_status_code>(args->status));
310  }
311  return GRPC_ERROR_NONE;
312  }();
313 
314  if (gbs->is_client || gbs->trailing_metadata_sent) {
316  gbs->recv_trailing_metadata_finished = nullptr;
317  gbs->recv_trailing_metadata = nullptr;
319  } else {
320  // According to transport explaineer - "Server extra: This op shouldn't
321  // actually be considered complete until the server has also sent trailing
322  // metadata to provide the other side with final status"
323  //
324  // We haven't sent trailing metadata yet, so we have to delay completing
325  // the recv_trailing_metadata callback.
327  }
328  }
329  GRPC_BINDER_STREAM_UNREF(gbs, "recv_trailing_metadata");
330 }
331 
332 namespace grpc_binder {
333 namespace {
334 
335 class MetadataEncoder {
336  public:
337  MetadataEncoder(bool is_client, Transaction* tx, Metadata* init_md)
338  : is_client_(is_client), tx_(tx), init_md_(init_md) {}
339 
340  void Encode(const grpc_core::Slice& key_slice,
341  const grpc_core::Slice& value_slice) {
342  absl::string_view key = key_slice.as_string_view();
343  absl::string_view value = value_slice.as_string_view();
344  init_md_->emplace_back(std::string(key), std::string(value));
345  }
346 
347  void Encode(grpc_core::HttpPathMetadata, const grpc_core::Slice& value) {
348  // TODO(b/192208403): Figure out if it is correct to simply drop '/'
349  // prefix and treat it as rpc method name
350  GPR_ASSERT(value[0] == '/');
351  std::string path = std::string(value.as_string_view().substr(1));
352 
353  // Only client send method ref.
355  tx_->SetMethodRef(path);
356  }
357 
359  gpr_log(GPR_INFO, "send trailing metadata status = %d", status);
360  tx_->SetStatus(status);
361  }
362 
363  template <typename Trait>
364  void Encode(Trait, const typename Trait::ValueType& value) {
365  init_md_->emplace_back(std::string(Trait::key()),
366  std::string(Trait::Encode(value).as_string_view()));
367  }
368 
369  private:
370  const bool is_client_;
371  Transaction* const tx_;
373 };
374 
375 } // namespace
376 } // namespace grpc_binder
377 
378 static void perform_stream_op_locked(void* stream_op,
379  grpc_error_handle /*error*/) {
381  static_cast<grpc_transport_stream_op_batch*>(stream_op);
382  grpc_binder_stream* gbs =
383  static_cast<grpc_binder_stream*>(op->handler_private.extra_arg);
384  grpc_binder_transport* gbt = gbs->t;
385  if (op->cancel_stream) {
386  // TODO(waynetu): Is this true?
387  GPR_ASSERT(!op->send_initial_metadata && !op->send_message &&
388  !op->send_trailing_metadata && !op->recv_initial_metadata &&
389  !op->recv_message && !op->recv_trailing_metadata);
390  gpr_log(GPR_INFO, "cancel_stream is_client = %d", gbs->is_client);
391  if (!gbs->is_client) {
392  // Send trailing metadata to inform the other end about the cancellation,
393  // regardless if we'd already done that or not.
394  auto cancel_tx = absl::make_unique<grpc_binder::Transaction>(
395  gbs->GetTxCode(), gbt->is_client);
396  cancel_tx->SetSuffix(grpc_binder::Metadata{});
397  cancel_tx->SetStatus(1);
398  absl::Status status = gbt->wire_writer->RpcCall(std::move(cancel_tx));
399  }
400  cancel_stream_locked(gbt, gbs, op->payload->cancel_stream.cancel_error);
401  if (op->on_complete != nullptr) {
403  }
404  GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op");
405  return;
406  }
407 
408  if (gbs->is_closed) {
409  if (op->send_message) {
410  // Reset the send_message payload to prevent memory leaks.
411  op->payload->send_message.send_message->Clear();
412  }
413  if (op->recv_initial_metadata) {
416  op->payload->recv_initial_metadata.recv_initial_metadata_ready,
418  }
419  if (op->recv_message) {
421  op->payload->recv_message.recv_message_ready,
423  }
424  if (op->recv_trailing_metadata) {
427  op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
429  }
430  if (op->on_complete != nullptr) {
433  }
434  GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op");
435  return;
436  }
437 
438  int tx_code = gbs->tx_code;
439  auto tx =
440  absl::make_unique<grpc_binder::Transaction>(tx_code, gbt->is_client);
441 
442  if (op->send_initial_metadata) {
443  gpr_log(GPR_INFO, "send_initial_metadata");
444  grpc_binder::Metadata init_md;
445  auto batch = op->payload->send_initial_metadata.send_initial_metadata;
446 
447  grpc_binder::MetadataEncoder encoder(gbt->is_client, tx.get(), &init_md);
448  batch->Encode(&encoder);
449  tx->SetPrefix(init_md);
450  }
451  if (op->send_message) {
452  gpr_log(GPR_INFO, "send_message");
453  tx->SetData(op->payload->send_message.send_message->JoinIntoString());
454  }
455 
456  if (op->send_trailing_metadata) {
457  gpr_log(GPR_INFO, "send_trailing_metadata");
458  auto batch = op->payload->send_trailing_metadata.send_trailing_metadata;
460 
461  grpc_binder::MetadataEncoder encoder(gbt->is_client, tx.get(),
463  batch->Encode(&encoder);
464 
465  // TODO(mingcl): Will we ever has key-value pair here? According to
466  // wireformat client suffix data is always empty.
467  tx->SetSuffix(trailing_metadata);
468  }
469  if (op->recv_initial_metadata) {
470  gpr_log(GPR_INFO, "recv_initial_metadata");
472  op->payload->recv_initial_metadata.recv_initial_metadata_ready;
473  gbs->recv_initial_metadata =
474  op->payload->recv_initial_metadata.recv_initial_metadata;
476  op->payload->recv_initial_metadata.trailing_metadata_available;
477  GRPC_BINDER_STREAM_REF(gbs, "recv_initial_metadata");
478  gbt->transport_stream_receiver->RegisterRecvInitialMetadata(
479  tx_code, [tx_code, gbs,
480  gbt](absl::StatusOr<grpc_binder::Metadata> initial_metadata) {
484  std::move(initial_metadata);
485  gbt->combiner->Run(
488  &gbs->recv_initial_metadata_args, nullptr),
490  });
491  }
492  if (op->recv_message) {
493  gpr_log(GPR_INFO, "recv_message");
494  gbs->recv_message_ready = op->payload->recv_message.recv_message_ready;
495  gbs->recv_message = op->payload->recv_message.recv_message;
497  op->payload->recv_message.call_failed_before_recv_message;
498  GRPC_BINDER_STREAM_REF(gbs, "recv_message");
499  gbt->transport_stream_receiver->RegisterRecvMessage(
504  gbt->combiner->Run(
506  &gbs->recv_message_args, nullptr),
508  });
509  }
510  if (op->recv_trailing_metadata) {
511  gpr_log(GPR_INFO, "recv_trailing_metadata");
513  op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
515  op->payload->recv_trailing_metadata.recv_trailing_metadata;
516  GRPC_BINDER_STREAM_REF(gbs, "recv_trailing_metadata");
517  gbt->transport_stream_receiver->RegisterRecvTrailingMetadata(
518  tx_code, [tx_code, gbs, gbt](
520  int status) {
526  gbt->combiner->Run(
529  &gbs->recv_trailing_metadata_args, nullptr),
531  });
532  }
533  // Only send transaction when there's a send op presented.
535  if (op->send_initial_metadata || op->send_message ||
536  op->send_trailing_metadata) {
537  status = gbt->wire_writer->RpcCall(std::move(tx));
538  if (!gbs->is_client && op->send_trailing_metadata) {
539  gbs->trailing_metadata_sent = true;
540  // According to transport explaineer - "Server extra: This op shouldn't
541  // actually be considered complete until the server has also sent trailing
542  // metadata to provide the other side with final status"
543  //
544  // Because we've done sending trailing metadata here, we can safely
545  // complete the recv_trailing_metadata callback here.
548  gbs->recv_trailing_metadata_finished = nullptr;
551  }
552  }
553  }
554  // Note that this should only be scheduled when all non-recv ops are
555  // completed
556  if (op->on_complete != nullptr) {
559  gpr_log(GPR_INFO, "on_complete closure schuduled");
560  }
561  GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op");
562 }
563 
566  GPR_TIMER_SCOPE("perform_stream_op", 0);
567  grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
568  grpc_binder_stream* gbs = reinterpret_cast<grpc_binder_stream*>(gs);
569  gpr_log(GPR_INFO, "%s = %p %p %p is_client = %d", __func__, gt, gs, op,
570  gbs->is_client);
571  GRPC_BINDER_STREAM_REF(gbs, "perform_stream_op");
572  op->handler_private.extra_arg = gbs;
573  gbt->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
574  perform_stream_op_locked, op, nullptr),
576 }
577 
580  "transport closed due to disconnection/goaway");
581  while (!gbt->registered_stream.empty()) {
583  gbt, gbt->registered_stream.begin()->second,
585  GRPC_ERROR_CREATE_FROM_STATIC_STRING("transport closed"),
587  }
588 }
589 
590 static void perform_transport_op_locked(void* transport_op,
591  grpc_error_handle /*error*/) {
592  grpc_transport_op* op = static_cast<grpc_transport_op*>(transport_op);
593  grpc_binder_transport* gbt =
594  static_cast<grpc_binder_transport*>(op->handler_private.extra_arg);
595  // TODO(waynetu): Should we lock here to avoid data race?
596  if (op->start_connectivity_watch != nullptr) {
597  gbt->state_tracker.AddWatcher(op->start_connectivity_watch_state,
598  std::move(op->start_connectivity_watch));
599  }
600  if (op->stop_connectivity_watch != nullptr) {
601  gbt->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
602  }
603  if (op->set_accept_stream) {
604  gbt->accept_stream_fn = op->set_accept_stream_fn;
605  gbt->accept_stream_user_data = op->set_accept_stream_user_data;
606  }
607  if (op->on_consumed) {
609  }
610  bool do_close = false;
611  if (!GRPC_ERROR_IS_NONE(op->disconnect_with_error)) {
612  do_close = true;
613  GRPC_ERROR_UNREF(op->disconnect_with_error);
614  }
615  if (!GRPC_ERROR_IS_NONE(op->goaway_error)) {
616  do_close = true;
617  GRPC_ERROR_UNREF(op->goaway_error);
618  }
619  if (do_close) {
621  }
622  GRPC_BINDER_UNREF_TRANSPORT(gbt, "perform_transport_op");
623 }
624 
626  gpr_log(GPR_INFO, __func__);
627  grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
628  op->handler_private.extra_arg = gbt;
629  GRPC_BINDER_REF_TRANSPORT(gbt, "perform_transport_op");
630  gbt->combiner->Run(
631  GRPC_CLOSURE_INIT(&op->handler_private.closure,
632  perform_transport_op_locked, op, nullptr),
634 }
635 
636 static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) {
637  grpc_binder_stream* gbs = static_cast<grpc_binder_stream*>(sp);
638  grpc_binder_transport* gbt = gbs->t;
640  gbt, gbs,
643  gbs->~grpc_binder_stream();
644 }
645 
646 static void destroy_stream(grpc_transport* /*gt*/, grpc_stream* gs,
647  grpc_closure* then_schedule_closure) {
648  gpr_log(GPR_INFO, __func__);
649  grpc_binder_stream* gbs = reinterpret_cast<grpc_binder_stream*>(gs);
650  gbs->destroy_stream_then_closure = then_schedule_closure;
652  destroy_stream_locked, gbs, nullptr),
654 }
655 
656 static void destroy_transport_locked(void* gt, grpc_error_handle /*error*/) {
657  grpc_binder_transport* gbt = static_cast<grpc_binder_transport*>(gt);
659  // Release the references held by the transport.
660  gbt->wire_reader = nullptr;
661  gbt->transport_stream_receiver = nullptr;
662  gbt->wire_writer = nullptr;
663  GRPC_BINDER_UNREF_TRANSPORT(gbt, "transport destroyed");
664 }
665 
667  gpr_log(GPR_INFO, __func__);
668  grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
669  gbt->combiner->Run(
672 }
673 
675  gpr_log(GPR_INFO, __func__);
676  return nullptr;
677 }
678 
679 // See grpc_transport_vtable declaration for meaning of each field
681  "binder",
682  init_stream,
683  nullptr,
684  set_pollset,
690  get_endpoint};
691 
692 static const grpc_transport_vtable* get_vtable() { return &vtable; }
693 
694 static void accept_stream_locked(void* gt, grpc_error_handle /*error*/) {
695  grpc_binder_transport* gbt = static_cast<grpc_binder_transport*>(gt);
696  if (gbt->accept_stream_fn) {
697  // must pass in a non-null value.
698  (*gbt->accept_stream_fn)(gbt->accept_stream_user_data, &gbt->base, gbt);
699  }
700 }
701 
703  std::unique_ptr<grpc_binder::Binder> binder, bool is_client,
704  std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)
705  : is_client(is_client),
706  combiner(grpc_combiner_create()),
707  state_tracker(
708  is_client ? "binder_transport_client" : "binder_transport_server",
710  refs(1, nullptr) {
711  gpr_log(GPR_INFO, __func__);
712  base.vtable = get_vtable();
714  std::make_shared<grpc_binder::TransportStreamReceiverImpl>(
715  is_client, /*accept_stream_callback=*/[this] {
717  combiner->Run(
720  });
721  // WireReader holds a ref to grpc_binder_transport.
722  GRPC_BINDER_REF_TRANSPORT(this, "wire reader");
723  wire_reader = grpc_core::MakeOrphanable<grpc_binder::WireReaderImpl>(
724  transport_stream_receiver, is_client, security_policy,
725  /*on_destruct_callback=*/
726  [this] {
727  // Unref transport when destructed.
728  GRPC_BINDER_UNREF_TRANSPORT(this, "wire reader");
729  });
730  wire_writer = wire_reader->SetupTransport(std::move(binder));
731 }
732 
734  GRPC_COMBINER_UNREF(combiner, "binder_transport");
735 }
736 
738  std::unique_ptr<grpc_binder::Binder> endpoint_binder,
739  std::shared_ptr<grpc::experimental::binder::SecurityPolicy>
740  security_policy) {
741  gpr_log(GPR_INFO, __func__);
742 
743  GPR_ASSERT(endpoint_binder != nullptr);
744  GPR_ASSERT(security_policy != nullptr);
745 
747  std::move(endpoint_binder), /*is_client=*/true, security_policy);
748 
749  return &t->base;
750 }
751 
753  std::unique_ptr<grpc_binder::Binder> client_binder,
754  std::shared_ptr<grpc::experimental::binder::SecurityPolicy>
755  security_policy) {
756  gpr_log(GPR_INFO, __func__);
757 
758  GPR_ASSERT(client_binder != nullptr);
759  GPR_ASSERT(security_policy != nullptr);
760 
762  std::move(client_binder), /*is_client=*/false, security_policy);
763 
764  return &t->base;
765 }
766 #endif
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
grpc_binder_transport::~grpc_binder_transport
~grpc_binder_transport()
Definition: binder_transport.cc:733
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc_core::ConnectivityStateTracker::RemoveWatcher
void RemoveWatcher(ConnectivityStateWatcherInterface *watcher)
Definition: connectivity_state.cc:145
GRPC_CHANNEL_READY
@ GRPC_CHANNEL_READY
Definition: include/grpc/impl/codegen/connectivity_state.h:36
vtable
static const grpc_transport_vtable vtable
Definition: binder_transport.cc:680
grpc_core::MetadataMap::Set
absl::enable_if_t< Which::kRepeatable==false, void > Set(Which, Args &&... args)
Definition: metadata_batch.h:1075
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
grpc_binder_transport::base
grpc_transport base
Definition: binder_transport.h:60
metadata_batch.h
GRPC_STATUS_UNAVAILABLE
@ GRPC_STATUS_UNAVAILABLE
Definition: include/grpc/impl/codegen/status.h:143
grpc_binder_stream::recv_message
absl::optional< grpc_core::SliceBuffer > * recv_message
Definition: binder_stream.h:108
grpc_core::MetadataMap::Append
void Append(absl::string_view key, Slice value, MetadataParseErrorFn on_error)
Definition: metadata_batch.h:1156
grpc_core::DebugLocation
Definition: debug_location.h:31
grpc_binder_stream_unref
static void grpc_binder_stream_unref(grpc_binder_stream *s, const char *reason)
Definition: binder_transport.cc:48
tx_
Transaction *const tx_
Definition: binder_transport.cc:371
metadata
Definition: cq_verifier.cc:48
grpc_binder_transport::state_tracker
grpc_core::ConnectivityStateTracker state_tracker
Definition: binder_transport.h:78
grpc_transport_vtable
Definition: transport_impl.h:37
recv_initial_metadata_locked
static void recv_initial_metadata_locked(void *arg, grpc_error_handle)
Definition: binder_transport.cc:199
grpc_binder_stream::is_client
const bool is_client
Definition: binder_stream.h:84
RecvTrailingMetadataArgs::trailing_metadata
absl::StatusOr< grpc_binder::Metadata > trailing_metadata
Definition: binder_stream.h:40
GPR_TIMER_SCOPE
#define GPR_TIMER_SCOPE(tag, important)
Definition: src/core/lib/profiling/timers.h:43
init_md_
Metadata *const init_md_
Definition: binder_transport.cc:372
grpc_core::Slice
Definition: src/core/lib/slice/slice.h:282
grpc_core::HttpPathMetadata
Definition: metadata_batch.h:262
grpc_pollset_set
struct grpc_pollset_set grpc_pollset_set
Definition: iomgr_fwd.h:23
grpc_binder_stream::recv_trailing_metadata
grpc_metadata_batch * recv_trailing_metadata
Definition: binder_stream.h:111
grpc_binder_stream::trailing_metadata_available
bool * trailing_metadata_available
Definition: binder_stream.h:107
buf
voidpf void * buf
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
absl::string_view
Definition: abseil-cpp/absl/strings/string_view.h:167
grpc_binder
Definition: connection_id_generator.cc:45
RecvTrailingMetadataArgs::status
int status
Definition: binder_stream.h:41
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc_status_code
grpc_status_code
Definition: include/grpc/impl/codegen/status.h:28
GRPC_BINDER_STREAM_REF
#define GRPC_BINDER_STREAM_REF(stream, reason)
Definition: binder_transport.cc:82
grpc_binder_stream::cancel_self_error
grpc_error_handle cancel_self_error
Definition: binder_stream.h:91
absl::OkStatus
Status OkStatus()
Definition: third_party/abseil-cpp/absl/status/status.h:882
file
Definition: bloaty/third_party/zlib/examples/gzappend.c:170
grpc_binder_stream::recv_message_ready
grpc_closure * recv_message_ready
Definition: binder_stream.h:109
grpc_core::ConnectivityStateTracker::AddWatcher
void AddWatcher(grpc_connectivity_state initial_state, OrphanablePtr< ConnectivityStateWatcherInterface > watcher)
Definition: connectivity_state.cc:120
status
absl::Status status
Definition: rls.cc:251
RecvTrailingMetadataArgs
Definition: binder_stream.h:36
grpc_binder_stream::tx_code
int tx_code
Definition: binder_stream.h:83
absl::FormatConversionChar::s
@ s
check_documentation.path
path
Definition: check_documentation.py:57
xds_manager.p
p
Definition: xds_manager.py:60
GRPC_CLOSURE_CREATE
#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler)
Definition: closure.h:160
tx_code
int tx_code
Definition: fake_binder_test.cc:241
GRPC_ERROR_CANCELLED
#define GRPC_ERROR_CANCELLED
Definition: error.h:238
grpc_create_binder_transport_client
grpc_transport * grpc_create_binder_transport_client(std::unique_ptr< grpc_binder::Binder > endpoint_binder, std::shared_ptr< grpc::experimental::binder::SecurityPolicy > security_policy)
Definition: binder_transport.cc:737
grpc_binder_stream::recv_initial_metadata_args
RecvInitialMetadataArgs recv_initial_metadata_args
Definition: binder_stream.h:94
do_close
static void do_close(void *handle)
Definition: test-ref.c:47
grpc_core::Arena
Definition: src/core/lib/resource_quota/arena.h:45
RegisterStreamArgs::gbt
grpc_binder_transport * gbt
Definition: binder_stream.h:46
recv_message_locked
static void recv_message_locked(void *arg, grpc_error_handle)
Definition: binder_transport.cc:235
message
char * message
Definition: libuv/docs/code/tty-gravity/main.c:12
grpc_binder_stream::GetTxCode
int GetTxCode() const
Definition: binder_stream.h:78
grpc_transport_op
Definition: transport.h:452
arena
grpc_core::ScopedArenaPtr arena
Definition: binder_transport_test.cc:237
destroy_transport_locked
static void destroy_transport_locked(void *gt, grpc_error_handle)
Definition: binder_transport.cc:656
wire_reader.h
get_endpoint
static grpc_endpoint * get_endpoint(grpc_transport *)
Definition: binder_transport.cc:674
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
grpc_binder_transport::transport_stream_receiver
std::shared_ptr< grpc_binder::TransportStreamReceiver > transport_stream_receiver
Definition: binder_transport.h:63
set_pollset_set
static void set_pollset_set(grpc_transport *, grpc_stream *, grpc_pollset_set *)
Definition: binder_transport.cc:132
destroy_stream_locked
static void destroy_stream_locked(void *sp, grpc_error_handle)
Definition: binder_transport.cc:636
RegisterStreamArgs
Definition: binder_stream.h:44
refcount
size_t refcount
Definition: abseil-cpp/absl/strings/internal/cordz_info.cc:122
grpc_binder_stream
Definition: binder_stream.h:50
grpc_combiner_create
grpc_core::Combiner * grpc_combiner_create(void)
Definition: combiner.cc:54
refs
std::vector< CordRep * > refs
Definition: cordz_info_statistics_test.cc:81
grpc_core::slice_detail::CopyConstructors< Slice >::FromCopiedString
static Slice FromCopiedString(const char *s)
Definition: src/core/lib/slice/slice.h:173
grpc_core::ConnectivityStateTracker::SetState
void SetState(grpc_connectivity_state state, const absl::Status &status, const char *reason)
Definition: connectivity_state.cc:154
grpc_status._async.trailing_metadata
trailing_metadata
Definition: grpcio_status/grpc_status/_async.py:36
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
grpc_binder_stream::recv_trailing_metadata_finished
grpc_closure * recv_trailing_metadata_finished
Definition: binder_stream.h:112
grpc_binder_stream::recv_message_args
RecvMessageArgs recv_message_args
Definition: binder_stream.h:96
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
perform_stream_op
static void perform_stream_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_stream_op_batch *op)
Definition: binder_transport.cc:564
wire_reader_impl.h
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc_binder_transport::registered_stream
absl::flat_hash_map< int, grpc_binder_stream * > registered_stream
Definition: binder_transport.h:69
grpc_binder_transport::is_client
bool is_client
Definition: binder_transport.h:67
get_vtable
static const grpc_transport_vtable * get_vtable()
Definition: binder_transport.cc:692
destroy_transport
static void destroy_transport(grpc_transport *gt)
Definition: binder_transport.cc:666
grpc_binder_stream::call_failed_before_recv_message
bool * call_failed_before_recv_message
Definition: binder_stream.h:110
is_client_
const bool is_client_
Definition: binder_transport.cc:370
grpc_binder_transport
Definition: binder_transport.h:45
grpc_binder_stream::register_stream_args
RegisterStreamArgs register_stream_args
Definition: binder_stream.h:101
arg
Definition: cmdline.cc:40
grpc_core::Combiner::Run
void Run(grpc_closure *closure, grpc_error_handle error)
Definition: combiner.cc:343
RecvTrailingMetadataArgs::tx_code
int tx_code
Definition: binder_stream.h:39
grpc_binder_transport::grpc_binder_transport
grpc_binder_transport(std::unique_ptr< grpc_binder::Binder > binder, bool is_client, std::shared_ptr< grpc::experimental::binder::SecurityPolicy > security_policy)
Definition: binder_transport.cc:702
grpc_binder_unref_transport
static void grpc_binder_unref_transport(grpc_binder_transport *t, const char *reason, const char *file, int line)
Definition: binder_transport.cc:57
GRPC_BINDER_UNREF_TRANSPORT
#define GRPC_BINDER_UNREF_TRANSPORT(t, r)
Definition: binder_transport.cc:88
grpc_core::slice_detail::BaseSlice::as_string_view
absl::string_view as_string_view() const
Definition: src/core/lib/slice/slice.h:89
grpc_stream_ref
void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason)
Definition: transport.h:203
grpc_core::MetadataMap::Clear
void Clear()
Definition: metadata_batch.h:1214
grpc_binder_transport::wire_writer
std::shared_ptr< grpc_binder::WireWriter > wire_writer
Definition: binder_transport.h:65
AssignMetadata
static void AssignMetadata(grpc_metadata_batch *mb, const grpc_binder::Metadata &md)
Definition: binder_transport.cc:136
batch
grpc_transport_stream_op_batch * batch
Definition: retry_filter.cc:243
RecvMessageArgs::message
absl::StatusOr< std::string > message
Definition: binder_stream.h:33
slice_internal.h
grpc_binder_transport::combiner
grpc_core::Combiner * combiner
Definition: binder_transport.h:70
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
RecvInitialMetadataArgs
Definition: binder_stream.h:22
grpc_create_binder_transport_server
grpc_transport * grpc_create_binder_transport_server(std::unique_ptr< grpc_binder::Binder > client_binder, std::shared_ptr< grpc::experimental::binder::SecurityPolicy > security_policy)
Definition: binder_transport.cc:752
grpc_binder_stream::recv_trailing_metadata_closure
grpc_closure recv_trailing_metadata_closure
Definition: binder_stream.h:97
grpc_core::ExecCtx
Definition: exec_ctx.h:97
grpc_core::GrpcStatusMetadata
Definition: metadata_batch.h:293
Json::ValueType
ValueType
Type of the value held by a Value object.
Definition: third_party/bloaty/third_party/protobuf/conformance/third_party/jsoncpp/json.h:463
ContainsAuthorityAndPath
static bool ContainsAuthorityAndPath(const grpc_binder::Metadata &metadata)
Definition: binder_transport.cc:185
GRPC_ERROR_CREATE_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
Definition: error.h:291
init_stream
static int init_stream(grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, const void *server_data, grpc_core::Arena *arena)
Definition: binder_transport.cc:103
grpc_binder_stream::destroy_stream
grpc_closure destroy_stream
Definition: binder_stream.h:88
RecvMessageArgs::tx_code
int tx_code
Definition: binder_stream.h:32
RecvMessageArgs
Definition: binder_stream.h:29
value
const char * value
Definition: hpack_parser_table.cc:165
perform_transport_op
static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op)
Definition: binder_transport.cc:625
grpc_binder_stream::recv_message_closure
grpc_closure recv_message_closure
Definition: binder_stream.h:95
grpc_binder_transport::accept_stream_fn
void(* accept_stream_fn)(void *user_data, grpc_transport *transport, const void *server_data)
Definition: binder_transport.h:74
benchmark.md
md
Definition: benchmark.py:86
grpc_core::SliceBuffer
Definition: src/core/lib/slice/slice_buffer.h:44
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
grpc_binder_ref_transport
static void grpc_binder_ref_transport(grpc_binder_transport *t, const char *reason, const char *file, int line)
Definition: binder_transport.cc:52
absl_status_to_grpc_error
grpc_error_handle absl_status_to_grpc_error(absl::Status status)
Definition: error_utils.cc:167
grpc_binder_stream::recv_trailing_metadata_args
RecvTrailingMetadataArgs recv_trailing_metadata_args
Definition: binder_stream.h:98
key
const char * key
Definition: hpack_parser_table.cc:164
grpc_error_set_int
grpc_error_handle grpc_error_set_int(grpc_error_handle src, grpc_error_ints which, intptr_t value)
Definition: error.cc:613
accept_stream_locked
static void accept_stream_locked(void *gt, grpc_error_handle)
Definition: binder_transport.cc:694
grpc_binder_stream::is_closed
bool is_closed
Definition: binder_stream.h:85
GRPC_BINDER_STREAM_UNREF
#define GRPC_BINDER_STREAM_UNREF(stream, reason)
Definition: binder_transport.cc:84
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
binder_stream.h
grpc_transport::vtable
const grpc_transport_vtable * vtable
Definition: transport_impl.h:93
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
absl::str_format_internal::LengthMod::t
@ t
perform_stream_op_locked
static void perform_stream_op_locked(void *stream_op, grpc_error_handle)
Definition: binder_transport.cc:378
GRPC_ERROR_CREATE_FROM_CPP_STRING
#define GRPC_ERROR_CREATE_FROM_CPP_STRING(desc)
Definition: error.h:297
grpc_binder_stream::recv_initial_metadata_ready
grpc_closure * recv_initial_metadata_ready
Definition: binder_stream.h:106
wire_writer.h
regen-readme.line
line
Definition: regen-readme.py:30
recv_trailing_metadata_locked
static void recv_trailing_metadata_locked(void *arg, grpc_error_handle)
Definition: binder_transport.cc:278
arg
struct arg arg
grpc_binder_stream::destroy_stream_then_closure
grpc_closure * destroy_stream_then_closure
Definition: binder_stream.h:87
grpc_binder_stream::trailing_metadata_sent
bool trailing_metadata_sent
Definition: binder_stream.h:114
exec_ctx.h
grpc_slice_from_cpp_string
grpc_slice grpc_slice_from_cpp_string(std::string str)
Definition: slice/slice.cc:202
transport_stream_receiver.h
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
register_stream_locked
static void register_stream_locked(void *arg, grpc_error_handle)
Definition: binder_transport.cc:98
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
grpc_transport
Definition: transport_impl.h:89
transport.h
grpc_stream
struct grpc_stream grpc_stream
Definition: transport.h:174
RegisterStreamArgs::gbs
grpc_binder_stream * gbs
Definition: binder_stream.h:45
grpc_binder_stream::recv_initial_metadata_closure
grpc_closure recv_initial_metadata_closure
Definition: binder_stream.h:93
grpc_binder_stream::recv_initial_metadata
grpc_metadata_batch * recv_initial_metadata
Definition: binder_stream.h:105
RecvInitialMetadataArgs::initial_metadata
absl::StatusOr< grpc_binder::Metadata > initial_metadata
Definition: binder_stream.h:26
grpc_binder_stream::need_to_call_trailing_metadata_callback
bool need_to_call_trailing_metadata_callback
Definition: binder_stream.h:115
RecvInitialMetadataArgs::tx_code
int tx_code
Definition: binder_stream.h:25
grpc_binder_stream::register_stream_closure
grpc_closure register_stream_closure
Definition: binder_stream.h:100
grpc_binder::Metadata
std::vector< std::pair< std::string, std::string > > Metadata
Definition: transaction.h:38
GRPC_CHANNEL_SHUTDOWN
@ GRPC_CHANNEL_SHUTDOWN
Definition: include/grpc/impl/codegen/connectivity_state.h:40
grpc_binder_stream::~grpc_binder_stream
~grpc_binder_stream()
Definition: binder_stream.h:70
perform_transport_op_locked
static void perform_transport_op_locked(void *transport_op, grpc_error_handle)
Definition: binder_transport.cc:590
absl::StatusOr< grpc_binder::Metadata >
grpc_binder::TransportStreamReceiver
Definition: transport_stream_receiver.h:32
grpc_error
Definition: error_internal.h:42
destroy_stream
static void destroy_stream(grpc_transport *, grpc_stream *gs, grpc_closure *then_schedule_closure)
Definition: binder_transport.cc:646
grpc_binder_stream::t
grpc_binder_transport * t
Definition: binder_stream.h:80
grpc_metadata_batch
Definition: metadata_batch.h:1259
grpc_pollset
Definition: bm_cq_multiple_threads.cc:37
set_pollset
static void set_pollset(grpc_transport *gt, grpc_stream *gs, grpc_pollset *gp)
Definition: binder_transport.cc:128
GRPC_COMBINER_UNREF
#define GRPC_COMBINER_UNREF(combiner, reason)
Definition: combiner.h:71
grpc_transport_stream_op_batch
Definition: transport.h:284
grpc_closure
Definition: closure.h:56
cancel_stream_locked
static void cancel_stream_locked(grpc_binder_transport *gbt, grpc_binder_stream *gbs, grpc_error_handle error)
Definition: binder_transport.cc:149
op
static grpc_op * op
Definition: test/core/fling/client.cc:47
grpc_stream_unref
void grpc_stream_unref(grpc_stream_refcount *refcount, const char *reason)
Definition: transport.h:220
close_transport_locked
static void close_transport_locked(grpc_binder_transport *gbt)
Definition: binder_transport.cc:578
grpc_binder_transport::wire_reader
grpc_core::OrphanablePtr< grpc_binder::WireReader > wire_reader
Definition: binder_transport.h:64
GRPC_BINDER_REF_TRANSPORT
#define GRPC_BINDER_REF_TRANSPORT(t, r)
Definition: binder_transport.cc:86
grpc_endpoint
Definition: endpoint.h:105
grpc_binder_transport::accept_stream_user_data
void * accept_stream_user_data
Definition: binder_transport.h:76
cb
OPENSSL_EXPORT pem_password_cb * cb
Definition: pem.h:351
grpc_stream_refcount
Definition: transport.h:178
GRPC_ERROR_INT_GRPC_STATUS
@ GRPC_ERROR_INT_GRPC_STATUS
grpc status code representing this error
Definition: error.h:66
error_utils.h
grpc_binder_stream_ref
static void grpc_binder_stream_ref(grpc_binder_stream *s, const char *reason)
Definition: binder_transport.cc:45
transport_stream_receiver_impl.h
binder_transport.h
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:48