30 #include "absl/container/inlined_vector.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/string_view.h"
34 #include "absl/strings/strip.h"
35 #include "absl/types/optional.h"
36 #include "absl/utility/utility.h"
// Default per-RPC retry buffer size: 256 << 10 == 262144.
// NOTE(review): presumably a byte count (i.e. 256 KiB) used when no explicit
// per_rpc_retry_buffer_size_ is configured -- confirm at the use site.
124 #define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
// Jitter constant (0.2) for retry backoff.
// NOTE(review): presumably a fraction of the backoff interval (+/-20%) --
// the code that consumes it is not visible here; confirm.
128 #define RETRY_BACKOFF_JITTER 0.2
// Bring the retry service-config types from the `internal` namespace into
// scope so the rest of this file can name them unqualified.
134 using internal::RetryGlobalConfig;
135 using internal::RetryMethodConfig;
136 using internal::RetryServiceConfigParser;
137 using internal::ServerRetryThrottleData;
// Trace flag for the retry filter, registered under the name "retry".
// NOTE(review): the first constructor argument (false) is presumably the
// default-enabled state, i.e. tracing is off unless requested -- confirm
// against the TraceFlag constructor.
139 TraceFlag grpc_retry_trace(
false,
"retry");
154 new (
elem->channel_data) RetryFilter(
args->channel_args, &
error);
159 auto* chand =
static_cast<RetryFilter*
>(
elem->channel_data);
160 chand->~RetryFilter();
183 auto* service_config = grpc_channel_args_find_pointer<ServiceConfig>(
185 if (service_config ==
nullptr)
return;
186 const auto*
config =
static_cast<const RetryGlobalConfig*
>(
187 service_config->GetGlobalParsedConfig(
188 RetryServiceConfigParser::ParserIndex()));
189 if (
config ==
nullptr)
return;
195 "server URI channel arg missing or wrong type in client channel "
200 if (!uri.
ok() || uri->path().empty()) {
202 "could not extract server name from target URI");
210 config->milli_token_ratio());
213 const RetryMethodConfig* GetRetryPolicy(
226 class RetryFilter::CallData {
233 static void StartTransportStreamOpBatch(
238 class CallStackDestructionBarrier;
241 struct PendingBatch {
249 class CallAttempt :
public RefCounted<CallAttempt> {
251 CallAttempt(CallData* calld,
bool is_transparent_retry);
252 ~CallAttempt()
override;
258 void StartRetriableBatches();
262 void FreeCachedSendOpDataAfterCommit();
274 :
public RefCounted<BatchData, PolymorphicRefCount, kUnrefCallDtor> {
276 BatchData(RefCountedPtr<CallAttempt> call_attempt,
int refcount,
277 bool set_on_complete);
278 ~BatchData()
override;
283 void AddRetriableSendInitialMetadataOp();
285 void AddRetriableSendMessageOp();
287 void AddRetriableSendTrailingMetadataOp();
289 void AddRetriableRecvInitialMetadataOp();
291 void AddRetriableRecvMessageOp();
293 void AddRetriableRecvTrailingMetadataOp();
301 void FreeCachedSendOpDataForCompletedBatch();
305 void MaybeAddClosureForRecvInitialMetadataCallback(
313 void MaybeAddClosureForRecvMessageCallback(
321 void MaybeAddClosureForRecvTrailingMetadataReady(
325 void AddClosuresForDeferredCompletionCallbacks(
326 CallCombinerClosureList* closures);
329 void AddClosuresToFailUnstartedPendingBatches(
339 void AddClosuresForCompletedPendingBatch(
344 void AddClosuresForReplayOrPendingSendOps(
345 CallCombinerClosureList* closures);
362 class AttemptDispatchController
363 :
public ConfigSelector::CallDispatchController {
365 explicit AttemptDispatchController(CallAttempt* call_attempt)
// CallDispatchController override: unconditionally returns false, so this
// controller never requests a retry itself.
// NOTE(review): presumably because retry decisions are made by the retry
// filter (see CallAttempt::ShouldRetry) rather than per attempt -- confirm.
369 bool ShouldRetry()
override {
return false; }
371 void Commit()
override {
374 if (calld->retry_committed_) {
375 auto* service_config_call_data =
376 static_cast<ClientChannelServiceConfigCallData*
>(
379 service_config_call_data->call_dispatch_controller()->Commit();
391 BatchData* CreateBatch(
int refcount,
bool set_on_complete) {
399 BatchData* MaybeCreateBatchForReplay();
404 CallCombinerClosureList* closures);
410 void AddBatchForInternalRecvTrailingMetadata(
411 CallCombinerClosureList* closures);
416 CallCombinerClosureList* closures);
419 void AddBatchesForPendingBatches(CallCombinerClosureList* closures);
422 void AddRetriableBatches(CallCombinerClosureList* closures);
426 bool PendingBatchContainsUnstartedSendOps(PendingBatch*
pending);
429 bool HaveSendOpsToReplay();
434 void MaybeSwitchToFastPath();
445 void MaybeCancelPerAttemptRecvTimer();
449 OrphanablePtr<ClientChannel::LoadBalancedCall>
lb_call_;
494 struct OnCompleteDeferredBatch {
495 OnCompleteDeferredBatch(RefCountedPtr<BatchData>
batch,
498 RefCountedPtr<BatchData>
batch;
522 void PendingBatchClear(PendingBatch*
pending);
523 void MaybeClearPendingBatch(PendingBatch*
pending);
524 static void FailPendingBatchInCallCombiner(
void*
arg,
530 template <
typename Predicate>
531 PendingBatch* PendingBatchFind(
const char* log_message, Predicate predicate);
535 void MaybeCacheSendOpsForBatch(PendingBatch*
pending);
536 void FreeCachedSendInitialMetadata();
538 void FreeCachedSendMessage(
size_t idx);
539 void FreeCachedSendTrailingMetadata();
540 void FreeAllCachedSendOpData();
543 void RetryCommit(CallAttempt* call_attempt);
553 void AddClosureToStartTransparentRetry(CallCombinerClosureList* closures);
556 OrphanablePtr<ClientChannel::LoadBalancedCall> CreateLoadBalancedCall(
557 ConfigSelector::CallDispatchController* call_dispatch_controller,
558 bool is_transparent_retry);
560 void CreateCallAttempt(
bool is_transparent_retry);
630 struct CachedSendMessage {
656 class RetryFilter::CallData::CallStackDestructionBarrier
657 :
public RefCounted<CallStackDestructionBarrier, PolymorphicRefCount,
// Default constructor; the body is empty, so it performs no work beyond
// default-initializing the base class and members.
660 CallStackDestructionBarrier() {}
662 ~CallStackDestructionBarrier()
override {
669 void set_on_call_stack_destruction(
grpc_closure* on_call_stack_destruction) {
674 grpc_closure* MakeLbCallDestructionClosure(CallData* calld) {
679 OnLbCallDestructionComplete,
this,
nullptr);
680 return on_lb_call_destruction_complete;
684 static void OnLbCallDestructionComplete(
void*
arg,
686 auto*
self =
static_cast<CallStackDestructionBarrier*
>(
arg);
697 RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld,
698 bool is_transparent_retry)
716 is_transparent_retry);
719 "chand=%p calld=%p attempt=%p: created attempt, lb_call=%p",
720 calld->chand_, calld,
this,
lb_call_.get());
723 if (calld->retry_policy_ !=
nullptr &&
724 calld->retry_policy_->per_attempt_recv_timeout().has_value()) {
727 *calld->retry_policy_->per_attempt_recv_timeout();
730 "chand=%p calld=%p attempt=%p: per-attempt timeout in %" PRId64
732 calld->chand_, calld,
this,
733 calld->retry_policy_->per_attempt_recv_timeout()->millis());
746 RetryFilter::CallData::CallAttempt::~CallAttempt() {
748 gpr_log(
GPR_INFO,
"chand=%p calld=%p attempt=%p: destroying call attempt",
753 void RetryFilter::CallData::CallAttempt::FreeCachedSendOpDataAfterCommit() {
759 calld_->FreeCachedSendInitialMetadata();
762 calld_->FreeCachedSendMessage(
i);
765 calld_->FreeCachedSendTrailingMetadata();
769 bool RetryFilter::CallData::CallAttempt::PendingBatchContainsUnstartedSendOps(
771 if (
pending->batch->on_complete ==
nullptr)
return false;
772 if (
pending->batch->send_initial_metadata &&
776 if (
pending->batch->send_message &&
780 if (
pending->batch->send_trailing_metadata &&
787 bool RetryFilter::CallData::CallAttempt::HaveSendOpsToReplay() {
792 (
calld_->seen_send_trailing_metadata_ &&
796 void RetryFilter::CallData::CallAttempt::MaybeSwitchToFastPath() {
802 if (!
calld_->retry_committed_)
return;
804 if (
calld_->committed_call_ !=
nullptr)
return;
808 if (HaveSendOpsToReplay())
return;
815 "chand=%p calld=%p attempt=%p: retry state no longer needed; "
816 "moving LB call to parent and unreffing the call attempt",
826 RetryFilter::CallData::CallAttempt::BatchData*
827 RetryFilter::CallData::CallAttempt::MaybeCreateBatchForReplay() {
828 BatchData* replay_batch_data =
nullptr;
831 !
calld_->pending_send_initial_metadata_) {
834 "chand=%p calld=%p attempt=%p: replaying previously completed "
835 "send_initial_metadata op",
838 replay_batch_data = CreateBatch(1,
true );
839 replay_batch_data->AddRetriableSendInitialMetadataOp();
843 if (started_send_message_count_ < calld_->
send_messages_.size() &&
845 !
calld_->pending_send_message_) {
848 "chand=%p calld=%p attempt=%p: replaying previously completed "
852 if (replay_batch_data ==
nullptr) {
853 replay_batch_data = CreateBatch(1,
true );
855 replay_batch_data->AddRetriableSendMessageOp();
861 if (
calld_->seen_send_trailing_metadata_ &&
864 !
calld_->pending_send_trailing_metadata_) {
867 "chand=%p calld=%p attempt=%p: replaying previously completed "
868 "send_trailing_metadata op",
871 if (replay_batch_data ==
nullptr) {
872 replay_batch_data = CreateBatch(1,
true );
874 replay_batch_data->AddRetriableSendTrailingMetadataOp();
876 return replay_batch_data;
884 auto* lb_call =
static_cast<ClientChannel::LoadBalancedCall*
>(
887 lb_call->StartTransportStreamOpBatch(
batch);
892 void RetryFilter::CallData::CallAttempt::AddClosureForBatch(
894 CallCombinerClosureList* closures) {
896 gpr_log(
GPR_INFO,
"chand=%p calld=%p attempt=%p: adding batch (%s): %s",
902 batch, grpc_schedule_on_exec_ctx);
906 void RetryFilter::CallData::CallAttempt::
907 AddBatchForInternalRecvTrailingMetadata(CallCombinerClosureList* closures) {
910 "chand=%p calld=%p attempt=%p: call failed but "
911 "recv_trailing_metadata not started; starting it internally",
918 BatchData* batch_data = CreateBatch(2,
false );
919 batch_data->AddRetriableRecvTrailingMetadataOp();
921 AddClosureForBatch(batch_data->batch(),
922 "starting internal recv_trailing_metadata", closures);
925 void RetryFilter::CallData::CallAttempt::MaybeAddBatchForCancelOp(
932 BatchData* cancel_batch_data = CreateBatch(1,
true);
933 cancel_batch_data->AddCancelStreamOp(
error);
934 AddClosureForBatch(cancel_batch_data->batch(),
935 "start cancellation batch on call attempt", closures);
938 void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches(
939 CallCombinerClosureList* closures) {
943 if (
batch ==
nullptr)
continue;
944 bool has_send_ops =
false;
969 (
calld_->send_messages_.size() + !
pending->send_ops_cached)) {
979 calld_->send_messages_.size() ||
985 int num_callbacks = has_send_ops;
1013 "re-executing recv_trailing_metadata_ready to propagate "
1014 "internally triggered result");
1020 "internally started recv_trailing_metadata batch pending and "
1021 "recv_trailing_metadata started from surface");
1029 if (num_callbacks == 0)
continue;
1043 if (
calld_->retry_committed_ && !
pending->send_ops_cached &&
1047 "start non-replayable pending batch on call attempt after commit",
1053 BatchData* batch_data =
1054 CreateBatch(num_callbacks, has_send_ops );
1059 batch_data->AddRetriableSendInitialMetadataOp();
1063 batch_data->AddRetriableSendMessageOp();
1067 batch_data->AddRetriableSendTrailingMetadataOp();
1073 batch_data->AddRetriableRecvInitialMetadataOp();
1077 batch_data->AddRetriableRecvMessageOp();
1081 batch_data->AddRetriableRecvTrailingMetadataOp();
1083 AddClosureForBatch(batch_data->batch(),
1084 "start replayable pending batch on call attempt",
1089 void RetryFilter::CallData::CallAttempt::AddRetriableBatches(
1090 CallCombinerClosureList* closures) {
1092 BatchData* replay_batch_data = MaybeCreateBatchForReplay();
1093 if (replay_batch_data !=
nullptr) {
1094 AddClosureForBatch(replay_batch_data->batch(),
1095 "start replay batch on call attempt", closures);
1098 AddBatchesForPendingBatches(closures);
1101 void RetryFilter::CallData::CallAttempt::StartRetriableBatches() {
1104 "chand=%p calld=%p attempt=%p: constructing retriable batches",
1108 CallCombinerClosureList closures;
1109 AddRetriableBatches(&closures);
1114 "chand=%p calld=%p attempt=%p: starting %" PRIuPTR
1115 " retriable batches on lb_call=%p",
1118 closures.RunClosures(
calld_->call_combiner_);
1121 void RetryFilter::CallData::CallAttempt::CancelFromSurface(
1123 MaybeCancelPerAttemptRecvTimer();
1126 lb_call_->StartTransportStreamOpBatch(cancel_batch);
1129 bool RetryFilter::CallData::CallAttempt::ShouldRetry(
1133 if (
calld_->retry_policy_ ==
nullptr)
return false;
1135 if (
status.has_value()) {
1137 if (
calld_->retry_throttle_data_ !=
nullptr) {
1138 calld_->retry_throttle_data_->RecordSuccess();
1147 if (!
calld_->retry_policy_->retryable_status_codes().Contains(*
status)) {
1150 "chand=%p calld=%p attempt=%p: status %s not configured as "
1165 if (
calld_->retry_throttle_data_ !=
nullptr &&
1166 !
calld_->retry_throttle_data_->RecordFailure()) {
1174 if (
calld_->retry_committed_) {
1177 "chand=%p calld=%p attempt=%p: retries already committed",
1183 ++
calld_->num_attempts_completed_;
1184 if (
calld_->num_attempts_completed_ >=
1185 calld_->retry_policy_->max_attempts()) {
1188 GPR_INFO,
"chand=%p calld=%p attempt=%p: exceeded %d retry attempts",
1195 if (*server_pushback < Duration::Zero()) {
1198 "chand=%p calld=%p attempt=%p: not retrying due to server "
1207 "chand=%p calld=%p attempt=%p: server push-back: retry in %" PRIu64
1209 calld_->chand_,
calld_,
this, server_pushback->millis());
1214 auto* service_config_call_data =
1215 static_cast<ClientChannelServiceConfigCallData*
>(
1217 if (!service_config_call_data->call_dispatch_controller()->ShouldRetry()) {
1221 "chand=%p calld=%p attempt=%p: call dispatch controller denied retry",
1230 void RetryFilter::CallData::CallAttempt::Abandon() {
1238 "unref internal recv_trailing_metadata_ready batch; attempt abandoned");
1244 "unref deferred recv_initial_metadata_ready batch; attempt abandoned");
1249 "unref deferred recv_message_ready batch; attempt abandoned");
1253 on_complete_deferred_batch.batch.reset(
1254 DEBUG_LOCATION,
"unref deferred on_complete batch; attempt abandoned");
1260 void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimer(
1262 auto* call_attempt =
static_cast<CallAttempt*
>(
arg);
1264 OnPerAttemptRecvTimerLocked, call_attempt,
nullptr);
1266 &call_attempt->on_per_attempt_recv_timer_,
1270 void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimerLocked(
1272 auto* call_attempt =
static_cast<CallAttempt*
>(
arg);
1273 auto* calld = call_attempt->calld_;
1276 "chand=%p calld=%p attempt=%p: perAttemptRecvTimeout timer fired: "
1277 "error=%s, per_attempt_recv_timer_pending_=%d",
1278 calld->chand_, calld, call_attempt,
1280 call_attempt->per_attempt_recv_timer_pending_);
1282 CallCombinerClosureList closures;
1284 call_attempt->per_attempt_recv_timer_pending_) {
1285 call_attempt->per_attempt_recv_timer_pending_ =
false;
1289 call_attempt->MaybeAddBatchForCancelOp(
1291 "retry perAttemptRecvTimeout exceeded"),
1295 if (call_attempt->ShouldRetry(absl::nullopt,
1298 call_attempt->Abandon();
1300 calld->StartRetryTimer(absl::nullopt);
1303 calld->RetryCommit(call_attempt);
1306 call_attempt->MaybeSwitchToFastPath();
1309 closures.RunClosures(calld->call_combiner_);
1314 void RetryFilter::CallData::CallAttempt::MaybeCancelPerAttemptRecvTimer() {
1318 "chand=%p calld=%p attempt=%p: cancelling "
1319 "perAttemptRecvTimeout timer",
1331 RetryFilter::CallData::CallAttempt::BatchData::BatchData(
1332 RefCountedPtr<CallAttempt> attempt,
int refcount,
bool set_on_complete)
1351 if (set_on_complete) {
1357 RetryFilter::CallData::CallAttempt::BatchData::~BatchData() {
1359 gpr_log(
GPR_INFO,
"chand=%p calld=%p attempt=%p: destroying batch %p",
1367 void RetryFilter::CallData::CallAttempt::BatchData::
1368 FreeCachedSendOpDataForCompletedBatch() {
1375 calld->FreeCachedSendInitialMetadata();
1378 calld->FreeCachedSendMessage(
call_attempt_->completed_send_message_count_ -
1382 calld->FreeCachedSendTrailingMetadata();
1390 void RetryFilter::CallData::CallAttempt::BatchData::
1391 MaybeAddClosureForRecvInitialMetadataCallback(
1395 "invoking recv_initial_metadata_ready for",
1399 .recv_initial_metadata_ready !=
nullptr;
1406 *
pending->batch->payload->recv_initial_metadata.recv_initial_metadata =
1409 *
pending->batch->payload->recv_initial_metadata.trailing_metadata_available =
1415 pending->batch->payload->recv_initial_metadata
1416 .recv_initial_metadata_ready;
1417 pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
1422 "recv_initial_metadata_ready for pending batch");
1425 void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady(
1427 RefCountedPtr<BatchData> batch_data(
static_cast<BatchData*
>(
arg));
1428 CallAttempt* call_attempt = batch_data->call_attempt_.get();
1429 CallData* calld = call_attempt->calld_;
1432 "chand=%p calld=%p attempt=%p batch_data=%p: "
1433 "got recv_initial_metadata_ready, error=%s",
1434 calld->chand_, calld, call_attempt, batch_data.get(),
1437 call_attempt->completed_recv_initial_metadata_ =
true;
1440 if (call_attempt->abandoned_) {
1442 calld->call_combiner_,
1443 "recv_initial_metadata_ready for abandoned attempt");
1447 call_attempt->MaybeCancelPerAttemptRecvTimer();
1449 if (!calld->retry_committed_) {
1454 if (
GPR_UNLIKELY((call_attempt->trailing_metadata_available_ ||
1456 !call_attempt->completed_recv_trailing_metadata_)) {
1459 "chand=%p calld=%p attempt=%p: deferring "
1460 "recv_initial_metadata_ready (Trailers-Only)",
1461 calld->chand_, calld, call_attempt);
1463 call_attempt->recv_initial_metadata_ready_deferred_batch_ =
1466 CallCombinerClosureList closures;
1471 if (!call_attempt->started_recv_trailing_metadata_) {
1474 call_attempt->AddBatchForInternalRecvTrailingMetadata(&closures);
1476 closures.RunClosures(calld->call_combiner_);
1480 calld->RetryCommit(call_attempt);
1483 call_attempt->MaybeSwitchToFastPath();
1486 CallCombinerClosureList closures;
1487 batch_data->MaybeAddClosureForRecvInitialMetadataCallback(
1489 closures.RunClosures(calld->call_combiner_);
1496 void RetryFilter::CallData::CallAttempt::BatchData::
1498 CallCombinerClosureList* closures) {
1501 "invoking recv_message_ready for",
1511 *
pending->batch->payload->recv_message.recv_message =
1513 *
pending->batch->payload->recv_message.flags =
1519 pending->batch->payload->recv_message.recv_message_ready;
1520 pending->batch->payload->recv_message.recv_message_ready =
nullptr;
1524 "recv_message_ready for pending batch");
1527 void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady(
1529 RefCountedPtr<BatchData> batch_data(
static_cast<BatchData*
>(
arg));
1530 CallAttempt* call_attempt = batch_data->call_attempt_.get();
1531 CallData* calld = call_attempt->calld_;
1534 "chand=%p calld=%p attempt=%p batch_data=%p: "
1535 "got recv_message_ready, error=%s",
1536 calld->chand_, calld, call_attempt, batch_data.get(),
1539 ++call_attempt->completed_recv_message_count_;
1542 if (call_attempt->abandoned_) {
1546 call_attempt->recv_message_.reset();
1548 "recv_message_ready for abandoned attempt");
1552 call_attempt->MaybeCancelPerAttemptRecvTimer();
1554 if (!calld->retry_committed_) {
1559 if (
GPR_UNLIKELY((!call_attempt->recv_message_.has_value() ||
1561 !call_attempt->completed_recv_trailing_metadata_)) {
1564 "chand=%p calld=%p attempt=%p: deferring recv_message_ready "
1565 "(nullptr message and recv_trailing_metadata pending)",
1566 calld->chand_, calld, call_attempt);
1568 call_attempt->recv_message_ready_deferred_batch_ =
std::move(batch_data);
1570 CallCombinerClosureList closures;
1575 if (!call_attempt->started_recv_trailing_metadata_) {
1578 call_attempt->AddBatchForInternalRecvTrailingMetadata(&closures);
1580 closures.RunClosures(calld->call_combiner_);
1584 calld->RetryCommit(call_attempt);
1587 call_attempt->MaybeSwitchToFastPath();
1590 CallCombinerClosureList closures;
1593 closures.RunClosures(calld->call_combiner_);
1617 *
status = *md_batch->
get(GrpcStatusMetadata());
1619 *server_pushback = md_batch->
get(GrpcRetryPushbackMsMetadata());
1620 *stream_network_state = md_batch->
get(GrpcStreamNetworkState());
1626 void RetryFilter::CallData::CallAttempt::BatchData::
1627 MaybeAddClosureForRecvTrailingMetadataReady(
1631 PendingBatch*
pending = calld->PendingBatchFind(
1632 "invoking recv_trailing_metadata_ready for",
1636 .recv_trailing_metadata_ready !=
nullptr;
1648 pending->batch->payload->recv_trailing_metadata.collect_stats);
1650 *
pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata =
1653 closures->Add(
pending->batch->payload->recv_trailing_metadata
1654 .recv_trailing_metadata_ready,
1655 error,
"recv_trailing_metadata_ready for pending batch");
1657 pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1659 calld->MaybeClearPendingBatch(
pending);
1662 void RetryFilter::CallData::CallAttempt::BatchData::
1663 AddClosuresForDeferredCompletionCallbacks(
1664 CallCombinerClosureList* closures) {
1668 MaybeAddClosureForRecvInitialMetadataCallback(
1670 call_attempt_->recv_initial_metadata_ready_deferred_batch_.reset(
1671 DEBUG_LOCATION,
"resuming deferred recv_initial_metadata_ready");
1677 MaybeAddClosureForRecvMessageCallback(
call_attempt_->recv_message_error_,
1684 for (
auto& on_complete_deferred_batch :
1686 closures->Add(&on_complete_deferred_batch.batch->on_complete_,
1687 on_complete_deferred_batch.error,
"resuming on_complete");
1688 on_complete_deferred_batch.batch.release();
1693 void RetryFilter::CallData::CallAttempt::BatchData::
1694 AddClosuresToFailUnstartedPendingBatches(
1698 PendingBatch*
pending = &calld->pending_batches_[
i];
1699 if (
pending->batch ==
nullptr)
continue;
1702 "failing on_complete for pending batch");
1703 pending->batch->on_complete =
nullptr;
1704 calld->MaybeClearPendingBatch(
pending);
1710 void RetryFilter::CallData::CallAttempt::BatchData::RunClosuresForCompletedCall(
1713 CallCombinerClosureList closures;
1717 AddClosuresForDeferredCompletionCallbacks(&closures);
1722 closures.RunClosures(
call_attempt_->calld_->call_combiner_);
1726 void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
1733 RefCountedPtr<grpc_call_stack> owning_call_stack =
1735 RefCountedPtr<BatchData> batch_data(
static_cast<BatchData*
>(
arg));
1736 CallAttempt* call_attempt = batch_data->call_attempt_.get();
1737 CallData* calld = call_attempt->calld_;
1740 "chand=%p calld=%p attempt=%p batch_data=%p: "
1741 "got recv_trailing_metadata_ready, error=%s",
1742 calld->chand_, calld, call_attempt, batch_data.get(),
1745 call_attempt->completed_recv_trailing_metadata_ =
true;
1748 if (call_attempt->abandoned_) {
1750 calld->call_combiner_,
1751 "recv_trailing_metadata_ready for abandoned attempt");
1755 call_attempt->MaybeCancelPerAttemptRecvTimer();
1759 bool is_lb_drop =
false;
1762 batch_data->batch_.payload->recv_trailing_metadata.recv_trailing_metadata;
1764 &server_pushback, &is_lb_drop, &stream_network_state);
1767 "chand=%p calld=%p attempt=%p: call finished, status=%s "
1768 "server_pushback=%s is_lb_drop=%d stream_network_state=%s",
1769 calld->chand_, calld, call_attempt,
1771 server_pushback.
has_value() ? server_pushback->ToString().c_str()
1780 enum { kNoRetry, kTransparentRetry, kConfigurableRetry }
retry = kNoRetry;
1782 if (stream_network_state.
has_value() && !calld->retry_committed_) {
1785 if (*stream_network_state == GrpcStreamNetworkState::kNotSentOnWire) {
1786 retry = kTransparentRetry;
1787 }
else if (*stream_network_state ==
1788 GrpcStreamNetworkState::kNotSeenByServer &&
1789 !calld->sent_transparent_retry_not_seen_by_server_) {
1790 calld->sent_transparent_retry_not_seen_by_server_ =
true;
1791 retry = kTransparentRetry;
1795 if (
retry == kNoRetry &&
1796 call_attempt->ShouldRetry(
status, server_pushback)) {
1797 retry = kConfigurableRetry;
1800 if (
retry != kNoRetry) {
1801 CallCombinerClosureList closures;
1803 call_attempt->MaybeAddBatchForCancelOp(
1813 if (
retry == kTransparentRetry) {
1814 calld->AddClosureToStartTransparentRetry(&closures);
1816 calld->StartRetryTimer(server_pushback);
1819 call_attempt->Abandon();
1821 closures.RunClosures(calld->call_combiner_);
1826 calld->RetryCommit(call_attempt);
1829 call_attempt->MaybeSwitchToFastPath();
1838 void RetryFilter::CallData::CallAttempt::BatchData::
1840 CallCombinerClosureList* closures) {
1842 PendingBatch*
pending = calld->PendingBatchFind(
1859 pending->batch->payload->send_message.stream_write_closed =
1864 "on_complete for pending batch");
1865 pending->batch->on_complete =
nullptr;
1866 calld->MaybeClearPendingBatch(
pending);
1869 void RetryFilter::CallData::CallAttempt::BatchData::
1870 AddClosuresForReplayOrPendingSendOps(CallCombinerClosureList* closures) {
1872 bool have_pending_send_ops =
call_attempt_->HaveSendOpsToReplay();
1876 if (!have_pending_send_ops) {
1878 PendingBatch*
pending = &calld->pending_batches_[
i];
1880 if (
batch ==
nullptr ||
pending->send_ops_cached)
continue;
1882 have_pending_send_ops =
true;
1887 if (have_pending_send_ops) {
1890 "chand=%p calld=%p attempt=%p: starting next batch for pending "
1898 void RetryFilter::CallData::CallAttempt::BatchData::OnComplete(
1900 RefCountedPtr<BatchData> batch_data(
static_cast<BatchData*
>(
arg));
1901 CallAttempt* call_attempt = batch_data->call_attempt_.get();
1902 CallData* calld = call_attempt->calld_;
1905 "chand=%p calld=%p attempt=%p batch_data=%p: "
1906 "got on_complete, error=%s, batch=%s",
1907 calld->chand_, calld, call_attempt, batch_data.get(),
1913 if (call_attempt->abandoned_) {
1915 "on_complete for abandoned attempt");
1923 !call_attempt->completed_recv_trailing_metadata_)) {
1925 gpr_log(
GPR_INFO,
"chand=%p calld=%p attempt=%p: deferring on_complete",
1926 calld->chand_, calld, call_attempt);
1928 call_attempt->on_complete_deferred_batches_.emplace_back(
1930 CallCombinerClosureList closures;
1932 if (!call_attempt->started_recv_trailing_metadata_) {
1935 call_attempt->AddBatchForInternalRecvTrailingMetadata(&closures);
1937 closures.RunClosures(calld->call_combiner_);
1941 if (batch_data->batch_.send_initial_metadata) {
1942 call_attempt->completed_send_initial_metadata_ =
true;
1944 if (batch_data->batch_.send_message) {
1945 ++call_attempt->completed_send_message_count_;
1947 if (batch_data->batch_.send_trailing_metadata) {
1948 call_attempt->completed_send_trailing_metadata_ =
true;
1952 if (calld->retry_committed_) {
1953 batch_data->FreeCachedSendOpDataForCompletedBatch();
1956 CallCombinerClosureList closures;
1962 if (!call_attempt->completed_recv_trailing_metadata_) {
1963 batch_data->AddClosuresForReplayOrPendingSendOps(&closures);
1968 call_attempt->MaybeSwitchToFastPath();
1971 closures.RunClosures(calld->call_combiner_);
1974 void RetryFilter::CallData::CallAttempt::BatchData::OnCompleteForCancelOp(
1976 RefCountedPtr<BatchData> batch_data(
static_cast<BatchData*
>(
arg));
1977 CallAttempt* call_attempt = batch_data->call_attempt_.get();
1978 CallData* calld = call_attempt->calld_;
1981 "chand=%p calld=%p attempt=%p batch_data=%p: "
1982 "got on_complete for cancel_stream batch, error=%s, batch=%s",
1983 calld->chand_, calld, call_attempt, batch_data.get(),
1988 calld->call_combiner_,
1989 "on_complete for internally generated cancel_stream op");
1996 void RetryFilter::CallData::CallAttempt::BatchData::
1997 AddRetriableSendInitialMetadataOp() {
2005 call_attempt_->send_initial_metadata_ = calld->send_initial_metadata_.Copy();
2006 if (
GPR_UNLIKELY(calld->num_attempts_completed_ > 0)) {
2007 call_attempt_->send_initial_metadata_.Set(GrpcPreviousRpcAttemptsMetadata(),
2008 calld->num_attempts_completed_);
2011 GrpcPreviousRpcAttemptsMetadata());
2018 calld->send_initial_metadata_flags_;
2022 void RetryFilter::CallData::CallAttempt::BatchData::
2023 AddRetriableSendMessageOp() {
2028 "chand=%p calld=%p attempt=%p: starting calld->send_messages[%" PRIuPTR
2033 CachedSendMessage cache =
2034 calld->send_messages_[
call_attempt_->started_send_message_count_];
2041 void RetryFilter::CallData::CallAttempt::BatchData::
2042 AddRetriableSendTrailingMetadataOp() {
2048 calld->send_trailing_metadata_.Copy();
2055 void RetryFilter::CallData::CallAttempt::BatchData::
2056 AddRetriableRecvInitialMetadataOp() {
2065 RecvInitialMetadataReady,
this, grpc_schedule_on_exec_ctx);
2070 void RetryFilter::CallData::CallAttempt::BatchData::
2071 AddRetriableRecvMessageOp() {
2078 grpc_schedule_on_exec_ctx);
2083 void RetryFilter::CallData::CallAttempt::BatchData::
2084 AddRetriableRecvTrailingMetadataOp() {
2093 RecvTrailingMetadataReady,
this, grpc_schedule_on_exec_ctx);
2098 void RetryFilter::CallData::CallAttempt::BatchData::AddCancelStreamOp(
2112 auto* chand =
static_cast<RetryFilter*
>(
elem->channel_data);
2113 new (
elem->call_data) CallData(chand, *
args);
2124 auto* calld =
static_cast<CallData*
>(
elem->call_data);
2127 RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier =
2128 std::move(calld->call_stack_destruction_barrier_);
2134 call_stack_destruction_barrier->set_on_call_stack_destruction(
2135 then_schedule_closure);
2138 void RetryFilter::CallData::StartTransportStreamOpBatch(
2140 auto* calld =
static_cast<CallData*
>(
elem->call_data);
2141 calld->StartTransportStreamOpBatch(
batch);
2146 auto* calld =
static_cast<CallData*
>(
elem->call_data);
2147 calld->pollent_ = pollent;
2154 const RetryMethodConfig* RetryFilter::GetRetryPolicy(
2156 if (
context ==
nullptr)
return nullptr;
2157 auto* svc_cfg_call_data =
static_cast<ServiceConfigCallData*
>(
2159 if (svc_cfg_call_data ==
nullptr)
return nullptr;
2160 return static_cast<const RetryMethodConfig*
>(
2164 RetryFilter::CallData::CallData(RetryFilter* chand,
2188 arena_->
New<CallStackDestructionBarrier>()),
2197 RetryFilter::CallData::~CallData() {
2198 FreeAllCachedSendOpData();
2207 void RetryFilter::CallData::StartTransportStreamOpBatch(
2211 gpr_log(
GPR_INFO,
"chand=%p calld=%p: batch started from surface: %s",
2263 FreeAllCachedSendOpData();
2278 "added pending batch while retry timer pending");
2303 "chand=%p calld=%p: retry committed before first attempt; "
2308 auto* service_config_call_data =
2309 static_cast<ClientChannelServiceConfigCallData*
>(
2312 service_config_call_data->call_dispatch_controller(),
2325 CreateCallAttempt(
false);
2336 OrphanablePtr<ClientChannel::LoadBalancedCall>
2337 RetryFilter::CallData::CreateLoadBalancedCall(
2338 ConfigSelector::CallDispatchController* call_dispatch_controller,
2339 bool is_transparent_retry) {
2343 return chand_->client_channel_->CreateLoadBalancedCall(
2348 call_dispatch_controller, is_transparent_retry);
2351 void RetryFilter::CallData::CreateCallAttempt(
bool is_transparent_retry) {
2352 call_attempt_ = MakeRefCounted<CallAttempt>(
this, is_transparent_retry);
2360 void RetryFilter::CallData::MaybeCacheSendOpsForBatch(PendingBatch*
pending) {
2361 if (
pending->send_ops_cached)
return;
2362 pending->send_ops_cached =
true;
2389 void RetryFilter::CallData::FreeCachedSendInitialMetadata() {
2391 gpr_log(
GPR_INFO,
"chand=%p calld=%p: destroying send_initial_metadata",
2397 void RetryFilter::CallData::FreeCachedSendMessage(
size_t idx) {
2401 "chand=%p calld=%p: destroying send_messages[%" PRIuPTR
"]",
2408 void RetryFilter::CallData::FreeCachedSendTrailingMetadata() {
2410 gpr_log(
GPR_INFO,
"chand=%p calld=%p: destroying send_trailing_metadata",
2416 void RetryFilter::CallData::FreeAllCachedSendOpData() {
2418 FreeCachedSendInitialMetadata();
2421 FreeCachedSendMessage(
i);
2424 FreeCachedSendTrailingMetadata();
2432 size_t RetryFilter::CallData::GetBatchIndex(
2444 RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchesAdd(
2446 const size_t idx = GetBatchIndex(
batch);
2449 "chand=%p calld=%p: adding pending batch at index %" PRIuPTR,
2455 pending->send_ops_cached =
false;
2477 chand_->per_rpc_retry_buffer_size_)) {
2480 "chand=%p calld=%p: exceeded retry buffer size, committing",
2488 void RetryFilter::CallData::PendingBatchClear(PendingBatch*
pending) {
2489 if (
pending->batch->send_initial_metadata) {
2492 if (
pending->batch->send_message) {
2495 if (
pending->batch->send_trailing_metadata) {
2501 void RetryFilter::CallData::MaybeClearPendingBatch(PendingBatch*
pending) {
2523 void RetryFilter::CallData::FailPendingBatchInCallCombiner(
2537 size_t num_batches = 0;
2542 "chand=%p calld=%p: failing %" PRIuPTR
" pending batches: %s",
2545 CallCombinerClosureList closures;
2549 if (
batch !=
nullptr) {
2552 FailPendingBatchInCallCombiner,
batch,
2553 grpc_schedule_on_exec_ctx);
2555 "PendingBatchesFail");
2563 template <
typename Predicate>
2564 RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchFind(
2565 const char* log_message, Predicate predicate) {
2572 "chand=%p calld=%p: %s pending batch at index %" PRIuPTR,
2573 chand_,
this, log_message,
i);
2585 void RetryFilter::CallData::RetryCommit(CallAttempt* call_attempt) {
2591 if (call_attempt !=
nullptr) {
2598 if (call_attempt->lb_call_committed()) {
2599 auto* service_config_call_data =
2600 static_cast<ClientChannelServiceConfigCallData*
>(
2602 service_config_call_data->call_dispatch_controller()->Commit();
2605 call_attempt->FreeCachedSendOpDataAfterCommit();
2609 void RetryFilter::CallData::StartRetryTimer(
2616 GPR_ASSERT(*server_pushback >= Duration::Zero());
2617 next_attempt_time =
ExecCtx::Get()->Now() + *server_pushback;
2624 "chand=%p calld=%p: retrying failed call in %" PRId64
" ms",
chand_,
2635 auto* calld =
static_cast<CallData*
>(
arg);
2641 void RetryFilter::CallData::OnRetryTimerLocked(
void*
arg,
2643 auto* calld =
static_cast<CallData*
>(
arg);
2645 calld->retry_timer_pending_ =
false;
2646 calld->CreateCallAttempt(
false);
2653 void RetryFilter::CallData::AddClosureToStartTransparentRetry(
2654 CallCombinerClosureList* closures) {
2664 void RetryFilter::CallData::StartTransparentRetry(
void*
arg,
2666 auto* calld =
static_cast<CallData*
>(
arg);
2668 calld->CreateCallAttempt(
true);
2671 "call cancelled before transparent retry");
2679 RetryFilter::CallData::StartTransportStreamOpBatch,
2682 sizeof(RetryFilter::CallData),
2684 RetryFilter::CallData::SetPollent,
2686 sizeof(RetryFilter),