wire_writer.h
Go to the documentation of this file.
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_WRITER_H
16 #define GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_WRITER_H
17 
19 
20 #include <queue>
21 #include <string>
22 #include <vector>
23 
24 #include "absl/container/flat_hash_map.h"
25 
30 
31 namespace grpc_binder {
32 
33 // Member functions are thread safe.
34 class WireWriter {  // Abstract interface; every operation is pure virtual and WireWriterImpl overrides each one.
35  public:
36  virtual ~WireWriter() = default;  // Virtual so an implementation can be destroyed through a WireWriter pointer.
37  virtual absl::Status RpcCall(std::unique_ptr<Transaction> tx) = 0;  // Takes ownership of `tx` (unique_ptr passed by value); outcome is reported as absl::Status.
38  virtual absl::Status SendAck(int64_t num_bytes) = 0;  // NOTE(review): presumably sends an acknowledgement covering `num_bytes` to the peer - confirm in wire_writer.cc.
39  virtual void OnAckReceived(int64_t num_bytes) = 0;  // NOTE(review): presumably called when an acknowledgement for `num_bytes` arrives - confirm in wire_writer.cc.
40 };
41 
42 class WireWriterImpl : public WireWriter {
43  public:
44  explicit WireWriterImpl(std::unique_ptr<Binder> binder);
45  ~WireWriterImpl() override;
46  absl::Status RpcCall(std::unique_ptr<Transaction> tx) override;
47  absl::Status SendAck(int64_t num_bytes) override;
48  void OnAckReceived(int64_t num_bytes) override;
49 
50  // Required to be public because we would like to call this in combiner (which
51  // cannot invoke class member function). `RunScheduledTxArgs` and
52  // `RunScheduledTxInternal` should not be used by user directly.
55  struct StreamTx {
56  std::unique_ptr<Transaction> tx;
57  // How many bytes of the transaction's `data` field have been sent.
59  };
60  struct AckTx {
62  };
64  };
65 
67 
68  // Split long message into chunks of size 16k. This doesn't necessarily have
69  // to be the same as the flow control acknowledgement size, but it should not
70  // exceed 128k.
71  static const int64_t kBlockSize;
72 
73  // Flow control allows sending at most 128k between acknowledgements.
75 
76  private:
77  // Fast path: send data in one transaction.
78  absl::Status RpcCallFastPath(std::unique_ptr<Transaction> tx);
79 
80  // This function will acquire `write_mu_` to make sure the binder is not used
81  // concurrently, so this can be called by different threads safely.
85 
86  // Send a stream to `binder_`. Set `is_last_chunk` to `true` if the stream
87  // transaction has been sent completely. Otherwise set to `false`.
89  WritableParcel* parcel, bool* is_last_chunk)
91 
92  // Schedule `RunScheduledTxArgs*` in `pending_outgoing_tx_` to `combiner_`, as
93  // many as possible (under the constraint of `kFlowControlWindowSize`).
95 
96  // Guards variables related to transport state.
98  std::unique_ptr<Binder> binder_ ABSL_GUARDED_BY(write_mu_);
99 
100  // Maps the transaction code (which identifies streams) to their next
101  // available sequence number. See
102  // https://github.com/grpc/proposal/blob/master/L73-java-binderchannel/wireformat.md#sequence-number
104 
105  // Number of bytes we have already sent in stream transactions.
106  std::atomic<int64_t> num_outgoing_bytes_{0};
107 
108  // Guards variables related to flow control logic.
110  int64_t num_acknowledged_bytes_ ABSL_GUARDED_BY(flow_control_mu_) = 0;
111 
112  // The queue takes ownership of the pointer.
113  std::queue<RunScheduledTxArgs*> pending_outgoing_tx_
115  int num_non_acked_tx_in_combiner_ ABSL_GUARDED_BY(flow_control_mu_) = 0;
116 
117  // Helper variable for determining if we are currently calling into
118  // `Binder::Transact`. Useful for avoiding attempts to acquire
119  // `write_mu_` multiple times on the same thread.
120  std::atomic_bool is_transacting_{false};
121 
123 };
124 
125 } // namespace grpc_binder
126 
127 #endif // GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_WRITER_H
grpc_binder::WireWriterImpl::RunStreamTx
absl::Status RunStreamTx(RunScheduledTxArgs::StreamTx *stream_tx, WritableParcel *parcel, bool *is_last_chunk) ABSL_EXCLUSIVE_LOCKS_REQUIRED(write_mu_)
Definition: wire_writer.cc:151
grpc_binder::WireWriterImpl::is_transacting_
std::atomic_bool is_transacting_
Definition: wire_writer.h:120
transaction.h
grpc_binder::WireWriterImpl::RunScheduledTxArgs::StreamTx::bytes_sent
int64_t bytes_sent
Definition: wire_writer.h:58
binder.h
grpc_binder::WireWriterImpl::kBlockSize
static const int64_t kBlockSize
Definition: wire_writer.h:71
grpc_binder
Definition: connection_id_generator.cc:45
grpc_binder::WireWriter
Definition: wire_writer.h:34
grpc_binder::WireWriterImpl::RunScheduledTxArgs::writer
WireWriterImpl * writer
Definition: wire_writer.h:54
grpc_binder::WireWriter::OnAckReceived
virtual void OnAckReceived(int64_t num_bytes)=0
tx_code
int tx_code
Definition: fake_binder_test.cc:241
grpc_binder::WireWriterImpl::SendAck
absl::Status SendAck(int64_t num_bytes) override
Definition: wire_writer.cc:286
grpc_binder::WireWriterImpl::RunScheduledTxArgs::tx
absl::variant< AckTx, StreamTx > tx
Definition: wire_writer.h:63
grpc_binder::BinderTransportTxCode
BinderTransportTxCode
Definition: binder_constants.h:31
grpc_binder::WireWriterImpl::TryScheduleTransaction
void TryScheduleTransaction()
Definition: wire_writer.cc:347
int64_t
signed __int64 int64_t
Definition: stdint-msvc2008.h:89
grpc_binder::WireWriterImpl::RpcCallFastPath
absl::Status RpcCallFastPath(std::unique_ptr< Transaction > tx)
Definition: wire_writer.cc:130
ABSL_EXCLUSIVE_LOCKS_REQUIRED
#define ABSL_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: abseil-cpp/absl/base/thread_annotations.h:145
grpc_binder::WireWriter::SendAck
virtual absl::Status SendAck(int64_t num_bytes)=0
grpc_binder::WireWriterImpl::WireWriterImpl
WireWriterImpl(std::unique_ptr< Binder > binder)
Definition: wire_writer.cc:83
grpc_binder::WireWriterImpl::RunScheduledTxArgs::StreamTx::tx
std::unique_ptr< Transaction > tx
Definition: wire_writer.h:56
arg
Definition: cmdline.cc:40
grpc_binder::WireWriterImpl::kFlowControlWindowSize
static const int64_t kFlowControlWindowSize
Definition: wire_writer.h:74
grpc_binder::WritableParcel
Definition: binder.h:44
grpc_core::Mutex
Definition: src/core/lib/gprpp/sync.h:61
grpc_binder::WireWriterImpl::RunScheduledTxInternal
void RunScheduledTxInternal(RunScheduledTxArgs *arg)
Definition: wire_writer.cc:206
grpc_binder::WireWriterImpl::RunScheduledTxArgs::AckTx::num_bytes
int64_t num_bytes
Definition: wire_writer.h:61
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
grpc_binder::WireWriterImpl::write_mu_
grpc_core::Mutex write_mu_
Definition: wire_writer.h:97
absl::flat_hash_map
Definition: abseil-cpp/absl/container/flat_hash_map.h:113
grpc_binder::WireWriterImpl::RunScheduledTxArgs
Definition: wire_writer.h:53
combiner.h
grpc_binder::WireWriter::~WireWriter
virtual ~WireWriter()=default
grpc_core::Combiner
Definition: combiner.h:34
grpc_binder::WireWriterImpl
Definition: wire_writer.h:42
grpc_binder::WireWriterImpl::RunScheduledTxArgs::AckTx
Definition: wire_writer.h:60
grpc_binder::WireWriterImpl::MakeBinderTransaction
absl::Status MakeBinderTransaction(BinderTransportTxCode tx_code, std::function< absl::Status(WritableParcel *)> fill_parcel)
Definition: wire_writer.cc:102
grpc_binder::WireWriterImpl::OnAckReceived
void OnAckReceived(int64_t num_bytes) override
Definition: wire_writer.cc:324
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
grpc_binder::WireWriterImpl::~WireWriterImpl
~WireWriterImpl() override
Definition: wire_writer.cc:89
absl::variant
Definition: abseil-cpp/absl/types/internal/variant.h:46
grpc_binder::WireWriterImpl::RpcCall
absl::Status RpcCall(std::unique_ptr< Transaction > tx) override
Definition: wire_writer.cc:270
grpc_binder::WireWriterImpl::ABSL_GUARDED_BY
std::unique_ptr< Binder > binder_ ABSL_GUARDED_BY(write_mu_)
sync.h
grpc_binder::WireWriterImpl::RunScheduledTxArgs::StreamTx
Definition: wire_writer.h:55
grpc_binder::WireWriterImpl::combiner_
grpc_core::Combiner * combiner_
Definition: wire_writer.h:122
grpc_binder::WireWriterImpl::num_outgoing_bytes_
std::atomic< int64_t > num_outgoing_bytes_
Definition: wire_writer.h:106
grpc_binder::WireWriter::RpcCall
virtual absl::Status RpcCall(std::unique_ptr< Transaction > tx)=0
port_platform.h
grpc_binder::WireWriterImpl::flow_control_mu_
grpc_core::Mutex flow_control_mu_
Definition: wire_writer.h:109


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:52