Go to the documentation of this file.
19 #ifndef GRPC_NO_BINDER
26 #include "absl/memory/memory.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/substitute.h"
53 const char* reason,
const char*
file,
58 const char* reason,
const char*
file,
75 if (
t->refs.Unref()) {
82 #define GRPC_BINDER_STREAM_REF(stream, reason) \
83 grpc_binder_stream_ref(stream, reason)
84 #define GRPC_BINDER_STREAM_UNREF(stream, reason) \
85 grpc_binder_stream_unref(stream, reason)
86 #define GRPC_BINDER_REF_TRANSPORT(t, r) \
87 grpc_binder_ref_transport(t, r, __FILE__, __LINE__)
88 #define GRPC_BINDER_UNREF_TRANSPORT(t, r) \
89 grpc_binder_unref_transport(t, r, __FILE__, __LINE__)
91 #define GRPC_BINDER_STREAM_REF(stream, reason) grpc_binder_stream_ref(stream)
92 #define GRPC_BINDER_STREAM_UNREF(stream, reason) \
93 grpc_binder_stream_unref(stream)
94 #define GRPC_BINDER_REF_TRANSPORT(t, r) grpc_binder_ref_transport(t)
95 #define GRPC_BINDER_UNREF_TRANSPORT(t, r) grpc_binder_unref_transport(t)
100 args->gbt->registered_stream[
args->gbs->GetTxCode()] =
args->gbs;
112 t->NewStreamTxCode(), t->is_client);
143 GPR_DEBUG,
"Failed to parse metadata: %s",
144 absl::StrCat(
"key=", p.first,
" error=", error).c_str());
186 bool has_authority =
false;
187 bool has_path =
false;
189 if (kv.first ==
":authority") {
190 has_authority =
true;
192 if (kv.first ==
":path") {
196 return has_authority && has_path;
205 "recv_initial_metadata_locked is_client = %d is_closed = %d",
212 if (!
args->initial_metadata.ok()) {
220 "Missing :authority or :path in initial metadata");
246 if (!
args->message.ok()) {
248 if (
args->message.status().message() ==
250 kGrpcBinderTransportCancelledGracefully) {
284 "recv_trailing_metadata_locked is_client = %d is_closed = %d",
291 if (!
args->trailing_metadata.ok()) {
297 if (!
args->trailing_metadata.value().empty()) {
335 class MetadataEncoder {
337 MetadataEncoder(
bool is_client, Transaction* tx,
Metadata* init_md)
363 template <
typename Trait>
385 if (
op->cancel_stream) {
388 !
op->send_trailing_metadata && !
op->recv_initial_metadata &&
389 !
op->recv_message && !
op->recv_trailing_metadata);
394 auto cancel_tx = absl::make_unique<grpc_binder::Transaction>(
397 cancel_tx->SetStatus(1);
401 if (
op->on_complete !=
nullptr) {
409 if (
op->send_message) {
411 op->payload->send_message.send_message->Clear();
413 if (
op->recv_initial_metadata) {
416 op->payload->recv_initial_metadata.recv_initial_metadata_ready,
419 if (
op->recv_message) {
421 op->payload->recv_message.recv_message_ready,
424 if (
op->recv_trailing_metadata) {
427 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
430 if (
op->on_complete !=
nullptr) {
442 if (
op->send_initial_metadata) {
445 auto batch =
op->payload->send_initial_metadata.send_initial_metadata;
447 grpc_binder::MetadataEncoder encoder(gbt->
is_client, tx.get(), &init_md);
448 batch->Encode(&encoder);
449 tx->SetPrefix(init_md);
451 if (
op->send_message) {
453 tx->SetData(
op->payload->send_message.send_message->JoinIntoString());
456 if (
op->send_trailing_metadata) {
458 auto batch =
op->payload->send_trailing_metadata.send_trailing_metadata;
461 grpc_binder::MetadataEncoder encoder(gbt->
is_client, tx.get(),
463 batch->Encode(&encoder);
469 if (
op->recv_initial_metadata) {
472 op->payload->recv_initial_metadata.recv_initial_metadata_ready;
474 op->payload->recv_initial_metadata.recv_initial_metadata;
476 op->payload->recv_initial_metadata.trailing_metadata_available;
492 if (
op->recv_message) {
497 op->payload->recv_message.call_failed_before_recv_message;
510 if (
op->recv_trailing_metadata) {
513 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
515 op->payload->recv_trailing_metadata.recv_trailing_metadata;
535 if (
op->send_initial_metadata ||
op->send_message ||
536 op->send_trailing_metadata) {
538 if (!gbs->
is_client &&
op->send_trailing_metadata) {
556 if (
op->on_complete !=
nullptr) {
572 op->handler_private.extra_arg = gbs;
580 "transport closed due to disconnection/goaway");
596 if (
op->start_connectivity_watch !=
nullptr) {
600 if (
op->stop_connectivity_watch !=
nullptr) {
603 if (
op->set_accept_stream) {
607 if (
op->on_consumed) {
628 op->handler_private.extra_arg = gbt;
703 std::unique_ptr<grpc_binder::Binder> binder,
bool is_client,
704 std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)
705 : is_client(is_client),
708 is_client ?
"binder_transport_client" :
"binder_transport_server",
714 std::make_shared<grpc_binder::TransportStreamReceiverImpl>(
723 wire_reader = grpc_core::MakeOrphanable<grpc_binder::WireReaderImpl>(
738 std::unique_ptr<grpc_binder::Binder> endpoint_binder,
739 std::shared_ptr<grpc::experimental::binder::SecurityPolicy>
747 std::move(endpoint_binder),
true, security_policy);
753 std::unique_ptr<grpc_binder::Binder> client_binder,
754 std::shared_ptr<grpc::experimental::binder::SecurityPolicy>
762 std::move(client_binder),
false, security_policy);
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
void RemoveWatcher(ConnectivityStateWatcherInterface *watcher)
static const grpc_transport_vtable vtable
@ GRPC_STATUS_UNAVAILABLE
absl::optional< grpc_core::SliceBuffer > * recv_message
static void grpc_binder_stream_unref(grpc_binder_stream *s, const char *reason)
grpc_core::ConnectivityStateTracker state_tracker
static void recv_initial_metadata_locked(void *arg, grpc_error_handle)
#define GPR_TIMER_SCOPE(tag, important)
struct grpc_pollset_set grpc_pollset_set
grpc_metadata_batch * recv_trailing_metadata
bool * trailing_metadata_available
#define GRPC_BINDER_STREAM_REF(stream, reason)
grpc_error_handle cancel_self_error
grpc_closure * recv_message_ready
void AddWatcher(grpc_connectivity_state initial_state, OrphanablePtr< ConnectivityStateWatcherInterface > watcher)
#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler)
#define GRPC_ERROR_CANCELLED
grpc_transport * grpc_create_binder_transport_client(std::unique_ptr< grpc_binder::Binder > endpoint_binder, std::shared_ptr< grpc::experimental::binder::SecurityPolicy > security_policy)
RecvInitialMetadataArgs recv_initial_metadata_args
static void do_close(void *handle)
grpc_binder_transport * gbt
static void recv_message_locked(void *arg, grpc_error_handle)
grpc_core::ScopedArenaPtr arena
static void destroy_transport_locked(void *gt, grpc_error_handle)
static grpc_endpoint * get_endpoint(grpc_transport *)
std::shared_ptr< grpc_binder::TransportStreamReceiver > transport_stream_receiver
static void set_pollset_set(grpc_transport *, grpc_stream *, grpc_pollset_set *)
static void destroy_stream_locked(void *sp, grpc_error_handle)
grpc_core::Combiner * grpc_combiner_create(void)
std::vector< CordRep * > refs
static Slice FromCopiedString(const char *s)
void SetState(grpc_connectivity_state state, const absl::Status &status, const char *reason)
grpc_closure * recv_trailing_metadata_finished
RecvMessageArgs recv_message_args
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
static void perform_stream_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_stream_op_batch *op)
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
absl::flat_hash_map< int, grpc_binder_stream * > registered_stream
static const grpc_transport_vtable * get_vtable()
static void destroy_transport(grpc_transport *gt)
bool * call_failed_before_recv_message
RegisterStreamArgs register_stream_args
void Run(grpc_closure *closure, grpc_error_handle error)
grpc_binder_transport(std::unique_ptr< grpc_binder::Binder > binder, bool is_client, std::shared_ptr< grpc::experimental::binder::SecurityPolicy > security_policy)
static void grpc_binder_unref_transport(grpc_binder_transport *t, const char *reason, const char *file, int line)
#define GRPC_BINDER_UNREF_TRANSPORT(t, r)
absl::string_view as_string_view() const
void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason)
std::shared_ptr< grpc_binder::WireWriter > wire_writer
static void AssignMetadata(grpc_metadata_batch *mb, const grpc_binder::Metadata &md)
grpc_transport_stream_op_batch * batch
absl::StatusOr< std::string > message
grpc_core::Combiner * combiner
grpc_transport * grpc_create_binder_transport_server(std::unique_ptr< grpc_binder::Binder > client_binder, std::shared_ptr< grpc::experimental::binder::SecurityPolicy > security_policy)
grpc_closure recv_trailing_metadata_closure
ValueType
Type of the value held by a Value object.
static bool ContainsAuthorityAndPath(const grpc_binder::Metadata &metadata)
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
static int init_stream(grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, const void *server_data, grpc_core::Arena *arena)
grpc_closure destroy_stream
static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op)
grpc_closure recv_message_closure
void(* accept_stream_fn)(void *user_data, grpc_transport *transport, const void *server_data)
#define GRPC_ERROR_REF(err)
static void grpc_binder_ref_transport(grpc_binder_transport *t, const char *reason, const char *file, int line)
grpc_error_handle absl_status_to_grpc_error(absl::Status status)
RecvTrailingMetadataArgs recv_trailing_metadata_args
grpc_error_handle grpc_error_set_int(grpc_error_handle src, grpc_error_ints which, intptr_t value)
static void accept_stream_locked(void *gt, grpc_error_handle)
#define GRPC_BINDER_STREAM_UNREF(stream, reason)
grpc_core::ExecCtx exec_ctx
const grpc_transport_vtable * vtable
static void perform_stream_op_locked(void *stream_op, grpc_error_handle)
#define GRPC_ERROR_CREATE_FROM_CPP_STRING(desc)
grpc_closure * recv_initial_metadata_ready
static void recv_trailing_metadata_locked(void *arg, grpc_error_handle)
grpc_closure * destroy_stream_then_closure
bool trailing_metadata_sent
grpc_slice grpc_slice_from_cpp_string(std::string str)
#define GRPC_ERROR_UNREF(err)
static void register_stream_locked(void *arg, grpc_error_handle)
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
struct grpc_stream grpc_stream
grpc_closure recv_initial_metadata_closure
grpc_metadata_batch * recv_initial_metadata
bool need_to_call_trailing_metadata_callback
grpc_closure register_stream_closure
std::vector< std::pair< std::string, std::string > > Metadata
static void perform_transport_op_locked(void *transport_op, grpc_error_handle)
static void destroy_stream(grpc_transport *, grpc_stream *gs, grpc_closure *then_schedule_closure)
grpc_binder_transport * t
static void set_pollset(grpc_transport *gt, grpc_stream *gs, grpc_pollset *gp)
#define GRPC_COMBINER_UNREF(combiner, reason)
static void cancel_stream_locked(grpc_binder_transport *gbt, grpc_binder_stream *gbs, grpc_error_handle error)
void grpc_stream_unref(grpc_stream_refcount *refcount, const char *reason)
static void close_transport_locked(grpc_binder_transport *gbt)
grpc_core::OrphanablePtr< grpc_binder::WireReader > wire_reader
#define GRPC_BINDER_REF_TRANSPORT(t, r)
void * accept_stream_user_data
OPENSSL_EXPORT pem_password_cb * cb
@ GRPC_ERROR_INT_GRPC_STATUS
grpc status code representing this error
static void grpc_binder_stream_ref(grpc_binder_stream *s, const char *reason)
#define GRPC_ERROR_IS_NONE(err)
grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:48