Go to the documentation of this file.
19 #ifndef GRPC_NO_BINDER
23 #include "absl/cleanup/cleanup.h"
24 #include "absl/types/variant.h"
28 #define RETURN_IF_ERROR(expr) \
30 const absl::Status status = (expr); \
31 if (!status.ok()) return status; \
44 auto* run_scheduled_tx_args =
85 gpr_log(
GPR_INFO,
"%s write_mu_ = %p , flow_control_mu_ = %p", __func__,
91 while (!pending_outgoing_tx_.empty()) {
92 delete pending_outgoing_tx_.front();
93 pending_outgoing_tx_.pop();
114 "Unexpected large transaction (possibly caused by a very large "
115 "metadata). This might overflow the binder "
116 "transaction buffer. Size: %" PRId64
" bytes",
133 [
this, tx = tx.get()](
153 bool* is_last_chunk) {
186 *is_last_chunk =
true;
190 *is_last_chunk =
false;
208 if (absl::holds_alternative<RunScheduledTxArgs::AckTx>(
args->tx)) {
210 absl::get<RunScheduledTxArgs::AckTx>(
args->tx).num_bytes;
219 result.ToString().c_str());
224 GPR_ASSERT(absl::holds_alternative<RunScheduledTxArgs::StreamTx>(
args->tx));
226 &absl::get<RunScheduledTxArgs::StreamTx>(
args->tx);
233 GPR_ASSERT(num_non_acked_tx_in_combiner_ > 0);
234 num_non_acked_tx_in_combiner_--;
243 result.ToString().c_str());
248 bool is_last_chunk =
true;
253 return RunStreamTx(stream_tx, parcel, &is_last_chunk);
257 result.ToString().c_str());
259 if (!is_last_chunk) {
262 pending_outgoing_tx_.push(
args);
276 absl::get<RunScheduledTxArgs::StreamTx>(
args->tx).tx =
std::move(tx);
277 absl::get<RunScheduledTxArgs::StreamTx>(
args->tx).bytes_sent = 0;
280 pending_outgoing_tx_.push(
args);
300 "Scheduling ACK transaction instead of directly execute it to avoid "
305 absl::get<RunScheduledTxArgs::AckTx>(
args->tx).num_bytes = num_bytes;
319 result.ToString().c_str());
335 num_acknowledged_bytes_ =
std::max(num_acknowledged_bytes_, num_bytes);
337 if (num_acknowledged_bytes_ > num_outgoing_bytes) {
339 "The other end of transport acked more bytes than we ever sent, "
340 "%" PRId64
" > %" PRId64,
341 num_acknowledged_bytes_, num_outgoing_bytes);
350 if (pending_outgoing_tx_.empty()) {
358 int64_t num_bytes_scheduled_in_combiner =
363 int64_t num_total_bytes_will_be_sent =
368 int64_t num_non_acked_bytes_estimation =
369 num_total_bytes_will_be_sent - num_acknowledged_bytes_;
370 if (num_non_acked_bytes_estimation < 0) {
373 "Something went wrong. `num_non_acked_bytes_estimation` should be "
374 "non-negative but it is %" PRId64,
375 num_non_acked_bytes_estimation);
379 if ((num_non_acked_bytes_estimation +
kBlockSize <
381 num_non_acked_tx_in_combiner_++;
383 pending_outgoing_tx_.front(),
nullptr),
385 pending_outgoing_tx_.pop();
391 "Some work cannot be scheduled yet due to slow ack from the "
392 "other end of transport. This transport might be blocked if this "
393 "number don't go down. pending_outgoing_tx_.size() = %zu "
394 "pending_outgoing_tx_.front() = %p",
395 pending_outgoing_tx_.size(), pending_outgoing_tx_.front());
absl::Status RunStreamTx(RunScheduledTxArgs::StreamTx *stream_tx, WritableParcel *parcel, bool *is_last_chunk) ABSL_EXCLUSIVE_LOCKS_REQUIRED(write_mu_)
std::atomic_bool is_transacting_
const Metadata & GetSuffixMetadata() const
absl::Cleanup< cleanup_internal::Tag, Callback > MakeCleanup(Callback callback)
static const int64_t kBlockSize
absl::string_view GetStatusDesc() const
const int kFlagMessageDataIsPartial
virtual absl::Status WriteString(absl::string_view s)=0
absl::string_view GetMessageData() const
#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler)
absl::Status SendAck(int64_t num_bytes) override
void TryScheduleTransaction()
const int kFlagStatusDescription
grpc_core::Combiner * grpc_combiner_create(void)
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
constexpr size_type size() const noexcept
absl::Status RpcCallFastPath(std::unique_ptr< Transaction > tx)
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
#define ABSL_EXCLUSIVE_LOCKS_REQUIRED(...)
WireWriterImpl(std::unique_ptr< Binder > binder)
std::unique_ptr< Transaction > tx
void Run(grpc_closure *closure, grpc_error_handle error)
#define RETURN_IF_ERROR(expr)
virtual int32_t GetDataSize() const =0
void RunScheduledTx(void *arg, grpc_error_handle)
static const int64_t kFlowControlWindowSize
void RunScheduledTxInternal(RunScheduledTxArgs *arg)
virtual absl::Status WriteInt32(int32_t data)=0
grpc_core::ExecCtx exec_ctx
absl::Status WriteByteArrayWithLength(absl::string_view buffer)
grpc_core::Mutex write_mu_
const Metadata & GetPrefixMetadata() const
const int kFlagMessageData
absl::Status WriteInitialMetadata(const Transaction &tx, WritableParcel *parcel)
absl::string_view GetMethodRef() const
absl::Status MakeBinderTransaction(BinderTransportTxCode tx_code, std::function< absl::Status(WritableParcel *)> fill_parcel)
void OnAckReceived(int64_t num_bytes) override
~WireWriterImpl() override
#define GRPC_COMBINER_UNREF(combiner, reason)
absl::Status RpcCall(std::unique_ptr< Transaction > tx) override
bool CanBeSentInOneTransaction(const Transaction &tx)
absl::Status WriteTrailingMetadata(const Transaction &tx, WritableParcel *parcel)
grpc_core::Combiner * combiner_
std::atomic< int64_t > num_outgoing_bytes_
virtual absl::Status WriteInt64(int64_t data)=0
grpc_core::Mutex flow_control_mu_
grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:54