transport_stream_receiver_impl.h
Go to the documentation of this file.
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_CORE_EXT_TRANSPORT_BINDER_UTILS_TRANSPORT_STREAM_RECEIVER_IMPL_H
16 #define GRPC_CORE_EXT_TRANSPORT_BINDER_UTILS_TRANSPORT_STREAM_RECEIVER_IMPL_H
17 
19 
20 #include <functional>
21 #include <map>
22 #include <queue>
23 #include <set>
24 #include <string>
25 #include <vector>
26 
29 
30 namespace grpc_binder {
31 
32 // Routes the data received from transport to corresponding streams
34  public:
36  bool is_client, std::function<void()> accept_stream_callback = nullptr)
37  : is_client_(is_client),
38  accept_stream_callback_(accept_stream_callback) {}
42  MessageDataCallbackType cb) override;
46  StreamIdentifier id, absl::StatusOr<Metadata> initial_metadata) override;
51  int status) override;
52 
53  void CancelStream(StreamIdentifier id) override;
54 
55  private:
56  // Trailing metadata marks the end of one side of the stream. Thus, after
57  // receiving trailing metadata from the other end, we know that there will
58  // never be any more incoming message data, and all registered recv_message
59  // callbacks (as well as the recv_initial_metadata callback, if there is one)
60  // will never be satisfied. This function cancels all such callbacks
61  // gracefully (with GRPC_ERROR_NONE) to avoid being blocked waiting for them.
63 
67 
68  std::map<StreamIdentifier, InitialMetadataCallbackType> initial_metadata_cbs_;
69  std::map<StreamIdentifier, MessageDataCallbackType> message_cbs_;
70  std::map<StreamIdentifier, TrailingMetadataCallbackType>
72  // TODO(waynetu): Better thread safety design. For example, use separate
73  // mutexes for different types of messages.
75  // TODO(waynetu): The gRPC surface layer will not wait for the current message
76  // to be delivered before sending the next message. The following
77  // implementation is still buggy with the current implementation of the wire
78  // writer if a transaction issued first completes after one issued later. This
79  // is because we just take the first element out of the queue and assume it's
80  // the one issued first without further checking, which results in callbacks
81  // being invoked with incorrect data.
82  //
83  // This should be fixed at the wire writer level, which should make sure
84  // out-of-order messages are re-ordered. In that case, the queueing approach
85  // will work fine. See the TODO in WireReaderImpl::ProcessTransaction() in
86  // wire_reader_impl.cc for detecting and resolving out-of-order transactions.
87  //
88  // TODO(waynetu): Use absl::flat_hash_map.
89  std::map<StreamIdentifier, std::queue<absl::StatusOr<Metadata>>>
90  pending_initial_metadata_ ABSL_GUARDED_BY(m_);
91  std::map<StreamIdentifier, std::queue<absl::StatusOr<std::string>>>
92  pending_message_ ABSL_GUARDED_BY(m_);
94  std::queue<std::pair<absl::StatusOr<Metadata>, int>>>
95  pending_trailing_metadata_ ABSL_GUARDED_BY(m_);
96  // Records the streams for which trailing metadata has been received, i.e.
97  // whose recv_message callbacks are cancelled. Although we explicitly cancel
98  // the registered recv_message callbacks in OnRecvTrailingMetadata(), the
99  // registration may come "after" we receive trailing metadata. Therefore,
100  // when RegisterRecvMessage() gets called, we should check whether
101  // trailing_metadata_recvd_ contains the corresponding stream ID, and if so,
102  // directly cancel the callback gracefully without leaving it pending.
103  std::set<StreamIdentifier> trailing_metadata_recvd_ ABSL_GUARDED_BY(m_);
104 
106  // Called when receiving initial metadata to inform the server about a new
107  // stream.
109 };
110 } // namespace grpc_binder
111 
112 #endif // GRPC_CORE_EXT_TRANSPORT_BINDER_UTILS_TRANSPORT_STREAM_RECEIVER_IMPL_H
grpc_binder::TransportStreamReceiverImpl::CancelTrailingMetadataCallback
void CancelTrailingMetadataCallback(StreamIdentifier id, absl::Status error)
Definition: transport_stream_receiver_impl.cc:215
grpc_binder::TransportStreamReceiverImpl::TransportStreamReceiverImpl
TransportStreamReceiverImpl(bool is_client, std::function< void()> accept_stream_callback=nullptr)
Definition: transport_stream_receiver_impl.h:35
grpc_binder::TransportStreamReceiverImpl::initial_metadata_cbs_
std::map< StreamIdentifier, InitialMetadataCallbackType > initial_metadata_cbs_
Definition: transport_stream_receiver_impl.h:68
grpc_binder
Definition: connection_id_generator.cc:45
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc_binder::TransportStreamReceiverImpl::accept_stream_callback_
std::function< void()> accept_stream_callback_
Definition: transport_stream_receiver_impl.h:108
status
absl::Status status
Definition: rls.cc:251
grpc_binder::TransportStreamReceiverImpl::m_
grpc_core::Mutex m_
Definition: transport_stream_receiver_impl.h:74
grpc_binder::TransportStreamReceiverImpl::is_client_
bool is_client_
Definition: transport_stream_receiver_impl.h:105
grpc_binder::TransportStreamReceiverImpl::OnRecvTrailingMetadata
void OnRecvTrailingMetadata(StreamIdentifier id)
Definition: transport_stream_receiver_impl.cc:231
grpc_binder::TransportStreamReceiver::InitialMetadataCallbackType
std::function< void(absl::StatusOr< Metadata >)> InitialMetadataCallbackType
Definition: transport_stream_receiver.h:37
map
zval * map
Definition: php/ext/google/protobuf/encode_decode.c:480
message
char * message
Definition: libuv/docs/code/tty-gravity/main.c:12
grpc_binder::TransportStreamReceiverImpl::NotifyRecvTrailingMetadata
void NotifyRecvTrailingMetadata(StreamIdentifier id, absl::StatusOr< Metadata > trailing_metadata, int status) override
Definition: transport_stream_receiver_impl.cc:158
grpc_binder::TransportStreamReceiverImpl::CancelMessageCallback
void CancelMessageCallback(StreamIdentifier id, absl::Status error)
Definition: transport_stream_receiver_impl.cc:199
grpc_binder::TransportStreamReceiverImpl::NotifyRecvMessage
void NotifyRecvMessage(StreamIdentifier id, absl::StatusOr< std::string > message) override
Definition: transport_stream_receiver_impl.cc:140
grpc_binder::TransportStreamReceiver::TrailingMetadataCallbackType
std::function< void(absl::StatusOr< Metadata >, int)> TrailingMetadataCallbackType
Definition: transport_stream_receiver.h:41
grpc_status._async.trailing_metadata
trailing_metadata
Definition: grpcio_status/grpc_status/_async.py:36
grpc_binder::TransportStreamReceiverImpl::RegisterRecvInitialMetadata
void RegisterRecvInitialMetadata(StreamIdentifier id, InitialMetadataCallbackType cb) override
Definition: transport_stream_receiver_impl.cc:33
grpc_binder::TransportStreamReceiverImpl::NotifyRecvInitialMetadata
void NotifyRecvInitialMetadata(StreamIdentifier id, absl::StatusOr< Metadata > initial_metadata) override
Definition: transport_stream_receiver_impl.cc:119
grpc_binder::TransportStreamReceiverImpl::trailing_metadata_cbs_
std::map< StreamIdentifier, TrailingMetadataCallbackType > trailing_metadata_cbs_
Definition: transport_stream_receiver_impl.h:71
grpc_binder::TransportStreamReceiver::MessageDataCallbackType
std::function< void(absl::StatusOr< std::string >)> MessageDataCallbackType
Definition: transport_stream_receiver.h:39
grpc_binder::StreamIdentifier
int StreamIdentifier
Definition: transport_stream_receiver.h:30
grpc_binder::TransportStreamReceiverImpl::ABSL_GUARDED_BY
std::map< StreamIdentifier, std::queue< absl::StatusOr< Metadata > > > pending_initial_metadata_ ABSL_GUARDED_BY(m_)
grpc_core::Mutex
Definition: src/core/lib/gprpp/sync.h:61
grpc_binder::TransportStreamReceiverImpl::message_cbs_
std::map< StreamIdentifier, MessageDataCallbackType > message_cbs_
Definition: transport_stream_receiver_impl.h:69
grpc_binder::TransportStreamReceiverImpl::CancelStream
void CancelStream(StreamIdentifier id) override
Definition: transport_stream_receiver_impl.cc:243
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
transport_stream_receiver.h
grpc_binder::TransportStreamReceiverImpl::CancelInitialMetadataCallback
void CancelInitialMetadataCallback(StreamIdentifier id, absl::Status error)
Definition: transport_stream_receiver_impl.cc:183
absl::StatusOr
Definition: abseil-cpp/absl/status/statusor.h:187
grpc_binder::TransportStreamReceiver
Definition: transport_stream_receiver.h:32
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
grpc_binder::TransportStreamReceiverImpl::RegisterRecvTrailingMetadata
void RegisterRecvTrailingMetadata(StreamIdentifier id, TrailingMetadataCallbackType cb) override
Definition: transport_stream_receiver_impl.cc:95
grpc_binder::TransportStreamReceiverImpl
Definition: transport_stream_receiver_impl.h:33
sync.h
cb
OPENSSL_EXPORT pem_password_cb * cb
Definition: pem.h:351
grpc_binder::TransportStreamReceiverImpl::RegisterRecvMessage
void RegisterRecvMessage(StreamIdentifier id, MessageDataCallbackType cb) override
Definition: transport_stream_receiver_impl.cc:61
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:40