transport_stream_receiver_impl.cc
Go to the documentation of this file.
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
18 
19 #ifndef GRPC_NO_BINDER
20 
21 #include <functional>
22 #include <string>
23 #include <utility>
24 
25 #include <grpc/support/log.h>
26 
27 namespace grpc_binder {
28 
31  "grpc-binder-transport: cancelled gracefully";
32 
35  gpr_log(GPR_INFO, "%s id = %d is_client = %d", __func__, id, is_client_);
36  absl::StatusOr<Metadata> initial_metadata{};
37  {
39  GPR_ASSERT(initial_metadata_cbs_.count(id) == 0);
40  auto iter = pending_initial_metadata_.find(id);
41  if (iter == pending_initial_metadata_.end()) {
42  if (trailing_metadata_recvd_.count(id)) {
44  } else {
46  }
47  cb = nullptr;
48  } else {
49  initial_metadata = std::move(iter->second.front());
50  iter->second.pop();
51  if (iter->second.empty()) {
52  pending_initial_metadata_.erase(iter);
53  }
54  }
55  }
56  if (cb != nullptr) {
57  cb(std::move(initial_metadata));
58  }
59 }
60 
63  gpr_log(GPR_INFO, "%s id = %d is_client = %d", __func__, id, is_client_);
65  {
67  GPR_ASSERT(message_cbs_.count(id) == 0);
68  auto iter = pending_message_.find(id);
69  if (iter == pending_message_.end()) {
70  // If we'd already received trailing-metadata and there's no pending
71  // messages, cancel the callback.
72  if (trailing_metadata_recvd_.count(id)) {
75  } else {
77  }
78  cb = nullptr;
79  } else {
80  // We'll still keep all pending messages received before the trailing
81  // metadata since they're issued before the end of stream, as promised by
82  // WireReader which keeps transactions commit in-order.
83  message = std::move(iter->second.front());
84  iter->second.pop();
85  if (iter->second.empty()) {
86  pending_message_.erase(iter);
87  }
88  }
89  }
90  if (cb != nullptr) {
92  }
93 }
94 
97  gpr_log(GPR_INFO, "%s id = %d is_client = %d", __func__, id, is_client_);
98  std::pair<absl::StatusOr<Metadata>, int> trailing_metadata{};
99  {
101  GPR_ASSERT(trailing_metadata_cbs_.count(id) == 0);
102  auto iter = pending_trailing_metadata_.find(id);
103  if (iter == pending_trailing_metadata_.end()) {
105  cb = nullptr;
106  } else {
107  trailing_metadata = std::move(iter->second.front());
108  iter->second.pop();
109  if (iter->second.empty()) {
110  pending_trailing_metadata_.erase(iter);
111  }
112  }
113  }
114  if (cb != nullptr) {
116  }
117 }
118 
120  StreamIdentifier id, absl::StatusOr<Metadata> initial_metadata) {
121  gpr_log(GPR_INFO, "%s id = %d is_client = %d", __func__, id, is_client_);
122  if (!is_client_ && accept_stream_callback_ && initial_metadata.ok()) {
124  }
126  {
128  auto iter = initial_metadata_cbs_.find(id);
129  if (iter != initial_metadata_cbs_.end()) {
130  cb = iter->second;
132  } else {
133  pending_initial_metadata_[id].push(std::move(initial_metadata));
134  return;
135  }
136  }
137  cb(std::move(initial_metadata));
138 }
139 
142  gpr_log(GPR_INFO, "%s id = %d is_client = %d", __func__, id, is_client_);
144  {
146  auto iter = message_cbs_.find(id);
147  if (iter != message_cbs_.end()) {
148  cb = iter->second;
149  message_cbs_.erase(iter);
150  } else {
151  pending_message_[id].push(std::move(message));
152  return;
153  }
154  }
155  cb(std::move(message));
156 }
157 
160  int status) {
161  // Trailing metadata mark the end of the stream. Since TransportStreamReceiver
162  // assumes in-order commitments of transactions and that trailing metadata is
163  // parsed after message data, we can safely cancel all upcoming callbacks of
164  // recv_message.
165  gpr_log(GPR_INFO, "%s id = %d is_client = %d", __func__, id, is_client_);
168  {
170  auto iter = trailing_metadata_cbs_.find(id);
171  if (iter != trailing_metadata_cbs_.end()) {
172  cb = iter->second;
174  } else {
175  pending_trailing_metadata_[id].emplace(std::move(trailing_metadata),
176  status);
177  return;
178  }
179  }
181 }
182 
186  {
188  auto iter = initial_metadata_cbs_.find(id);
189  if (iter != initial_metadata_cbs_.end()) {
190  callback = std::move(iter->second);
192  }
193  }
194  if (callback != nullptr) {
196  }
197 }
198 
202  {
204  auto iter = message_cbs_.find(id);
205  if (iter != message_cbs_.end()) {
206  callback = std::move(iter->second);
207  message_cbs_.erase(iter);
208  }
209  }
210  if (callback != nullptr) {
212  }
213 }
214 
218  {
220  auto iter = trailing_metadata_cbs_.find(id);
221  if (iter != trailing_metadata_cbs_.end()) {
222  callback = std::move(iter->second);
224  }
225  }
226  if (callback != nullptr) {
227  std::move(callback)(error, 0);
228  }
229 }
230 
232  gpr_log(GPR_INFO, "%s id = %d is_client = %d", __func__, id, is_client_);
233  m_.Lock();
234  trailing_metadata_recvd_.insert(id);
235  m_.Unlock();
238  id,
241 }
242 
244  gpr_log(GPR_INFO, "%s id = %d is_client = %d", __func__, id, is_client_);
245  CancelInitialMetadataCallback(id, absl::CancelledError("Stream cancelled"));
246  CancelMessageCallback(id, absl::CancelledError("Stream cancelled"));
247  CancelTrailingMetadataCallback(id, absl::CancelledError("Stream cancelled"));
249  trailing_metadata_recvd_.erase(id);
250  pending_initial_metadata_.erase(id);
251  pending_message_.erase(id);
252  pending_trailing_metadata_.erase(id);
253 }
254 } // namespace grpc_binder
255 #endif
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
log.h
grpc_binder::TransportStreamReceiverImpl::CancelTrailingMetadataCallback
void CancelTrailingMetadataCallback(StreamIdentifier id, absl::Status error)
Definition: transport_stream_receiver_impl.cc:215
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
absl::CancelledError
Status CancelledError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:331
grpc_binder::TransportStreamReceiverImpl::initial_metadata_cbs_
std::map< StreamIdentifier, InitialMetadataCallbackType > initial_metadata_cbs_
Definition: transport_stream_receiver_impl.h:68
absl::string_view
Definition: abseil-cpp/absl/strings/string_view.h:167
grpc_binder
Definition: connection_id_generator.cc:45
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc_binder::TransportStreamReceiverImpl::accept_stream_callback_
std::function< void()> accept_stream_callback_
Definition: transport_stream_receiver_impl.h:108
status
absl::Status status
Definition: rls.cc:251
grpc_binder::TransportStreamReceiverImpl::m_
grpc_core::Mutex m_
Definition: transport_stream_receiver_impl.h:74
grpc_binder::TransportStreamReceiverImpl::is_client_
bool is_client_
Definition: transport_stream_receiver_impl.h:105
grpc_binder::TransportStreamReceiverImpl::OnRecvTrailingMetadata
void OnRecvTrailingMetadata(StreamIdentifier id)
Definition: transport_stream_receiver_impl.cc:231
grpc_binder::TransportStreamReceiver::InitialMetadataCallbackType
std::function< void(absl::StatusOr< Metadata >)> InitialMetadataCallbackType
Definition: transport_stream_receiver.h:37
message
char * message
Definition: libuv/docs/code/tty-gravity/main.c:12
grpc_binder::TransportStreamReceiverImpl::NotifyRecvTrailingMetadata
void NotifyRecvTrailingMetadata(StreamIdentifier id, absl::StatusOr< Metadata > trailing_metadata, int status) override
Definition: transport_stream_receiver_impl.cc:158
grpc_binder::TransportStreamReceiverImpl::CancelMessageCallback
void CancelMessageCallback(StreamIdentifier id, absl::Status error)
Definition: transport_stream_receiver_impl.cc:199
grpc_binder::TransportStreamReceiverImpl::NotifyRecvMessage
void NotifyRecvMessage(StreamIdentifier id, absl::StatusOr< std::string > message) override
Definition: transport_stream_receiver_impl.cc:140
grpc_binder::TransportStreamReceiver::TrailingMetadataCallbackType
std::function< void(absl::StatusOr< Metadata >, int)> TrailingMetadataCallbackType
Definition: transport_stream_receiver.h:41
grpc_status._async.trailing_metadata
trailing_metadata
Definition: grpcio_status/grpc_status/_async.py:36
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc_core::Mutex::Lock
void Lock() ABSL_EXCLUSIVE_LOCK_FUNCTION()
Definition: src/core/lib/gprpp/sync.h:69
grpc_binder::TransportStreamReceiverImpl::RegisterRecvInitialMetadata
void RegisterRecvInitialMetadata(StreamIdentifier id, InitialMetadataCallbackType cb) override
Definition: transport_stream_receiver_impl.cc:33
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc_binder::TransportStreamReceiverImpl::NotifyRecvInitialMetadata
void NotifyRecvInitialMetadata(StreamIdentifier id, absl::StatusOr< Metadata > initial_metadata) override
Definition: transport_stream_receiver_impl.cc:119
grpc_binder::TransportStreamReceiver::kGrpcBinderTransportCancelledGracefully
static const absl::string_view kGrpcBinderTransportCancelledGracefully
Definition: transport_stream_receiver.h:66
grpc_binder::TransportStreamReceiverImpl::trailing_metadata_cbs_
std::map< StreamIdentifier, TrailingMetadataCallbackType > trailing_metadata_cbs_
Definition: transport_stream_receiver_impl.h:71
callback
static void callback(void *arg, int status, int timeouts, struct hostent *host)
Definition: acountry.c:224
grpc_binder::TransportStreamReceiver::MessageDataCallbackType
std::function< void(absl::StatusOr< std::string >)> MessageDataCallbackType
Definition: transport_stream_receiver.h:39
grpc_binder::StreamIdentifier
int StreamIdentifier
Definition: transport_stream_receiver.h:30
absl::StatusOr::ok
ABSL_MUST_USE_RESULT bool ok() const
Definition: abseil-cpp/absl/status/statusor.h:491
grpc_core::Mutex::Unlock
void Unlock() ABSL_UNLOCK_FUNCTION()
Definition: src/core/lib/gprpp/sync.h:70
grpc_binder::TransportStreamReceiverImpl::message_cbs_
std::map< StreamIdentifier, MessageDataCallbackType > message_cbs_
Definition: transport_stream_receiver_impl.h:69
grpc_binder::TransportStreamReceiverImpl::CancelStream
void CancelStream(StreamIdentifier id) override
Definition: transport_stream_receiver_impl.cc:243
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
grpc_binder::TransportStreamReceiverImpl::CancelInitialMetadataCallback
void CancelInitialMetadataCallback(StreamIdentifier id, absl::Status error)
Definition: transport_stream_receiver_impl.cc:183
iter
Definition: test_winkernel.cpp:47
absl::StatusOr
Definition: abseil-cpp/absl/status/statusor.h:187
grpc_binder::TransportStreamReceiverImpl::RegisterRecvTrailingMetadata
void RegisterRecvTrailingMetadata(StreamIdentifier id, TrailingMetadataCallbackType cb) override
Definition: transport_stream_receiver_impl.cc:95
cb
OPENSSL_EXPORT pem_password_cb * cb
Definition: pem.h:351
grpc_binder::TransportStreamReceiverImpl::RegisterRecvMessage
void RegisterRecvMessage(StreamIdentifier id, MessageDataCallbackType cb) override
Definition: transport_stream_receiver_impl.cc:61
id
uint32_t id
Definition: flow_control_fuzzer.cc:70
transport_stream_receiver_impl.h
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:40