//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/retry_filter.h"

#include <inttypes.h>
#include <limits.h>
#include <stddef.h>

#include <memory>
#include <new>
#include <string>
#include <utility>

#include "absl/container/inlined_vector.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/types/optional.h"
#include "absl/utility/utility.h"

#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>

// NOTE: The project-header include block below is a reconstructed subset,
// inferred from the symbols used in this file.
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/retry_service_config.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/uri/uri_parser.h"

//
// Retry filter
//

// This filter is intended to be used in the DynamicFilter stack in the
// client channel, which is situated between the name resolver and the
// LB policy. Normally, the last filter in the DynamicFilter stack is
// the DynamicTerminationFilter (see client_channel.cc), which creates a
// LoadBalancedCall and delegates to it. However, when retries are
// enabled, this filter is used instead of the DynamicTerminationFilter.
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on an LB call. When the child batches return, we
// then decide which pending batches have been completed and schedule their
// callbacks accordingly. If a call attempt fails and we want to retry it,
// we create a new LB call and start again, constructing new "child" batches
// for the new LB call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses). However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry. To deal
// with this, we cache data for send ops, so that we can replay them on a
// different LB call even after we have completed the original batches.
//
// The code is structured as follows:
// - In CallData (in the parent channel), we maintain a list of pending
//   ops and cached data for send ops.
// - There is a CallData::CallAttempt object for each retry attempt.
//   This object contains the LB call for that attempt and state to indicate
//   which ops from the CallData object have already been sent down to that
//   LB call.
// - There is a CallData::CallAttempt::BatchData object for each "child"
//   batch sent on the LB call.
//
// When constructing the "child" batches, we compare the state in the
// CallAttempt object against the state in the CallData object to see
// which batches need to be sent on the LB call for a given attempt.

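// As a concrete (hypothetical) example of the proxying described above:
// for a unary call, the surface typically sends down two batches, both of
// which are recorded in pending_batches_:
//   batch 1: send_initial_metadata + send_message + send_trailing_metadata
//   batch 2: recv_initial_metadata + recv_message + recv_trailing_metadata
// Attempt 1 constructs "child" batches from these. If the attempt fails
// with a retryable status before any data was received, attempt 2 replays
// the cached send ops and re-issues the recv ops on a new LB call.
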
// TODO(roth): In subsequent PRs:
// - implement hedging

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
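// For reference: (256 << 10) == 256 * 1024 == 262144 bytes.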

// This value was picked arbitrarily. It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2
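// (BackOff consumes this as a jitter factor: the actual delay is chosen
// randomly within roughly +/-20% of the computed backoff.)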

namespace grpc_core {

namespace {

using internal::RetryGlobalConfig;
using internal::RetryMethodConfig;
using internal::RetryServiceConfigParser;
using internal::ServerRetryThrottleData;

TraceFlag grpc_retry_trace(false, "retry");

//
// RetryFilter
//

class RetryFilter {
 public:
  class CallData;

  static grpc_error_handle Init(grpc_channel_element* elem,
                                grpc_channel_element_args* args) {
    GPR_ASSERT(args->is_last);
    GPR_ASSERT(elem->filter == &kRetryFilterVtable);
    grpc_error_handle error = GRPC_ERROR_NONE;
    new (elem->channel_data) RetryFilter(args->channel_args, &error);
    return error;
  }

  static void Destroy(grpc_channel_element* elem) {
    auto* chand = static_cast<RetryFilter*>(elem->channel_data);
    chand->~RetryFilter();
  }

  // Will never be called.
  static void StartTransportOp(grpc_channel_element* /*elem*/,
                               grpc_transport_op* /*op*/) {}
  static void GetChannelInfo(grpc_channel_element* /*elem*/,
                             const grpc_channel_info* /*info*/) {}

 private:
  static size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
    return static_cast<size_t>(grpc_channel_args_find_integer(
        args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE,
        {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
  }

  RetryFilter(const grpc_channel_args* args, grpc_error_handle* error)
      : client_channel_(grpc_channel_args_find_pointer<ClientChannel>(
            args, GRPC_ARG_CLIENT_CHANNEL)),
        per_rpc_retry_buffer_size_(GetMaxPerRpcRetryBufferSize(args)),
        service_config_parser_index_(
            RetryServiceConfigParser::ParserIndex()) {
    // Get retry throttling parameters from service config.
    auto* service_config = grpc_channel_args_find_pointer<ServiceConfig>(
        args, GRPC_ARG_SERVICE_CONFIG_OBJ);
    if (service_config == nullptr) return;
    const auto* config = static_cast<const RetryGlobalConfig*>(
        service_config->GetGlobalParsedConfig(
            RetryServiceConfigParser::ParserIndex()));
    if (config == nullptr) return;
    // Get server name from target URI.
    const char* server_uri =
        grpc_channel_args_find_string(args, GRPC_ARG_SERVER_URI);
    if (server_uri == nullptr) {
      *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
          "server URI channel arg missing or wrong type in client channel "
          "filter");
      return;
    }
    absl::StatusOr<URI> uri = URI::Parse(server_uri);
    if (!uri.ok() || uri->path().empty()) {
      *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
          "could not extract server name from target URI");
      return;
    }
    std::string server_name(absl::StripPrefix(uri->path(), "/"));
    // Get throttling config for server_name.
    retry_throttle_data_ = internal::ServerRetryThrottleMap::GetDataForServer(
        server_name, config->max_milli_tokens(), config->milli_token_ratio());
  }

  const RetryMethodConfig* GetRetryPolicy(
      const grpc_call_context_element* context);

  ClientChannel* client_channel_;
  const size_t per_rpc_retry_buffer_size_;
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
  const size_t service_config_parser_index_;
};

//
// RetryFilter::CallData
//

class RetryFilter::CallData {
 public:
  static grpc_error_handle Init(grpc_call_element* elem,
                                const grpc_call_element_args* args);
  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* /*final_info*/,
                      grpc_closure* then_schedule_closure);
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);

 private:
  class CallStackDestructionBarrier;

  // Pending batches stored in call data.
  struct PendingBatch {
    // The pending batch. If nullptr, this slot is empty.
    grpc_transport_stream_op_batch* batch = nullptr;
    // Indicates whether payload for send ops has been cached in CallData.
    bool send_ops_cached = false;
  };

  // State associated with each call attempt.
  class CallAttempt : public RefCounted<CallAttempt> {
   public:
    CallAttempt(CallData* calld, bool is_transparent_retry);
    ~CallAttempt() override;

    bool lb_call_committed() const { return lb_call_committed_; }

    // Constructs and starts whatever batches are needed on this call
    // attempt.
    void StartRetriableBatches();

    // Frees cached send ops that have already been completed after
    // committing the call.
    void FreeCachedSendOpDataAfterCommit();

    // Cancels the call attempt.
    void CancelFromSurface(grpc_transport_stream_op_batch* cancel_batch);

   private:
    // State used for starting a retryable batch on the call attempt's LB call.
    // This provides its own grpc_transport_stream_op_batch and other data
    // structures needed to populate the ops in the batch.
    // We allocate one struct on the arena for each attempt at starting a
    // batch on a given LB call.
    class BatchData
        : public RefCounted<BatchData, PolymorphicRefCount, kUnrefCallDtor> {
     public:
      BatchData(RefCountedPtr<CallAttempt> call_attempt, int refcount,
                bool set_on_complete);
      ~BatchData() override;

      grpc_transport_stream_op_batch* batch() { return &batch_; }

      // Adds retriable send_initial_metadata op.
      void AddRetriableSendInitialMetadataOp();
      // Adds retriable send_message op.
      void AddRetriableSendMessageOp();
      // Adds retriable send_trailing_metadata op.
      void AddRetriableSendTrailingMetadataOp();
      // Adds retriable recv_initial_metadata op.
      void AddRetriableRecvInitialMetadataOp();
      // Adds retriable recv_message op.
      void AddRetriableRecvMessageOp();
      // Adds retriable recv_trailing_metadata op.
      void AddRetriableRecvTrailingMetadataOp();
      // Adds cancel_stream op.
      void AddCancelStreamOp(grpc_error_handle error);

     private:
      // Frees cached send ops that were completed by the completed batch in
      // batch_data. Used when batches are completed after the call is
      // committed.
      void FreeCachedSendOpDataForCompletedBatch();

      // If there is a pending recv_initial_metadata op, adds a closure
      // to closures for recv_initial_metadata_ready.
      void MaybeAddClosureForRecvInitialMetadataCallback(
          grpc_error_handle error, CallCombinerClosureList* closures);
      // Intercepts recv_initial_metadata_ready callback for retries.
      // Commits the call and returns the initial metadata up the stack.
      static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);

      // If there is a pending recv_message op, adds a closure to closures
      // for recv_message_ready.
      void MaybeAddClosureForRecvMessageCallback(
          grpc_error_handle error, CallCombinerClosureList* closures);
      // Intercepts recv_message_ready callback for retries.
      // Commits the call and returns the message up the stack.
      static void RecvMessageReady(void* arg, grpc_error_handle error);

      // If there is a pending recv_trailing_metadata op, adds a closure to
      // closures for recv_trailing_metadata_ready.
      void MaybeAddClosureForRecvTrailingMetadataReady(
          grpc_error_handle error, CallCombinerClosureList* closures);
      // Adds any necessary closures for deferred batch completion
      // callbacks to closures.
      void AddClosuresForDeferredCompletionCallbacks(
          CallCombinerClosureList* closures);
      // For any pending batch containing an op that has not yet been started,
      // adds the pending batch's completion closures to closures.
      void AddClosuresToFailUnstartedPendingBatches(
          grpc_error_handle error, CallCombinerClosureList* closures);
      // Runs necessary closures upon completion of a call attempt.
      void RunClosuresForCompletedCall(grpc_error_handle error);
      // Intercepts recv_trailing_metadata_ready callback for retries.
      // Commits the call and returns the trailing metadata up the stack.
      static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);

      // Adds the on_complete closure for the pending batch completed in
      // batch_data to closures.
      void AddClosuresForCompletedPendingBatch(
          grpc_error_handle error, CallCombinerClosureList* closures);

      // If there are any cached ops to replay or pending ops to start on the
      // LB call, adds them to closures.
      void AddClosuresForReplayOrPendingSendOps(
          CallCombinerClosureList* closures);

      // Callback used to intercept on_complete from LB calls.
      static void OnComplete(void* arg, grpc_error_handle error);

      // Callback used to handle on_complete for internally generated
      // cancel_stream op.
      static void OnCompleteForCancelOp(void* arg, grpc_error_handle error);

      RefCountedPtr<CallAttempt> call_attempt_;
      // The batch to use in the LB call.
      // Its payload field points to CallAttempt::batch_payload_.
      grpc_transport_stream_op_batch batch_;
      // For intercepting on_complete.
      grpc_closure on_complete_;
    };

    class AttemptDispatchController
        : public ConfigSelector::CallDispatchController {
     public:
      explicit AttemptDispatchController(CallAttempt* call_attempt)
          : call_attempt_(call_attempt) {}

      // Will never be called.
      bool ShouldRetry() override { return false; }

      void Commit() override {
        call_attempt_->lb_call_committed_ = true;
        auto* calld = call_attempt_->calld_;
        if (calld->retry_committed_) {
          auto* service_config_call_data =
              static_cast<ClientChannelServiceConfigCallData*>(
                  calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA]
                      .value);
          service_config_call_data->call_dispatch_controller()->Commit();
        }
      }

     private:
      CallAttempt* call_attempt_;
    };

    // Creates a BatchData object on the call's arena with the
    // specified refcount. If set_on_complete is true, the batch's
    // on_complete callback will be set to point to on_complete();
    // otherwise, the batch's on_complete callback will be null.
    BatchData* CreateBatch(int refcount, bool set_on_complete) {
      return calld_->arena_->New<BatchData>(Ref(DEBUG_LOCATION, "CreateBatch"),
                                            refcount, set_on_complete);
    }

    // If there are any cached send ops that need to be replayed on this
    // call attempt, creates and returns a new batch to replay those ops.
    // Otherwise, returns nullptr.
    BatchData* MaybeCreateBatchForReplay();

    // Adds a closure to closures that will execute batch in the call combiner.
    void AddClosureForBatch(grpc_transport_stream_op_batch* batch,
                            const char* reason,
                            CallCombinerClosureList* closures);

    // Helper function used to start a recv_trailing_metadata batch. This
    // is used in the case where a recv_initial_metadata or recv_message
    // op fails in a way that we know the call is over but when the application
    // has not yet started its own recv_trailing_metadata op.
    void AddBatchForInternalRecvTrailingMetadata(
        CallCombinerClosureList* closures);

    // Adds a batch to closures to cancel this call attempt, if
    // cancellation has not already been sent on the LB call.
    void MaybeAddBatchForCancelOp(grpc_error_handle error,
                                  CallCombinerClosureList* closures);

    // Adds batches for pending batches to closures.
    void AddBatchesForPendingBatches(CallCombinerClosureList* closures);

    // Adds whatever batches are needed on this attempt to closures.
    void AddRetriableBatches(CallCombinerClosureList* closures);

    // Returns true if any send op in the batch was not yet started on this
    // attempt.
    bool PendingBatchContainsUnstartedSendOps(PendingBatch* pending);

    // Returns true if there are cached send ops to replay.
    bool HaveSendOpsToReplay();

    // If our retry state is no longer needed, switch to fast path by moving
    // our LB call into calld_->committed_call_ and having calld_ drop
    // its ref to us.
    void MaybeSwitchToFastPath();

    // Returns true if the call should be retried.
    bool ShouldRetry(absl::optional<grpc_status_code> status,
                     absl::optional<Duration> server_pushback_ms);

    // Abandons the call attempt. Unrefs any deferred batches.
    void Abandon();

    static void OnPerAttemptRecvTimer(void* arg, grpc_error_handle error);
    static void OnPerAttemptRecvTimerLocked(void* arg, grpc_error_handle error);
    void MaybeCancelPerAttemptRecvTimer();

    CallData* calld_;
    AttemptDispatchController attempt_dispatch_controller_;
    OrphanablePtr<ClientChannel::LoadBalancedCall> lb_call_;
    bool lb_call_committed_ = false;

    grpc_closure on_per_attempt_recv_timer_;
    grpc_timer per_attempt_recv_timer_;
    bool per_attempt_recv_timer_pending_ = false;

    // BatchData.batch.payload points to this.
    grpc_transport_stream_op_batch_payload batch_payload_;
    // For send_initial_metadata.
    grpc_metadata_batch send_initial_metadata_{calld_->arena_};
    // For send_trailing_metadata.
    grpc_metadata_batch send_trailing_metadata_{calld_->arena_};
    // For intercepting recv_initial_metadata.
    grpc_metadata_batch recv_initial_metadata_{calld_->arena_};
    grpc_closure recv_initial_metadata_ready_;
    bool trailing_metadata_available_ = false;
    // For intercepting recv_message.
    grpc_closure recv_message_ready_;
    absl::optional<SliceBuffer> recv_message_;
    uint32_t recv_message_flags_;
    // For intercepting recv_trailing_metadata.
    grpc_metadata_batch recv_trailing_metadata_{calld_->arena_};
    grpc_transport_stream_stats collect_stats_;
    grpc_closure recv_trailing_metadata_ready_;
    // These fields indicate which ops have been started and completed on
    // this call attempt.
    size_t started_send_message_count_ = 0;
    size_t completed_send_message_count_ = 0;
    size_t started_recv_message_count_ = 0;
    size_t completed_recv_message_count_ = 0;
    bool started_send_initial_metadata_ : 1;
    bool completed_send_initial_metadata_ : 1;
    bool started_send_trailing_metadata_ : 1;
    bool completed_send_trailing_metadata_ : 1;
    bool started_recv_initial_metadata_ : 1;
    bool completed_recv_initial_metadata_ : 1;
    bool started_recv_trailing_metadata_ : 1;
    bool completed_recv_trailing_metadata_ : 1;
    bool sent_cancel_stream_ : 1;
    // State for callback processing.
    RefCountedPtr<BatchData> recv_initial_metadata_ready_deferred_batch_;
    grpc_error_handle recv_initial_metadata_error_ = GRPC_ERROR_NONE;
    RefCountedPtr<BatchData> recv_message_ready_deferred_batch_;
    grpc_error_handle recv_message_error_ = GRPC_ERROR_NONE;
    struct OnCompleteDeferredBatch {
      OnCompleteDeferredBatch(RefCountedPtr<BatchData> batch,
                              grpc_error_handle error)
          : batch(std::move(batch)), error(error) {}
      RefCountedPtr<BatchData> batch;
      grpc_error_handle error;
    };
    // There cannot be more than 3 pending send op batches at a time.
    absl::InlinedVector<OnCompleteDeferredBatch, 3>
        on_complete_deferred_batches_;
    RefCountedPtr<BatchData> recv_trailing_metadata_internal_batch_;
    grpc_error_handle recv_trailing_metadata_error_ = GRPC_ERROR_NONE;
    bool seen_recv_trailing_metadata_from_surface_ : 1;
    // NOTE: Do not move this next to the metadata bitfields above. That would
    // save space but will also result in a data race because compiler
    // will generate a 2 byte store which overwrites the meta-data
    // fields upon setting this field.
    bool abandoned_ : 1;
  };

  CallData(RetryFilter* chand, const grpc_call_element_args& args);
  ~CallData();

  void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  PendingBatch* PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
  void PendingBatchClear(PendingBatch* pending);
  void MaybeClearPendingBatch(PendingBatch* pending);
  static void FailPendingBatchInCallCombiner(void* arg,
                                             grpc_error_handle error);
  // Fails all pending batches. Does NOT yield call combiner.
  void PendingBatchesFail(grpc_error_handle error);
  // Returns a pointer to the first pending batch for which predicate(batch)
  // returns true, or null if not found.
  template <typename Predicate>
  PendingBatch* PendingBatchFind(const char* log_message, Predicate predicate);

  // Caches data for send ops so that it can be retried later, if not
  // already cached.
  void MaybeCacheSendOpsForBatch(PendingBatch* pending);
  void FreeCachedSendInitialMetadata();
  // Frees cached send_message at index idx.
  void FreeCachedSendMessage(size_t idx);
  void FreeCachedSendTrailingMetadata();
  void FreeAllCachedSendOpData();

  // Commits the call so that no further retry attempts will be performed.
  void RetryCommit(CallAttempt* call_attempt);

  // Starts a timer to retry after appropriate back-off.
  // If server_pushback is nullopt, retry_backoff_ is used.
  void StartRetryTimer(absl::optional<Duration> server_pushback);

  static void OnRetryTimer(void* arg, grpc_error_handle error);
  static void OnRetryTimerLocked(void* arg, grpc_error_handle error);

  // Adds a closure to closures to start a transparent retry.
  void AddClosureToStartTransparentRetry(CallCombinerClosureList* closures);
  static void StartTransparentRetry(void* arg, grpc_error_handle error);

  OrphanablePtr<ClientChannel::LoadBalancedCall> CreateLoadBalancedCall(
      ConfigSelector::CallDispatchController* call_dispatch_controller,
      bool is_transparent_retry);

  void CreateCallAttempt(bool is_transparent_retry);

  RetryFilter* chand_;
  grpc_polling_entity* pollent_;
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
  const RetryMethodConfig* retry_policy_ = nullptr;
  BackOff retry_backoff_;

  grpc_slice path_;  // Request path.
  Timestamp deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;
  grpc_call_context_element* call_context_;

  grpc_error_handle cancelled_from_surface_ = GRPC_ERROR_NONE;

  RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier_;

  // TODO(roth): As part of implementing hedging, we will need to maintain a
  // list of all pending attempts, so that we can cancel them all if the call
  // gets cancelled.
  RefCountedPtr<CallAttempt> call_attempt_;

  // LB call used when we've committed to a call attempt and the retry
  // state for that attempt is no longer needed. This provides a fast
  // path for long-running streaming calls that minimizes overhead.
  OrphanablePtr<ClientChannel::LoadBalancedCall> committed_call_;

  // When we are not yet fully committed to a particular call (i.e.,
  // either we might still retry or we have committed to the call but
  // there are still some cached ops to be replayed on the call),
  // batches received from above will be added to this list, and they
  // will not be removed until we have invoked their completion callbacks.
  size_t bytes_buffered_for_retry_ = 0;
  PendingBatch pending_batches_[MAX_PENDING_BATCHES];
  bool pending_send_initial_metadata_ : 1;
  bool pending_send_message_ : 1;
  bool pending_send_trailing_metadata_ : 1;

  // Retry state.
  bool retry_committed_ : 1;
  bool retry_timer_pending_ : 1;
  int num_attempts_completed_ = 0;
  grpc_timer retry_timer_;
  grpc_closure retry_closure_;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata_ = false;
  grpc_metadata_batch send_initial_metadata_{arena_};
  // TODO(roth): As part of implementing hedging, we'll probably need to
  // have the LB call set a value in CallAttempt and then propagate it
  // from CallAttempt to the parent call when we commit. Otherwise, we
  // may leave this with a value for a peer other than the one we
  // actually commit to. Alternatively, maybe see if there's a way to
  // change the surface API such that the peer isn't available until
  // after initial metadata is received? (Could even change the
  // transport API to return this with the recv_initial_metadata op.)
  gpr_atm* peer_string_;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a CachingByteStream that caches the slices to a local buffer for
  // use in retries.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that. This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  struct CachedSendMessage {
    SliceBuffer* slices;
    uint32_t flags;
  };
  absl::InlinedVector<CachedSendMessage, 3> send_messages_;
  // send_trailing_metadata
  bool seen_send_trailing_metadata_ = false;
  grpc_metadata_batch send_trailing_metadata_{arena_};
};

//
// RetryFilter::CallData::CallStackDestructionBarrier
//

// A class to track the existence of LoadBalancedCall call stacks that
// we've created. We wait until all such call stacks have been
// destroyed before we return the on_call_stack_destruction closure up
// to the surface.
//
// The parent RetryFilter::CallData object holds a ref to this object.
// When it is destroyed, it will store the on_call_stack_destruction
// closure from the surface in this object and then release its ref.
// We also take a ref to this object for each LB call we create, and
// those refs are not released until the LB call stack is destroyed.
// When this object is destroyed, it will invoke the
// on_call_stack_destruction closure from the surface.
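//
// Illustrative ref-count lifecycle: the barrier starts with 1 ref (held by
// CallData). Creating two LB calls brings it to 3. When CallData is
// destroyed, it stores the closure and drops its ref (2). As each LB call
// stack is destroyed, one ref is dropped; when the count reaches 0, the
// barrier's destructor schedules on_call_stack_destruction_.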
class RetryFilter::CallData::CallStackDestructionBarrier
    : public RefCounted<CallStackDestructionBarrier, PolymorphicRefCount,
                        kUnrefCallDtor> {
 public:
  CallStackDestructionBarrier() {}

  ~CallStackDestructionBarrier() override {
    // TODO(yashkt) : This can potentially be a Closure::Run
    ExecCtx::Run(DEBUG_LOCATION, on_call_stack_destruction_, GRPC_ERROR_NONE);
  }

  // Set the closure from the surface. This closure will be invoked
  // when this object is destroyed.
  void set_on_call_stack_destruction(grpc_closure* on_call_stack_destruction) {
    on_call_stack_destruction_ = on_call_stack_destruction;
  }

  // Invoked to get an on_call_stack_destruction closure for a new LB call.
  grpc_closure* MakeLbCallDestructionClosure(CallData* calld) {
    Ref().release();  // Ref held by callback.
    grpc_closure* on_lb_call_destruction_complete =
        calld->arena_->New<grpc_closure>();
    GRPC_CLOSURE_INIT(on_lb_call_destruction_complete,
                      OnLbCallDestructionComplete, this, nullptr);
    return on_lb_call_destruction_complete;
  }

 private:
  static void OnLbCallDestructionComplete(void* arg,
                                          grpc_error_handle /*error*/) {
    auto* self = static_cast<CallStackDestructionBarrier*>(arg);
    self->Unref();
  }

  grpc_closure* on_call_stack_destruction_ = nullptr;
};

//
// RetryFilter::CallData::CallAttempt
//

RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld,
                                                bool is_transparent_retry)
    : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace) ? "CallAttempt"
                                                           : nullptr),
      calld_(calld),
      attempt_dispatch_controller_(this),
      batch_payload_(calld->call_context_),
      started_send_initial_metadata_(false),
      completed_send_initial_metadata_(false),
      started_send_trailing_metadata_(false),
      completed_send_trailing_metadata_(false),
      started_recv_initial_metadata_(false),
      completed_recv_initial_metadata_(false),
      started_recv_trailing_metadata_(false),
      completed_recv_trailing_metadata_(false),
      sent_cancel_stream_(false),
      seen_recv_trailing_metadata_from_surface_(false),
      abandoned_(false) {
  lb_call_ = calld->CreateLoadBalancedCall(&attempt_dispatch_controller_,
                                           is_transparent_retry);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: created attempt, lb_call=%p",
            calld->chand_, calld, this, lb_call_.get());
  }
  // If per_attempt_recv_timeout is set, start a timer.
  if (calld->retry_policy_ != nullptr &&
      calld->retry_policy_->per_attempt_recv_timeout().has_value()) {
    Timestamp per_attempt_recv_deadline =
        ExecCtx::Get()->Now() +
        *calld->retry_policy_->per_attempt_recv_timeout();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: per-attempt timeout in %" PRId64
              " ms",
              calld->chand_, calld, this,
              calld->retry_policy_->per_attempt_recv_timeout()->millis());
    }
    // Schedule the per-attempt timeout timer.
    GRPC_CLOSURE_INIT(&on_per_attempt_recv_timer_, OnPerAttemptRecvTimer, this,
                      nullptr);
    GRPC_CALL_STACK_REF(calld->owning_call_, "OnPerAttemptRecvTimer");
    Ref(DEBUG_LOCATION, "OnPerAttemptRecvTimer").release();
    per_attempt_recv_timer_pending_ = true;
    grpc_timer_init(&per_attempt_recv_timer_, per_attempt_recv_deadline,
                    &on_per_attempt_recv_timer_);
  }
}

RetryFilter::CallData::CallAttempt::~CallAttempt() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: destroying call attempt",
            calld_->chand_, calld_, this);
  }
}

void RetryFilter::CallData::CallAttempt::FreeCachedSendOpDataAfterCommit() {
  // TODO(roth): When we implement hedging, this logic will need to get
  // a bit more complex, because there may be other (now abandoned) call
  // attempts still using this data. We may need to do some sort of
  // ref-counting instead.
  if (completed_send_initial_metadata_) {
    calld_->FreeCachedSendInitialMetadata();
  }
  for (size_t i = 0; i < completed_send_message_count_; ++i) {
    calld_->FreeCachedSendMessage(i);
  }
  if (completed_send_trailing_metadata_) {
    calld_->FreeCachedSendTrailingMetadata();
  }
}

bool RetryFilter::CallData::CallAttempt::PendingBatchContainsUnstartedSendOps(
    PendingBatch* pending) {
  if (pending->batch->on_complete == nullptr) return false;
  if (pending->batch->send_initial_metadata &&
      !started_send_initial_metadata_) {
    return true;
  }
  if (pending->batch->send_message &&
      started_send_message_count_ < calld_->send_messages_.size()) {
    return true;
  }
  if (pending->batch->send_trailing_metadata &&
      !started_send_trailing_metadata_) {
    return true;
  }
  return false;
}

bool RetryFilter::CallData::CallAttempt::HaveSendOpsToReplay() {
  // We don't check send_initial_metadata here, because that op will always
  // be started as soon as it is received from the surface, so it will
  // never need to be started at this point.
  return started_send_message_count_ < calld_->send_messages_.size() ||
         (calld_->seen_send_trailing_metadata_ &&
          !started_send_trailing_metadata_);
}

void RetryFilter::CallData::CallAttempt::MaybeSwitchToFastPath() {
  // If we're not yet committed, we can't switch yet.
  // TODO(roth): As part of implementing hedging, this logic needs to
  // check that *this* call attempt is the one that we've committed to.
  // Might need to replace abandoned_ with an enum indicating whether we're
  // in flight, abandoned, or the winning call attempt.
  if (!calld_->retry_committed_) return;
  // If we've already switched to fast path, there's nothing to do here.
  if (calld_->committed_call_ != nullptr) return;
  // If the perAttemptRecvTimeout timer is pending, we can't switch yet.
  if (per_attempt_recv_timer_pending_) return;
  // If there are still send ops to replay, we can't switch yet.
  if (HaveSendOpsToReplay()) return;
  // If we started an internal batch for recv_trailing_metadata but have not
  // yet seen that op from the surface, we can't switch yet.
  if (recv_trailing_metadata_internal_batch_ != nullptr) return;
  // Switch to fast path.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: retry state no longer needed; "
            "moving LB call to parent and unreffing the call attempt",
            calld_->chand_, calld_, this);
  }
  calld_->committed_call_ = std::move(lb_call_);
  calld_->call_attempt_.reset(DEBUG_LOCATION, "MaybeSwitchToFastPath");
}

// If there are any cached send ops that need to be replayed on the
// current call attempt, creates and returns a new batch to replay those ops.
// Otherwise, returns nullptr.
RetryFilter::CallData::CallAttempt::BatchData*
RetryFilter::CallData::CallAttempt::MaybeCreateBatchForReplay() {
  BatchData* replay_batch_data = nullptr;
  // send_initial_metadata.
  if (calld_->seen_send_initial_metadata_ && !started_send_initial_metadata_ &&
      !calld_->pending_send_initial_metadata_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: replaying previously completed "
              "send_initial_metadata op",
              calld_->chand_, calld_, this);
    }
    replay_batch_data = CreateBatch(1, true /* set_on_complete */);
    replay_batch_data->AddRetriableSendInitialMetadataOp();
  }
  // send_message.
  // Note that we can only have one send_message op in flight at a time.
  if (started_send_message_count_ < calld_->send_messages_.size() &&
      started_send_message_count_ == completed_send_message_count_ &&
      !calld_->pending_send_message_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: replaying previously completed "
              "send_message op",
              calld_->chand_, calld_, this);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data = CreateBatch(1, true /* set_on_complete */);
    }
    replay_batch_data->AddRetriableSendMessageOp();
  }
  // send_trailing_metadata.
  // Note that we only add this op if we have no more send_message ops
  // to start, since we can't send down any more send_message ops after
  // send_trailing_metadata.
  if (calld_->seen_send_trailing_metadata_ &&
      started_send_message_count_ == calld_->send_messages_.size() &&
      !started_send_trailing_metadata_ &&
      !calld_->pending_send_trailing_metadata_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: replaying previously completed "
              "send_trailing_metadata op",
              calld_->chand_, calld_, this);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data = CreateBatch(1, true /* set_on_complete */);
    }
    replay_batch_data->AddRetriableSendTrailingMetadataOp();
  }
  return replay_batch_data;
}

namespace {

void StartBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  auto* lb_call = static_cast<ClientChannel::LoadBalancedCall*>(
      batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  lb_call->StartTransportStreamOpBatch(batch);
}

}  // namespace

void RetryFilter::CallData::CallAttempt::AddClosureForBatch(
    grpc_transport_stream_op_batch* batch, const char* reason,
    CallCombinerClosureList* closures) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: adding batch (%s): %s",
            calld_->chand_, calld_, this, reason,
            grpc_transport_stream_op_batch_string(batch).c_str());
  }
  batch->handler_private.extra_arg = lb_call_.get();
  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
                    batch, grpc_schedule_on_exec_ctx);
  closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE, reason);
}

void RetryFilter::CallData::CallAttempt::
    AddBatchForInternalRecvTrailingMetadata(CallCombinerClosureList* closures) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: call failed but "
            "recv_trailing_metadata not started; starting it internally",
            calld_->chand_, calld_, this);
  }
  // Create batch_data with 2 refs, since this batch will be unreffed twice:
  // once for the recv_trailing_metadata_ready callback when the batch
  // completes, and again when we actually get a recv_trailing_metadata
  // op from the surface.
  BatchData* batch_data = CreateBatch(2, false /* set_on_complete */);
  batch_data->AddRetriableRecvTrailingMetadataOp();
  recv_trailing_metadata_internal_batch_.reset(batch_data);
  AddClosureForBatch(batch_data->batch(),
                     "starting internal recv_trailing_metadata", closures);
}

void RetryFilter::CallData::CallAttempt::MaybeAddBatchForCancelOp(
    grpc_error_handle error, CallCombinerClosureList* closures) {
  if (sent_cancel_stream_) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  sent_cancel_stream_ = true;
  BatchData* cancel_batch_data = CreateBatch(1, /*set_on_complete=*/true);
  cancel_batch_data->AddCancelStreamOp(error);
  AddClosureForBatch(cancel_batch_data->batch(),
                     "start cancellation batch on call attempt", closures);
}

void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches(
    CallCombinerClosureList* closures) {
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld_->pending_batches_); ++i) {
    PendingBatch* pending = &calld_->pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch == nullptr) continue;
    bool has_send_ops = false;
    // Skip any batch that either (a) has already been started on this
    // call attempt or (b) we can't start yet because we're still
    // replaying send ops that need to be completed first.
    // TODO(roth): Note that if any one op in the batch can't be sent
    // yet due to ops that we're replaying, we don't start any of the ops
    // in the batch. This is probably okay, but it could conceivably
    // lead to increased latency in some cases -- e.g., we could delay
    // starting a recv op due to it being in the same batch with a send
    // op. If/when we revamp the callback protocol in
    // transport_stream_op_batch, we may be able to fix this.
    if (batch->send_initial_metadata) {
      if (started_send_initial_metadata_) continue;
      has_send_ops = true;
    }
    if (batch->send_message) {
      // Cases where we can't start this send_message op:
      // - We are currently replaying a previous cached send_message op.
      // - We have already replayed all send_message ops, including this
      //   one. (This can happen if a send_message op is in the same
      //   batch as a recv op, the send_message op has already completed
      //   but the recv op hasn't, and then a subsequent batch with another
      //   recv op is started from the surface.)
      if (completed_send_message_count_ < started_send_message_count_ ||
          started_send_message_count_ ==
              (calld_->send_messages_.size() + !pending->send_ops_cached)) {
        continue;
      }
      has_send_ops = true;
    }
    // Note that we only start send_trailing_metadata if we have no more
    // send_message ops to start, since we can't send down any more
    // send_message ops after send_trailing_metadata.
    if (batch->send_trailing_metadata) {
      if (started_send_message_count_ + batch->send_message <
              calld_->send_messages_.size() ||
          started_send_trailing_metadata_) {
        continue;
      }
      has_send_ops = true;
    }
    int num_callbacks = has_send_ops;  // All send ops share one callback.
    if (batch->recv_initial_metadata) {
      if (started_recv_initial_metadata_) continue;
      ++num_callbacks;
    }
    if (batch->recv_message) {
      // Skip if the op is already in flight, or if it has already completed
      // but the completion has not yet been sent to the surface.
      if (completed_recv_message_count_ < started_recv_message_count_ ||
          recv_message_ready_deferred_batch_ != nullptr) {
        continue;
      }
      ++num_callbacks;
    }
    if (batch->recv_trailing_metadata) {
      if (started_recv_trailing_metadata_) {
        seen_recv_trailing_metadata_from_surface_ = true;
        // If we previously completed a recv_trailing_metadata op
        // initiated by AddBatchForInternalRecvTrailingMetadata(), use the
        // result of that instead of trying to re-start this op.
        if (GPR_UNLIKELY(recv_trailing_metadata_internal_batch_ != nullptr)) {
          // If the batch completed, then trigger the completion callback
          // directly, so that we return the previously returned results to
          // the application. Otherwise, just unref the internally started
          // batch, since we'll propagate the completion when it completes.
          if (completed_recv_trailing_metadata_) {
            closures->Add(
                &recv_trailing_metadata_ready_, recv_trailing_metadata_error_,
                "re-executing recv_trailing_metadata_ready to propagate "
                "internally triggered result");
            // Ref will be released by callback.
            recv_trailing_metadata_internal_batch_.release();
          } else {
            recv_trailing_metadata_internal_batch_.reset(
                DEBUG_LOCATION,
                "internally started recv_trailing_metadata batch pending and "
                "recv_trailing_metadata started from surface");
            GRPC_ERROR_UNREF(recv_trailing_metadata_error_);
          }
          recv_trailing_metadata_error_ = GRPC_ERROR_NONE;
        }
        // We don't want the fact that we've already started this op internally
        // to prevent us from adding a batch that may contain other ops.
        // Instead, we'll just skip adding this op below.
        if (num_callbacks == 0) continue;
      } else {
        ++num_callbacks;
      }
    }
    // If we're already committed and the following conditions are met,
    // just send the batch down as-is:
    // - The batch contains no cached send ops. (If it does, we need
    //   the logic below to use the cached payloads.)
    // - The batch does not contain recv_trailing_metadata when we have
    //   already started an internal recv_trailing_metadata batch. (If
    //   we've already started an internal recv_trailing_metadata batch,
    //   then we need the logic below to send all ops in the batch
    //   *except* the recv_trailing_metadata op.)
    if (calld_->retry_committed_ && !pending->send_ops_cached &&
        (!batch->recv_trailing_metadata || !started_recv_trailing_metadata_)) {
      AddClosureForBatch(
          batch,
          "start non-replayable pending batch on call attempt after commit",
          closures);
      calld_->PendingBatchClear(pending);
      continue;
    }
    // Create batch with the right number of callbacks.
    BatchData* batch_data =
        CreateBatch(num_callbacks, has_send_ops /* set_on_complete */);
    // Cache send ops if needed.
    calld_->MaybeCacheSendOpsForBatch(pending);
    // send_initial_metadata.
    if (batch->send_initial_metadata) {
      batch_data->AddRetriableSendInitialMetadataOp();
    }
    // send_message.
    if (batch->send_message) {
      batch_data->AddRetriableSendMessageOp();
    }
    // send_trailing_metadata.
    if (batch->send_trailing_metadata) {
      batch_data->AddRetriableSendTrailingMetadataOp();
    }
    // recv_initial_metadata.
    if (batch->recv_initial_metadata) {
      // recv_flags is only used on the server side.
      GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
      batch_data->AddRetriableRecvInitialMetadataOp();
    }
    // recv_message.
    if (batch->recv_message) {
      batch_data->AddRetriableRecvMessageOp();
    }
    // recv_trailing_metadata.
    if (batch->recv_trailing_metadata && !started_recv_trailing_metadata_) {
      batch_data->AddRetriableRecvTrailingMetadataOp();
    }
    AddClosureForBatch(batch_data->batch(),
                       "start replayable pending batch on call attempt",
                       closures);
  }
}

void RetryFilter::CallData::CallAttempt::AddRetriableBatches(
    CallCombinerClosureList* closures) {
  // Replay previously-returned send_* ops if needed.
  BatchData* replay_batch_data = MaybeCreateBatchForReplay();
  if (replay_batch_data != nullptr) {
    AddClosureForBatch(replay_batch_data->batch(),
                       "start replay batch on call attempt", closures);
  }
  // Now add pending batches.
  AddBatchesForPendingBatches(closures);
}

void RetryFilter::CallData::CallAttempt::StartRetriableBatches() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: constructing retriable batches",
            calld_->chand_, calld_, this);
  }
  // Construct list of closures to execute, one for each pending batch.
  CallCombinerClosureList closures;
  AddRetriableBatches(&closures);
  // Note: This will yield the call combiner.
  // Start batches on LB call.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: starting %" PRIuPTR
            " retriable batches on lb_call=%p",
            calld_->chand_, calld_, this, closures.size(), lb_call_.get());
  }
  closures.RunClosures(calld_->call_combiner_);
}

void RetryFilter::CallData::CallAttempt::CancelFromSurface(
    grpc_transport_stream_op_batch* cancel_batch) {
  MaybeCancelPerAttemptRecvTimer();
  Abandon();
  // Propagate cancellation to LB call.
  lb_call_->StartTransportStreamOpBatch(cancel_batch);
}

bool RetryFilter::CallData::CallAttempt::ShouldRetry(
    absl::optional<grpc_status_code> status,
    absl::optional<Duration> server_pushback) {
  // If no retry policy, don't retry.
  if (calld_->retry_policy_ == nullptr) return false;
  // Check status.
  if (status.has_value()) {
    if (GPR_LIKELY(*status == GRPC_STATUS_OK)) {
      if (calld_->retry_throttle_data_ != nullptr) {
        calld_->retry_throttle_data_->RecordSuccess();
      }
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: call succeeded",
                calld_->chand_, calld_, this);
      }
      return false;
    }
    // Status is not OK. Check whether the status is retryable.
    if (!calld_->retry_policy_->retryable_status_codes().Contains(*status)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p attempt=%p: status %s not configured as "
                "retryable",
                calld_->chand_, calld_, this,
                grpc_status_code_to_string(*status));
      }
      return false;
    }
  }
  // Record the failure and check whether retries are throttled.
  // Note that it's important for this check to come after the status
  // code check above, since we should only record failures whose statuses
  // match the configured retryable status codes, so that we don't count
  // things like failures due to malformed requests (INVALID_ARGUMENT).
  // Conversely, it's important for this to come before the remaining
  // checks, so that we don't fail to record failures due to other factors.
  if (calld_->retry_throttle_data_ != nullptr &&
      !calld_->retry_throttle_data_->RecordFailure()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: retries throttled",
              calld_->chand_, calld_, this);
    }
    return false;
  }
  // Check whether the call is committed.
  if (calld_->retry_committed_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: retries already committed",
              calld_->chand_, calld_, this);
    }
    return false;
  }
  // Check whether we have retries remaining.
  ++calld_->num_attempts_completed_;
  if (calld_->num_attempts_completed_ >=
      calld_->retry_policy_->max_attempts()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(
          GPR_INFO, "chand=%p calld=%p attempt=%p: exceeded %d retry attempts",
          calld_->chand_, calld_, this, calld_->retry_policy_->max_attempts());
    }
    return false;
  }
  // Check server push-back.
  if (server_pushback.has_value()) {
    if (*server_pushback < Duration::Zero()) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p attempt=%p: not retrying due to server "
                "push-back",
                calld_->chand_, calld_, this);
      }
      return false;
    } else {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(
            GPR_INFO,
            "chand=%p calld=%p attempt=%p: server push-back: retry in %" PRIu64
            " ms",
            calld_->chand_, calld_, this, server_pushback->millis());
      }
    }
  }
  // Check with call dispatch controller.
  auto* service_config_call_data =
      static_cast<ClientChannelServiceConfigCallData*>(
          calld_->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
  if (!service_config_call_data->call_dispatch_controller()->ShouldRetry()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: call dispatch controller denied "
              "retry",
              calld_->chand_, calld_, this);
    }
    return false;
  }
  // We should retry.
  return true;
}
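
// For reference, the retry policy consulted above comes from the method
// config in the service config (see gRFC A6). A representative
// (hypothetical) policy looks like:
//
//   "retryPolicy": {
//     "maxAttempts": 4,
//     "initialBackoff": "0.1s",
//     "maxBackoff": "1s",
//     "backoffMultiplier": 2,
//     "retryableStatusCodes": [ "UNAVAILABLE" ]
//   }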

void RetryFilter::CallData::CallAttempt::Abandon() {
  abandoned_ = true;
  // Unref batches for deferred completion callbacks that will now never
  // be invoked.
  if (started_recv_trailing_metadata_ &&
      !seen_recv_trailing_metadata_from_surface_) {
    recv_trailing_metadata_internal_batch_.reset(
        DEBUG_LOCATION,
        "unref internal recv_trailing_metadata_ready batch; attempt abandoned");
  }
  GRPC_ERROR_UNREF(recv_trailing_metadata_error_);
  recv_trailing_metadata_error_ = GRPC_ERROR_NONE;
  recv_initial_metadata_ready_deferred_batch_.reset(
      DEBUG_LOCATION,
      "unref deferred recv_initial_metadata_ready batch; attempt abandoned");
  GRPC_ERROR_UNREF(recv_initial_metadata_error_);
  recv_initial_metadata_error_ = GRPC_ERROR_NONE;
  recv_message_ready_deferred_batch_.reset(
      DEBUG_LOCATION,
      "unref deferred recv_message_ready batch; attempt abandoned");
  GRPC_ERROR_UNREF(recv_message_error_);
  recv_message_error_ = GRPC_ERROR_NONE;
  for (auto& on_complete_deferred_batch : on_complete_deferred_batches_) {
    on_complete_deferred_batch.batch.reset(
        DEBUG_LOCATION, "unref deferred on_complete batch; attempt abandoned");
    GRPC_ERROR_UNREF(on_complete_deferred_batch.error);
  }
  on_complete_deferred_batches_.clear();
}

void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimer(
    void* arg, grpc_error_handle error) {
  auto* call_attempt = static_cast<CallAttempt*>(arg);
  GRPC_CLOSURE_INIT(&call_attempt->on_per_attempt_recv_timer_,
                    OnPerAttemptRecvTimerLocked, call_attempt, nullptr);
  GRPC_CALL_COMBINER_START(call_attempt->calld_->call_combiner_,
                           &call_attempt->on_per_attempt_recv_timer_,
                           GRPC_ERROR_REF(error), "per-attempt timer fired");
}

void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimerLocked(
    void* arg, grpc_error_handle error) {
  auto* call_attempt = static_cast<CallAttempt*>(arg);
  auto* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: perAttemptRecvTimeout timer fired: "
            "error=%s, per_attempt_recv_timer_pending_=%d",
            calld->chand_, calld, call_attempt,
            grpc_error_std_string(error).c_str(),
            call_attempt->per_attempt_recv_timer_pending_);
  }
  CallCombinerClosureList closures;
  if (GRPC_ERROR_IS_NONE(error) &&
      call_attempt->per_attempt_recv_timer_pending_) {
    call_attempt->per_attempt_recv_timer_pending_ = false;
    // Cancel this attempt.
    // TODO(roth): When implementing hedging, we should not cancel the
    // current attempt.
    call_attempt->MaybeAddBatchForCancelOp(
        grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                               "retry perAttemptRecvTimeout exceeded"),
                           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED),
        &closures);
    // Check whether we should retry.
    if (call_attempt->ShouldRetry(/*status=*/absl::nullopt,
                                  /*server_pushback_ms=*/absl::nullopt)) {
      // Mark current attempt as abandoned.
      call_attempt->Abandon();
      // We are retrying. Start backoff timer.
      calld->StartRetryTimer(/*server_pushback=*/absl::nullopt);
    } else {
      // Not retrying, so commit the call.
      calld->RetryCommit(call_attempt);
      // If retry state is no longer needed, switch to fast path for
      // subsequent batches.
      call_attempt->MaybeSwitchToFastPath();
    }
  }
  closures.RunClosures(calld->call_combiner_);
  call_attempt->Unref(DEBUG_LOCATION, "OnPerAttemptRecvTimer");
  GRPC_CALL_STACK_UNREF(calld->owning_call_, "OnPerAttemptRecvTimer");
}

void RetryFilter::CallData::CallAttempt::MaybeCancelPerAttemptRecvTimer() {
  if (per_attempt_recv_timer_pending_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: cancelling "
              "perAttemptRecvTimeout timer",
              calld_->chand_, calld_, this);
    }
    per_attempt_recv_timer_pending_ = false;
    grpc_timer_cancel(&per_attempt_recv_timer_);
  }
}

//
// RetryFilter::CallData::CallAttempt::BatchData
//

RetryFilter::CallData::CallAttempt::BatchData::BatchData(
    RefCountedPtr<CallAttempt> attempt, int refcount, bool set_on_complete)
    : RefCounted(
          GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace) ? "BatchData" : nullptr,
          refcount),
      call_attempt_(std::move(attempt)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: creating batch %p",
            call_attempt_->calld_->chand_, call_attempt_->calld_,
            call_attempt_.get(), this);
  }
  // We hold a ref to the call stack for every batch sent on a call attempt.
  // This is because some batches on the call attempt may not complete
  // until after all of the batches are completed at the surface (because
  // each batch that is pending at the surface holds a ref). This
  // can happen for replayed send ops, and it can happen for
  // recv_initial_metadata and recv_message ops on a call attempt that has
  // been abandoned.
  GRPC_CALL_STACK_REF(call_attempt_->calld_->owning_call_, "Retry BatchData");
  batch_.payload = &call_attempt_->batch_payload_;
  if (set_on_complete) {
    GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this, nullptr);
    batch_.on_complete = &on_complete_;
  }
}

RetryFilter::CallData::CallAttempt::BatchData::~BatchData() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: destroying batch %p",
            call_attempt_->calld_->chand_, call_attempt_->calld_,
            call_attempt_.get(), this);
  }
  GRPC_CALL_STACK_UNREF(call_attempt_->calld_->owning_call_, "Retry BatchData");
  call_attempt_.reset(DEBUG_LOCATION, "~BatchData");
}

void RetryFilter::CallData::CallAttempt::BatchData::
    FreeCachedSendOpDataForCompletedBatch() {
  auto* calld = call_attempt_->calld_;
  // TODO(roth): When we implement hedging, this logic will need to get
  // a bit more complex, because there may be other (now abandoned) call
  // attempts still using this data. We may need to do some sort of
  // ref-counting instead.
  if (batch_.send_initial_metadata) {
    calld->FreeCachedSendInitialMetadata();
  }
  if (batch_.send_message) {
    calld->FreeCachedSendMessage(call_attempt_->completed_send_message_count_ -
                                 1);
  }
  if (batch_.send_trailing_metadata) {
    calld->FreeCachedSendTrailingMetadata();
  }
}

//
// recv_initial_metadata callback handling
//

void RetryFilter::CallData::CallAttempt::BatchData::
    MaybeAddClosureForRecvInitialMetadataCallback(
        grpc_error_handle error, CallCombinerClosureList* closures) {
  // Find pending batch.
  PendingBatch* pending = call_attempt_->calld_->PendingBatchFind(
      "invoking recv_initial_metadata_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_initial_metadata &&
               batch->payload->recv_initial_metadata
                       .recv_initial_metadata_ready != nullptr;
      });
  if (pending == nullptr) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Return metadata.
  *pending->batch->payload->recv_initial_metadata.recv_initial_metadata =
      std::move(call_attempt_->recv_initial_metadata_);
  // Propagate trailing_metadata_available.
  *pending->batch->payload->recv_initial_metadata.trailing_metadata_available =
      call_attempt_->trailing_metadata_available_;
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_initial_metadata_ready =
      pending->batch->payload->recv_initial_metadata
          .recv_initial_metadata_ready;
  pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
      nullptr;
  call_attempt_->calld_->MaybeClearPendingBatch(pending);
  // Add callback to closures.
  closures->Add(recv_initial_metadata_ready, error,
                "recv_initial_metadata_ready for pending batch");
}

void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady(
    void* arg, grpc_error_handle error) {
  RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
  CallAttempt* call_attempt = batch_data->call_attempt_.get();
  CallData* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p batch_data=%p: "
            "got recv_initial_metadata_ready, error=%s",
            calld->chand_, calld, call_attempt, batch_data.get(),
            grpc_error_std_string(error).c_str());
  }
  call_attempt->completed_recv_initial_metadata_ = true;
  // If this attempt has been abandoned, then we're not going to use the
  // result of this recv_initial_metadata op, so do nothing.
  if (call_attempt->abandoned_) {
    GRPC_CALL_COMBINER_STOP(
        calld->call_combiner_,
        "recv_initial_metadata_ready for abandoned attempt");
    return;
  }
  // Cancel per-attempt recv timer, if any.
  call_attempt->MaybeCancelPerAttemptRecvTimer();
  // If we're not committed, check the response to see if we need to commit.
  if (!calld->retry_committed_) {
    // If we got an error or a Trailers-Only response and have not yet gotten
    // the recv_trailing_metadata_ready callback, then defer propagating this
    // callback back to the surface. We can evaluate whether to retry when
    // recv_trailing_metadata comes back.
    if (GPR_UNLIKELY((call_attempt->trailing_metadata_available_ ||
                      !GRPC_ERROR_IS_NONE(error)) &&
                     !call_attempt->completed_recv_trailing_metadata_)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p attempt=%p: deferring "
                "recv_initial_metadata_ready (Trailers-Only)",
                calld->chand_, calld, call_attempt);
      }
      call_attempt->recv_initial_metadata_ready_deferred_batch_ =
          std::move(batch_data);
      call_attempt->recv_initial_metadata_error_ = GRPC_ERROR_REF(error);
      CallCombinerClosureList closures;
      if (!GRPC_ERROR_IS_NONE(error)) {
        call_attempt->MaybeAddBatchForCancelOp(GRPC_ERROR_REF(error),
                                               &closures);
      }
      if (!call_attempt->started_recv_trailing_metadata_) {
        // recv_trailing_metadata not yet started by application; start it
        // ourselves to get status.
        call_attempt->AddBatchForInternalRecvTrailingMetadata(&closures);
      }
      closures.RunClosures(calld->call_combiner_);
      return;
    }
    // Received valid initial metadata, so commit the call.
    calld->RetryCommit(call_attempt);
    // If retry state is no longer needed, switch to fast path for
    // subsequent batches.
    call_attempt->MaybeSwitchToFastPath();
  }
  // Invoke the callback to return the result to the surface.
  CallCombinerClosureList closures;
  batch_data->MaybeAddClosureForRecvInitialMetadataCallback(
      GRPC_ERROR_REF(error), &closures);
  closures.RunClosures(calld->call_combiner_);
}

//
// recv_message callback handling
//

void RetryFilter::CallData::CallAttempt::BatchData::
    MaybeAddClosureForRecvMessageCallback(grpc_error_handle error,
                                          CallCombinerClosureList* closures) {
  // Find pending op.
  PendingBatch* pending = call_attempt_->calld_->PendingBatchFind(
      "invoking recv_message_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_message &&
               batch->payload->recv_message.recv_message_ready != nullptr;
      });
  if (pending == nullptr) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Return payload.
  *pending->batch->payload->recv_message.recv_message =
      std::move(call_attempt_->recv_message_);
  *pending->batch->payload->recv_message.flags =
      call_attempt_->recv_message_flags_;
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_message_ready =
      pending->batch->payload->recv_message.recv_message_ready;
  pending->batch->payload->recv_message.recv_message_ready = nullptr;
  call_attempt_->calld_->MaybeClearPendingBatch(pending);
  // Add callback to closures.
  closures->Add(recv_message_ready, error,
                "recv_message_ready for pending batch");
}
1526 
1527 void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady(
1528  void* arg, grpc_error_handle error) {
1529  RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
1530  CallAttempt* call_attempt = batch_data->call_attempt_.get();
1531  CallData* calld = call_attempt->calld_;
1532  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
1533  gpr_log(GPR_INFO,
1534  "chand=%p calld=%p attempt=%p batch_data=%p: "
1535  "got recv_message_ready, error=%s",
1536  calld->chand_, calld, call_attempt, batch_data.get(),
1538  }
1539  ++call_attempt->completed_recv_message_count_;
1540  // If this attempt has been abandoned, then we're not going to use the
1541  // result of this recv_message op, so do nothing.
1542  if (call_attempt->abandoned_) {
1543  // The transport will not invoke recv_trailing_metadata_ready until the byte
1544  // stream for any recv_message op is orphaned, so we do that here to ensure
1545  // that any pending recv_trailing_metadata op can complete.
1546  call_attempt->recv_message_.reset();
1547  GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
1548  "recv_message_ready for abandoned attempt");
1549  return;
1550  }
1551  // Cancel per-attempt recv timer, if any.
1552  call_attempt->MaybeCancelPerAttemptRecvTimer();
1553  // If we're not committed, check the response to see if we need to commit.
1554  if (!calld->retry_committed_) {
1555  // If we got an error or the payload was nullptr and we have not yet gotten
1556  // the recv_trailing_metadata_ready callback, then defer propagating this
1557  // callback back to the surface. We can evaluate whether to retry when
1558  // recv_trailing_metadata comes back.
1559  if (GPR_UNLIKELY((!call_attempt->recv_message_.has_value() ||
1560  !GRPC_ERROR_IS_NONE(error)) &&
1561  !call_attempt->completed_recv_trailing_metadata_)) {
1562  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
1563  gpr_log(GPR_INFO,
1564  "chand=%p calld=%p attempt=%p: deferring recv_message_ready "
1565  "(nullptr message and recv_trailing_metadata pending)",
1566  calld->chand_, calld, call_attempt);
1567  }
1568  call_attempt->recv_message_ready_deferred_batch_ = std::move(batch_data);
1569  call_attempt->recv_message_error_ = GRPC_ERROR_REF(error);
1570  CallCombinerClosureList closures;
1571  if (!GRPC_ERROR_IS_NONE(error)) {
1572  call_attempt->MaybeAddBatchForCancelOp(GRPC_ERROR_REF(error),
1573  &closures);
1574  }
1575  if (!call_attempt->started_recv_trailing_metadata_) {
1576  // recv_trailing_metadata not yet started by application; start it
1577  // ourselves to get status.
1578  call_attempt->AddBatchForInternalRecvTrailingMetadata(&closures);
1579  }
1580  closures.RunClosures(calld->call_combiner_);
1581  return;
1582  }
1583  // Received a valid message, so commit the call.
1584  calld->RetryCommit(call_attempt);
1585  // If retry state is no longer needed, switch to fast path for
1586  // subsequent batches.
1587  call_attempt->MaybeSwitchToFastPath();
1588  }
1589  // Invoke the callback to return the result to the surface.
1590  CallCombinerClosureList closures;
1591  batch_data->MaybeAddClosureForRecvMessageCallback(GRPC_ERROR_REF(error),
1592  &closures);
1593  closures.RunClosures(calld->call_combiner_);
1594 }
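
// [Editor's example] Hypothetical restatement, not part of retry_filter.cc:
// the deferral predicate applied by RecvMessageReady() above. A failed or
// empty recv_message is held back until trailing metadata arrives, because
// only the final status tells us whether this attempt will be retried.
bool ShouldDeferRecvMessageCallback(bool retry_committed, bool have_message,
                                    bool have_error,
                                    bool completed_recv_trailing_metadata) {
  return !retry_committed && (!have_message || have_error) &&
         !completed_recv_trailing_metadata;
}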
1595 
1596 //
1597 // recv_trailing_metadata handling
1598 //
1599 
1600 namespace {
1601 
1602 // Sets *status, *server_pushback, and *is_lb_drop based on md_batch
1603 // and error.
1604 void GetCallStatus(
1605  Timestamp deadline, grpc_metadata_batch* md_batch, grpc_error_handle error,
1606  grpc_status_code* status, absl::optional<Duration>* server_pushback,
1607  bool* is_lb_drop,
1608  absl::optional<GrpcStreamNetworkState::ValueType>* stream_network_state) {
1609  if (!GRPC_ERROR_IS_NONE(error)) {
1610  grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr);
1611  intptr_t value = 0;
1612  if (grpc_error_get_int(error, GRPC_ERROR_INT_LB_POLICY_DROP, &value) &&
1613  value != 0) {
1614  *is_lb_drop = true;
1615  }
1616  } else {
1617  *status = *md_batch->get(GrpcStatusMetadata());
1618  }
1619  *server_pushback = md_batch->get(GrpcRetryPushbackMsMetadata());
1620  *stream_network_state = md_batch->get(GrpcStreamNetworkState());
1621  GRPC_ERROR_UNREF(error);
1622 }
1623 
1624 } // namespace
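
// [Editor's example] A condensed sketch, not part of retry_filter.cc, of the
// precedence rule GetCallStatus() implements: a transport-level error
// determines the status; otherwise the grpc-status trailing metadata is
// authoritative. Types simplified; names hypothetical.
#include <optional>

int DeriveStatus(std::optional<int> status_from_error,
                 int status_from_trailing_metadata) {
  return status_from_error.has_value() ? *status_from_error
                                       : status_from_trailing_metadata;
}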
1625 
1626 void RetryFilter::CallData::CallAttempt::BatchData::
1627  MaybeAddClosureForRecvTrailingMetadataReady(
1628  grpc_error_handle error, CallCombinerClosureList* closures) {
1629  auto* calld = call_attempt_->calld_;
1630  // Find pending batch.
1631  PendingBatch* pending = calld->PendingBatchFind(
1632  "invoking recv_trailing_metadata_ready for",
1633  [](grpc_transport_stream_op_batch* batch) {
1634  return batch->recv_trailing_metadata &&
1635  batch->payload->recv_trailing_metadata
1636  .recv_trailing_metadata_ready != nullptr;
1637  });
1638  // If we generated the recv_trailing_metadata op internally via
1639  // AddBatchForInternalRecvTrailingMetadata(), then there will be no
1640  // pending batch.
1641  if (pending == nullptr) {
1642  call_attempt_->recv_trailing_metadata_error_ = error;
1643  return;
1644  }
1645  // Copy transport stats to be delivered up to the surface.
1646  grpc_transport_move_stats(
1647  &call_attempt_->collect_stats_,
1648  pending->batch->payload->recv_trailing_metadata.collect_stats);
1649  // Return metadata.
1650  *pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata =
1651  std::move(call_attempt_->recv_trailing_metadata_);
1652  // Add closure.
1653  closures->Add(pending->batch->payload->recv_trailing_metadata
1654  .recv_trailing_metadata_ready,
1655  error, "recv_trailing_metadata_ready for pending batch");
1656  // Update bookkeeping.
1657  pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1658  nullptr;
1659  calld->MaybeClearPendingBatch(pending);
1660 }
1661 
1662 void RetryFilter::CallData::CallAttempt::BatchData::
1663  AddClosuresForDeferredCompletionCallbacks(
1664  CallCombinerClosureList* closures) {
1665  // Add closure for deferred recv_initial_metadata_ready.
1666  if (GPR_UNLIKELY(call_attempt_->recv_initial_metadata_ready_deferred_batch_ !=
1667  nullptr)) {
1668  MaybeAddClosureForRecvInitialMetadataCallback(
1669  call_attempt_->recv_initial_metadata_error_, closures);
1670  call_attempt_->recv_initial_metadata_ready_deferred_batch_.reset(
1671  DEBUG_LOCATION, "resuming deferred recv_initial_metadata_ready");
1672  call_attempt_->recv_initial_metadata_error_ = GRPC_ERROR_NONE;
1673  }
1674  // Add closure for deferred recv_message_ready.
1675  if (GPR_UNLIKELY(call_attempt_->recv_message_ready_deferred_batch_ !=
1676  nullptr)) {
1677  MaybeAddClosureForRecvMessageCallback(call_attempt_->recv_message_error_,
1678  closures);
1679  call_attempt_->recv_message_ready_deferred_batch_.reset(
1680  DEBUG_LOCATION, "resuming deferred recv_message_ready");
1681  call_attempt_->recv_message_error_ = GRPC_ERROR_NONE;
1682  }
1683  // Add closures for deferred on_complete callbacks.
1684  for (auto& on_complete_deferred_batch :
1685  call_attempt_->on_complete_deferred_batches_) {
1686  closures->Add(&on_complete_deferred_batch.batch->on_complete_,
1687  on_complete_deferred_batch.error, "resuming on_complete");
1688  on_complete_deferred_batch.batch.release();
1689  }
1690  call_attempt_->on_complete_deferred_batches_.clear();
1691 }
1692 
1693 void RetryFilter::CallData::CallAttempt::BatchData::
1694  AddClosuresToFailUnstartedPendingBatches(
1695  grpc_error_handle error, CallCombinerClosureList* closures) {
1696  auto* calld = call_attempt_->calld_;
1697  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches_); ++i) {
1698  PendingBatch* pending = &calld->pending_batches_[i];
1699  if (pending->batch == nullptr) continue;
1700  if (call_attempt_->PendingBatchContainsUnstartedSendOps(pending)) {
1701  closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
1702  "failing on_complete for pending batch");
1703  pending->batch->on_complete = nullptr;
1704  calld->MaybeClearPendingBatch(pending);
1705  }
1706  }
1707  GRPC_ERROR_UNREF(error);
1708 }
1709 
1710 void RetryFilter::CallData::CallAttempt::BatchData::RunClosuresForCompletedCall(
1711  grpc_error_handle error) {
1712  // Construct list of closures to execute.
1713  CallCombinerClosureList closures;
1714  // First, add closure for recv_trailing_metadata_ready.
1715  MaybeAddClosureForRecvTrailingMetadataReady(GRPC_ERROR_REF(error), &closures);
1716  // If there are deferred batch completion callbacks, add them to closures.
1717  AddClosuresForDeferredCompletionCallbacks(&closures);
1718  // Add closures to fail any pending batches that have not yet been started.
1719  AddClosuresToFailUnstartedPendingBatches(GRPC_ERROR_REF(error), &closures);
1720  // Schedule all of the closures identified above.
1721  // Note: This will release the call combiner.
1722  closures.RunClosures(call_attempt_->calld_->call_combiner_);
1723  GRPC_ERROR_UNREF(error);
1724 }
1725 
1726 void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
1727  void* arg, grpc_error_handle error) {
1728  // Add a ref to the outer call stack. This works around a TSAN reported
1729  // failure that we do not understand, but since we expect the lifetime of
1730  // this code to be relatively short we think it's OK to work around for now.
1731  // TSAN failure:
1732  // https://source.cloud.google.com/results/invocations/5b122974-4977-4862-beb1-dc1af9fbbd1d/targets/%2F%2Ftest%2Fcore%2Fend2end:h2_census_test@max_concurrent_streams@poller%3Dpoll/log
1733  RefCountedPtr<grpc_call_stack> owning_call_stack =
1734  static_cast<BatchData*>(arg)->call_attempt_->calld_->owning_call_->Ref();
1735  RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
1736  CallAttempt* call_attempt = batch_data->call_attempt_.get();
1737  CallData* calld = call_attempt->calld_;
1738  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
1739  gpr_log(GPR_INFO,
1740  "chand=%p calld=%p attempt=%p batch_data=%p: "
1741  "got recv_trailing_metadata_ready, error=%s",
1742  calld->chand_, calld, call_attempt, batch_data.get(),
1743  grpc_error_std_string(error).c_str());
1744  }
1745  call_attempt->completed_recv_trailing_metadata_ = true;
1746  // If this attempt has been abandoned, then we're not going to use the
1747  // result of this recv_trailing_metadata op, so do nothing.
1748  if (call_attempt->abandoned_) {
1749  GRPC_CALL_COMBINER_STOP(
1750  calld->call_combiner_,
1751  "recv_trailing_metadata_ready for abandoned attempt");
1752  return;
1753  }
1754  // Cancel per-attempt recv timer, if any.
1755  call_attempt->MaybeCancelPerAttemptRecvTimer();
1756  // Get the call's status and check for server pushback metadata.
1757  grpc_status_code status = GRPC_STATUS_OK;
1758  absl::optional<Duration> server_pushback;
1759  bool is_lb_drop = false;
1760  absl::optional<GrpcStreamNetworkState::ValueType> stream_network_state;
1761  grpc_metadata_batch* md_batch =
1762  batch_data->batch_.payload->recv_trailing_metadata.recv_trailing_metadata;
1763  GetCallStatus(calld->deadline_, md_batch, GRPC_ERROR_REF(error), &status,
1764  &server_pushback, &is_lb_drop, &stream_network_state);
1765  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
1766  gpr_log(GPR_INFO,
1767  "chand=%p calld=%p attempt=%p: call finished, status=%s "
1768  "server_pushback=%s is_lb_drop=%d stream_network_state=%s",
1769  calld->chand_, calld, call_attempt,
1770  grpc_status_code_to_string(status),
1771  server_pushback.has_value() ? server_pushback->ToString().c_str()
1772  : "N/A",
1773  is_lb_drop,
1774  stream_network_state.has_value()
1775  ? absl::StrCat(*stream_network_state).c_str()
1776  : "N/A");
1777  }
1778  // Check if we should retry.
1779  if (!is_lb_drop) { // Never retry on LB drops.
1780  enum { kNoRetry, kTransparentRetry, kConfigurableRetry } retry = kNoRetry;
1781  // Handle transparent retries.
1782  if (stream_network_state.has_value() && !calld->retry_committed_) {
1783  // If not sent on wire, then always retry.
1784  // If sent on wire but not seen by server, retry exactly once.
1785  if (*stream_network_state == GrpcStreamNetworkState::kNotSentOnWire) {
1786  retry = kTransparentRetry;
1787  } else if (*stream_network_state ==
1788  GrpcStreamNetworkState::kNotSeenByServer &&
1789  !calld->sent_transparent_retry_not_seen_by_server_) {
1790  calld->sent_transparent_retry_not_seen_by_server_ = true;
1791  retry = kTransparentRetry;
1792  }
1793  }
1794  // If not transparently retrying, check for configurable retry.
1795  if (retry == kNoRetry &&
1796  call_attempt->ShouldRetry(status, server_pushback)) {
1797  retry = kConfigurableRetry;
1798  }
1799  // If we're retrying, do so.
1800  if (retry != kNoRetry) {
1801  CallCombinerClosureList closures;
1802  // Cancel call attempt.
1803  call_attempt->MaybeAddBatchForCancelOp(
1804  GRPC_ERROR_IS_NONE(error)
1805  ? grpc_error_set_int(
1806  GRPC_ERROR_CREATE_FROM_STATIC_STRING("call attempt failed"),
1807  GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED)
1808  : GRPC_ERROR_REF(error),
1809  &closures);
1810  // For transparent retries, add a closure to immediately start a new
1811  // call attempt.
1812  // For configurable retries, start retry timer.
1813  if (retry == kTransparentRetry) {
1814  calld->AddClosureToStartTransparentRetry(&closures);
1815  } else {
1816  calld->StartRetryTimer(server_pushback);
1817  }
1818  // Record that this attempt has been abandoned.
1819  call_attempt->Abandon();
1820  // Yields call combiner.
1821  closures.RunClosures(calld->call_combiner_);
1822  return;
1823  }
1824  }
1825  // Not retrying, so commit the call.
1826  calld->RetryCommit(call_attempt);
1827  // If retry state is no longer needed, switch to fast path for
1828  // subsequent batches.
1829  call_attempt->MaybeSwitchToFastPath();
1830  // Run any necessary closures.
1831  batch_data->RunClosuresForCompletedCall(GRPC_ERROR_REF(error));
1832 }
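
// [Editor's example] Self-contained sketch, not part of retry_filter.cc,
// condensing the retry decision made in RecvTrailingMetadataReady() above.
// Transparent retries are driven by how far the stream got on the network;
// configurable retries by the service config's retry policy. Hypothetical
// names; the real ShouldRetry() also consults throttling and attempt limits.
#include <optional>

enum class NetState { kNotSentOnWire, kNotSeenByServer, kSeenByServer };
enum class RetryKind { kNoRetry, kTransparentRetry, kConfigurableRetry };

RetryKind DecideRetry(bool is_lb_drop, bool retry_committed,
                      std::optional<NetState> net_state,
                      bool already_retried_not_seen_by_server,
                      bool policy_allows_retry) {
  if (is_lb_drop) return RetryKind::kNoRetry;  // Never retry on LB drops.
  if (net_state.has_value() && !retry_committed) {
    if (*net_state == NetState::kNotSentOnWire) {
      return RetryKind::kTransparentRetry;  // Not sent: always retry.
    }
    if (*net_state == NetState::kNotSeenByServer &&
        !already_retried_not_seen_by_server) {
      return RetryKind::kTransparentRetry;  // Sent but unseen: retry once.
    }
  }
  return policy_allows_retry ? RetryKind::kConfigurableRetry
                             : RetryKind::kNoRetry;
}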
1833 
1834 //
1835 // on_complete callback handling
1836 //
1837 
1838 void RetryFilter::CallData::CallAttempt::BatchData::
1839  AddClosuresForCompletedPendingBatch(grpc_error_handle error,
1840  CallCombinerClosureList* closures) {
1841  auto* calld = call_attempt_->calld_;
1842  PendingBatch* pending = calld->PendingBatchFind(
1843  "completed", [this](grpc_transport_stream_op_batch* batch) {
1844  // Match the pending batch with the same set of send ops as the
1845  // batch we've just completed.
1846  return batch->on_complete != nullptr &&
1847  batch->send_initial_metadata == batch_.send_initial_metadata &&
1848  batch->send_message == batch_.send_message &&
1849  batch->send_trailing_metadata == batch_.send_trailing_metadata;
1850  });
1851  // If batch_data is a replay batch, then there will be no pending
1852  // batch to complete.
1853  if (pending == nullptr) {
1854  GRPC_ERROR_UNREF(error);
1855  return;
1856  }
1857  // Propagate payload.
1858  if (batch_.send_message) {
1859  pending->batch->payload->send_message.stream_write_closed =
1860  batch_.payload->send_message.stream_write_closed;
1861  }
1862  // Add closure.
1863  closures->Add(pending->batch->on_complete, error,
1864  "on_complete for pending batch");
1865  pending->batch->on_complete = nullptr;
1866  calld->MaybeClearPendingBatch(pending);
1867 }
1868 
1869 void RetryFilter::CallData::CallAttempt::BatchData::
1870  AddClosuresForReplayOrPendingSendOps(CallCombinerClosureList* closures) {
1871  auto* calld = call_attempt_->calld_;
1872  bool have_pending_send_ops = call_attempt_->HaveSendOpsToReplay();
1873  // We don't check send_initial_metadata here, because that op will always
1874  // be started as soon as it is received from the surface, so it will
1875  // never need to be started at this point.
1876  if (!have_pending_send_ops) {
1877  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches_); ++i) {
1878  PendingBatch* pending = &calld->pending_batches_[i];
1879  grpc_transport_stream_op_batch* batch = pending->batch;
1880  if (batch == nullptr || pending->send_ops_cached) continue;
1881  if (batch->send_message || batch->send_trailing_metadata) {
1882  have_pending_send_ops = true;
1883  break;
1884  }
1885  }
1886  }
1887  if (have_pending_send_ops) {
1888  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
1889  gpr_log(GPR_INFO,
1890  "chand=%p calld=%p attempt=%p: starting next batch for pending "
1891  "send op(s)",
1892  calld->chand_, calld, call_attempt_.get());
1893  }
1894  call_attempt_->AddRetriableBatches(closures);
1895  }
1896 }
1897 
1898 void RetryFilter::CallData::CallAttempt::BatchData::OnComplete(
1899  void* arg, grpc_error_handle error) {
1900  RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
1901  CallAttempt* call_attempt = batch_data->call_attempt_.get();
1902  CallData* calld = call_attempt->calld_;
1903  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
1904  gpr_log(GPR_INFO,
1905  "chand=%p calld=%p attempt=%p batch_data=%p: "
1906  "got on_complete, error=%s, batch=%s",
1907  calld->chand_, calld, call_attempt, batch_data.get(),
1908  grpc_error_std_string(error).c_str(),
1909  grpc_transport_stream_op_batch_string(&batch_data->batch_).c_str());
1910  }
1911  // If this attempt has been abandoned, then we're not going to propagate
1912  // the completion of this batch, so do nothing.
1913  if (call_attempt->abandoned_) {
1914  GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
1915  "on_complete for abandoned attempt");
1916  return;
1917  }
1918  // If we got an error and have not yet gotten the
1919  // recv_trailing_metadata_ready callback, then defer propagating this
1920  // callback back to the surface. We can evaluate whether to retry when
1921  // recv_trailing_metadata comes back.
1922  if (GPR_UNLIKELY(!calld->retry_committed_ && !GRPC_ERROR_IS_NONE(error) &&
1923  !call_attempt->completed_recv_trailing_metadata_)) {
1924  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
1925  gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: deferring on_complete",
1926  calld->chand_, calld, call_attempt);
1927  }
1928  call_attempt->on_complete_deferred_batches_.emplace_back(
1929  std::move(batch_data), GRPC_ERROR_REF(error));
1930  CallCombinerClosureList closures;
1931  call_attempt->MaybeAddBatchForCancelOp(GRPC_ERROR_REF(error), &closures);
1932  if (!call_attempt->started_recv_trailing_metadata_) {
1933  // recv_trailing_metadata not yet started by application; start it
1934  // ourselves to get status.
1935  call_attempt->AddBatchForInternalRecvTrailingMetadata(&closures);
1936  }
1937  closures.RunClosures(calld->call_combiner_);
1938  return;
1939  }
1940  // Update bookkeeping in call_attempt.
1941  if (batch_data->batch_.send_initial_metadata) {
1942  call_attempt->completed_send_initial_metadata_ = true;
1943  }
1944  if (batch_data->batch_.send_message) {
1945  ++call_attempt->completed_send_message_count_;
1946  }
1947  if (batch_data->batch_.send_trailing_metadata) {
1948  call_attempt->completed_send_trailing_metadata_ = true;
1949  }
1950  // If the call is committed, free cached data for send ops that we've just
1951  // completed.
1952  if (calld->retry_committed_) {
1953  batch_data->FreeCachedSendOpDataForCompletedBatch();
1954  }
1955  // Construct list of closures to execute.
1956  CallCombinerClosureList closures;
1957  // Add closure for the completed pending batch, if any.
1958  batch_data->AddClosuresForCompletedPendingBatch(GRPC_ERROR_REF(error),
1959  &closures);
1960  // If needed, add a callback to start any replay or pending send ops on
1961  // the LB call.
1962  if (!call_attempt->completed_recv_trailing_metadata_) {
1963  batch_data->AddClosuresForReplayOrPendingSendOps(&closures);
1964  }
1965  // If retry state is no longer needed (i.e., we're committed and there
1966  // are no more send ops to replay), switch to fast path for subsequent
1967  // batches.
1968  call_attempt->MaybeSwitchToFastPath();
1969  // Schedule all of the closures identified above.
1970  // Note: This yields the call combiner.
1971  closures.RunClosures(calld->call_combiner_);
1972 }
1973 
1974 void RetryFilter::CallData::CallAttempt::BatchData::OnCompleteForCancelOp(
1975  void* arg, grpc_error_handle error) {
1976  RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
1977  CallAttempt* call_attempt = batch_data->call_attempt_.get();
1978  CallData* calld = call_attempt->calld_;
1979  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
1980  gpr_log(GPR_INFO,
1981  "chand=%p calld=%p attempt=%p batch_data=%p: "
1982  "got on_complete for cancel_stream batch, error=%s, batch=%s",
1983  calld->chand_, calld, call_attempt, batch_data.get(),
1984  grpc_error_std_string(error).c_str(),
1985  grpc_transport_stream_op_batch_string(&batch_data->batch_).c_str());
1986  }
1987  GRPC_CALL_COMBINER_STOP(
1988  calld->call_combiner_,
1989  "on_complete for internally generated cancel_stream op");
1990 }
1991 
1992 //
1993 // retriable batch construction
1994 //
1995 
1996 void RetryFilter::CallData::CallAttempt::BatchData::
1997  AddRetriableSendInitialMetadataOp() {
1998  auto* calld = call_attempt_->calld_;
1999  // We need to make a copy of the metadata batch for each attempt, since
2000  // the filters in the subchannel stack may modify this batch, and we don't
2001  // want those modifications to be passed forward to subsequent attempts.
2002  //
2003  // If we've already completed one or more attempts, add the
2004  // grpc-retry-attempts header.
2005  call_attempt_->send_initial_metadata_ = calld->send_initial_metadata_.Copy();
2006  if (GPR_UNLIKELY(calld->num_attempts_completed_ > 0)) {
2007  call_attempt_->send_initial_metadata_.Set(GrpcPreviousRpcAttemptsMetadata(),
2008  calld->num_attempts_completed_);
2009  } else {
2010  call_attempt_->send_initial_metadata_.Remove(
2011  GrpcPreviousRpcAttemptsMetadata());
2012  }
2013  call_attempt_->started_send_initial_metadata_ = true;
2014  batch_.send_initial_metadata = true;
2015  batch_.payload->send_initial_metadata.send_initial_metadata =
2016  &call_attempt_->send_initial_metadata_;
2017  batch_.payload->send_initial_metadata.send_initial_metadata_flags =
2018  calld->send_initial_metadata_flags_;
2019  batch_.payload->send_initial_metadata.peer_string = calld->peer_string_;
2020 }
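
// [Editor's example] Illustrative only, not part of retry_filter.cc: the
// header rule applied above. The server sees how many prior attempts
// completed via "grpc-previous-rpc-attempts" (absent on the first attempt).
// The map-based helper is hypothetical.
#include <map>
#include <string>

void SetPreviousAttemptsHeader(std::map<std::string, std::string>* headers,
                               int num_attempts_completed) {
  if (num_attempts_completed > 0) {
    (*headers)["grpc-previous-rpc-attempts"] =
        std::to_string(num_attempts_completed);
  } else {
    headers->erase("grpc-previous-rpc-attempts");
  }
}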
2021 
2022 void RetryFilter::CallData::CallAttempt::BatchData::
2023  AddRetriableSendMessageOp() {
2024  auto* calld = call_attempt_->calld_;
2025  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2026  gpr_log(
2027  GPR_INFO,
2028  "chand=%p calld=%p attempt=%p: starting calld->send_messages[%" PRIuPTR
2029  "]",
2030  calld->chand_, calld, call_attempt_.get(),
2031  call_attempt_->started_send_message_count_);
2032  }
2033  CachedSendMessage cache =
2034  calld->send_messages_[call_attempt_->started_send_message_count_];
2035  ++call_attempt_->started_send_message_count_;
2036  batch_.send_message = true;
2037  batch_.payload->send_message.send_message = cache.slices;
2038  batch_.payload->send_message.flags = cache.flags;
2039 }
2040 
2041 void RetryFilter::CallData::CallAttempt::BatchData::
2042  AddRetriableSendTrailingMetadataOp() {
2043  auto* calld = call_attempt_->calld_;
2044  // We need to make a copy of the metadata batch for each attempt, since
2045  // the filters in the subchannel stack may modify this batch, and we don't
2046  // want those modifications to be passed forward to subsequent attempts.
2047  call_attempt_->send_trailing_metadata_ =
2048  calld->send_trailing_metadata_.Copy();
2049  call_attempt_->started_send_trailing_metadata_ = true;
2050  batch_.send_trailing_metadata = true;
2051  batch_.payload->send_trailing_metadata.send_trailing_metadata =
2052  &call_attempt_->send_trailing_metadata_;
2053 }
2054 
2055 void RetryFilter::CallData::CallAttempt::BatchData::
2056  AddRetriableRecvInitialMetadataOp() {
2057  call_attempt_->started_recv_initial_metadata_ = true;
2058  batch_.recv_initial_metadata = true;
2059  call_attempt_->recv_initial_metadata_.Clear();
2060  batch_.payload->recv_initial_metadata.recv_initial_metadata =
2061  &call_attempt_->recv_initial_metadata_;
2062  batch_.payload->recv_initial_metadata.trailing_metadata_available =
2063  &call_attempt_->trailing_metadata_available_;
2064  GRPC_CLOSURE_INIT(&call_attempt_->recv_initial_metadata_ready_,
2065  RecvInitialMetadataReady, this, grpc_schedule_on_exec_ctx);
2066  batch_.payload->recv_initial_metadata.recv_initial_metadata_ready =
2067  &call_attempt_->recv_initial_metadata_ready_;
2068 }
2069 
2070 void RetryFilter::CallData::CallAttempt::BatchData::
2071  AddRetriableRecvMessageOp() {
2072  ++call_attempt_->started_recv_message_count_;
2073  batch_.recv_message = true;
2074  batch_.payload->recv_message.recv_message = &call_attempt_->recv_message_;
2075  batch_.payload->recv_message.flags = &call_attempt_->recv_message_flags_;
2076  batch_.payload->recv_message.call_failed_before_recv_message = nullptr;
2077  GRPC_CLOSURE_INIT(&call_attempt_->recv_message_ready_, RecvMessageReady, this,
2078  grpc_schedule_on_exec_ctx);
2079  batch_.payload->recv_message.recv_message_ready =
2080  &call_attempt_->recv_message_ready_;
2081 }
2082 
2083 void RetryFilter::CallData::CallAttempt::BatchData::
2084  AddRetriableRecvTrailingMetadataOp() {
2085  call_attempt_->started_recv_trailing_metadata_ = true;
2086  batch_.recv_trailing_metadata = true;
2087  call_attempt_->recv_trailing_metadata_.Clear();
2088  batch_.payload->recv_trailing_metadata.recv_trailing_metadata =
2089  &call_attempt_->recv_trailing_metadata_;
2090  batch_.payload->recv_trailing_metadata.collect_stats =
2091  &call_attempt_->collect_stats_;
2092  GRPC_CLOSURE_INIT(&call_attempt_->recv_trailing_metadata_ready_,
2093  RecvTrailingMetadataReady, this, grpc_schedule_on_exec_ctx);
2094  batch_.payload->recv_trailing_metadata.recv_trailing_metadata_ready =
2095  &call_attempt_->recv_trailing_metadata_ready_;
2096 }
2097 
2098 void RetryFilter::CallData::CallAttempt::BatchData::AddCancelStreamOp(
2099  grpc_error_handle error) {
2100  batch_.cancel_stream = true;
2101  batch_.payload->cancel_stream.cancel_error = error;
2102  // Override on_complete callback.
2103  GRPC_CLOSURE_INIT(&on_complete_, OnCompleteForCancelOp, this, nullptr);
2104 }
2105 
2106 //
2107 // CallData vtable functions
2108 //
2109 
2110 grpc_error_handle RetryFilter::CallData::Init(
2111  grpc_call_element* elem, const grpc_call_element_args* args) {
2112  auto* chand = static_cast<RetryFilter*>(elem->channel_data);
2113  new (elem->call_data) CallData(chand, *args);
2114  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2115  gpr_log(GPR_INFO, "chand=%p calld=%p: created call", chand,
2116  elem->call_data);
2117  }
2118  return GRPC_ERROR_NONE;
2119 }
2120 
2121 void RetryFilter::CallData::Destroy(grpc_call_element* elem,
2122  const grpc_call_final_info* /*final_info*/,
2123  grpc_closure* then_schedule_closure) {
2124  auto* calld = static_cast<CallData*>(elem->call_data);
2125  // Save our ref to the CallStackDestructionBarrier until after our
2126  // dtor is invoked.
2127  RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier =
2128  std::move(calld->call_stack_destruction_barrier_);
2129  calld->~CallData();
2130  // Now set the callback in the CallStackDestructionBarrier object,
2131  // right before we release our ref to it (implicitly upon returning).
2132  // The callback will be invoked when the CallStackDestructionBarrier
2133  // is destroyed.
2134  call_stack_destruction_barrier->set_on_call_stack_destruction(
2135  then_schedule_closure);
2136 }
2137 
2138 void RetryFilter::CallData::StartTransportStreamOpBatch(
2139  grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
2140  auto* calld = static_cast<CallData*>(elem->call_data);
2141  calld->StartTransportStreamOpBatch(batch);
2142 }
2143 
2144 void RetryFilter::CallData::SetPollent(grpc_call_element* elem,
2145  grpc_polling_entity* pollent) {
2146  auto* calld = static_cast<CallData*>(elem->call_data);
2147  calld->pollent_ = pollent;
2148 }
2149 
2150 //
2151 // CallData implementation
2152 //
2153 
2154 const RetryMethodConfig* RetryFilter::GetRetryPolicy(
2155  const grpc_call_context_element* context) {
2156  if (context == nullptr) return nullptr;
2157  auto* svc_cfg_call_data = static_cast<ServiceConfigCallData*>(
2158  context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
2159  if (svc_cfg_call_data == nullptr) return nullptr;
2160  return static_cast<const RetryMethodConfig*>(
2161  svc_cfg_call_data->GetMethodParsedConfig(service_config_parser_index_));
2162 }
2163 
2164 RetryFilter::CallData::CallData(RetryFilter* chand,
2165  const grpc_call_element_args& args)
2166  : chand_(chand),
2167  retry_throttle_data_(chand->retry_throttle_data_),
2168  retry_policy_(chand->GetRetryPolicy(args.context)),
2169  retry_backoff_(
2170  BackOff::Options()
2171  .set_initial_backoff(retry_policy_ == nullptr
2172  ? Duration::Zero()
2173  : retry_policy_->initial_backoff())
2174  .set_multiplier(retry_policy_ == nullptr
2175  ? 0
2176  : retry_policy_->backoff_multiplier())
2177  .set_jitter(RETRY_BACKOFF_JITTER)
2178  .set_max_backoff(retry_policy_ == nullptr
2179  ? Duration::Zero()
2180  : retry_policy_->max_backoff())),
2181  path_(grpc_slice_ref_internal(args.path)),
2182  deadline_(args.deadline),
2183  arena_(args.arena),
2184  owning_call_(args.call_stack),
2185  call_combiner_(args.call_combiner),
2186  call_context_(args.context),
2187  call_stack_destruction_barrier_(
2188  arena_->New<CallStackDestructionBarrier>()),
2189  pending_send_initial_metadata_(false),
2190  pending_send_message_(false),
2191  pending_send_trailing_metadata_(false),
2192  retry_committed_(false),
2193  retry_timer_pending_(false),
2194  retry_codepath_started_(false),
2195  sent_transparent_retry_not_seen_by_server_(false) {}
2196 
2197 RetryFilter::CallData::~CallData() {
2198  FreeAllCachedSendOpData();
2199  grpc_slice_unref_internal(path_);
2200  // Make sure there are no remaining pending batches.
2201  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2202  GPR_ASSERT(pending_batches_[i].batch == nullptr);
2203  }
2204  GRPC_ERROR_UNREF(cancelled_from_surface_);
2205 }
2206 
2207 void RetryFilter::CallData::StartTransportStreamOpBatch(
2208  grpc_transport_stream_op_batch* batch) {
2209  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace) &&
2210  !GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) {
2211  gpr_log(GPR_INFO, "chand=%p calld=%p: batch started from surface: %s",
2212  chand_, this, grpc_transport_stream_op_batch_string(batch).c_str());
2213  }
2214  // If we have an LB call, delegate to the LB call.
2215  if (committed_call_ != nullptr) {
2216  // Note: This will release the call combiner.
2217  committed_call_->StartTransportStreamOpBatch(batch);
2218  return;
2219  }
2220  // If we were previously cancelled from the surface, fail this
2221  // batch immediately.
2222  if (!GRPC_ERROR_IS_NONE(cancelled_from_surface_)) {
2223  // Note: This will release the call combiner.
2224  grpc_transport_stream_op_batch_finish_with_failure(
2225  batch, GRPC_ERROR_REF(cancelled_from_surface_), call_combiner_);
2226  return;
2227  }
2228  // Handle cancellation.
2229  if (GPR_UNLIKELY(batch->cancel_stream)) {
2230  // Save cancel_error in case subsequent batches are started.
2231  GRPC_ERROR_UNREF(cancelled_from_surface_);
2232  cancelled_from_surface_ =
2233  GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
2234  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2235  gpr_log(GPR_INFO, "chand=%p calld=%p: cancelled from surface: %s", chand_,
2236  this, grpc_error_std_string(cancelled_from_surface_).c_str());
2237  }
2238  // Fail any pending batches.
2239  PendingBatchesFail(GRPC_ERROR_REF(cancelled_from_surface_));
2240  // If we have a current call attempt, commit the call, then send
2241  // the cancellation down to that attempt. When the call fails, it
2242  // will not be retried, because we have committed it here.
2243  if (call_attempt_ != nullptr) {
2244  RetryCommit(call_attempt_.get());
2245  // TODO(roth): When implementing hedging, this will get more
2246  // complex, because instead of just passing the batch down to a
2247  // single call attempt, we'll need to cancel multiple call
2248  // attempts and wait for the cancellation on_complete from each call
2249  // attempt before we propagate the on_complete from this batch
2250  // back to the surface.
2251  // Note: This will release the call combiner.
2252  call_attempt_->CancelFromSurface(batch);
2253  return;
2254  }
2255  // Cancel retry timer if needed.
2256  if (retry_timer_pending_) {
2257  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2258  gpr_log(GPR_INFO, "chand=%p calld=%p: cancelling retry timer", chand_,
2259  this);
2260  }
2261  retry_timer_pending_ = false; // Lame timer callback.
2262  grpc_timer_cancel(&retry_timer_);
2263  FreeAllCachedSendOpData();
2264  }
2265  // We have no call attempt, so there's nowhere to send the cancellation
2266  // batch. Return it back to the surface immediately.
2267  // Note: This will release the call combiner.
2268  grpc_transport_stream_op_batch_finish_with_failure(
2269  batch, GRPC_ERROR_REF(cancelled_from_surface_), call_combiner_);
2270  return;
2271  }
2272  // Add the batch to the pending list.
2273  PendingBatch* pending = PendingBatchesAdd(batch);
2274  // If the timer is pending, yield the call combiner and wait for it to
2275  // run, since we don't want to start another call attempt until it does.
2276  if (retry_timer_pending_) {
2277  GRPC_CALL_COMBINER_STOP(call_combiner_,
2278  "added pending batch while retry timer pending");
2279  return;
2280  }
2281  // If we do not yet have a call attempt, create one.
2282  if (call_attempt_ == nullptr) {
2283  // If this is the first batch and retries are already committed
2284  // (e.g., if this batch put the call above the buffer size limit), then
2285  // immediately create an LB call and delegate the batch to it. This
2286  // avoids the overhead of unnecessarily allocating a CallAttempt
2287  // object or caching any of the send op data.
2288  // Note that we would ideally like to do this also on subsequent
2289  // attempts (e.g., if a batch puts the call above the buffer size
2290  // limit since the last attempt was complete), but in practice that's
2291  // not really worthwhile, because we will almost always have cached and
2292  // completed at least the send_initial_metadata op on the previous
2293  // attempt, which means that we'd need special logic to replay the
2294  // batch anyway, which is exactly what the CallAttempt object provides.
2295  // We also skip this optimization if perAttemptRecvTimeout is set in the
2296  // retry policy, because we need the code in CallAttempt to handle
2297  // the associated timer.
2298  if (num_attempts_completed_ == 0 && retry_committed_ &&
2299  (retry_policy_ == nullptr ||
2300  !retry_policy_->per_attempt_recv_timeout().has_value())) {
2301  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2302  gpr_log(GPR_INFO,
2303  "chand=%p calld=%p: retry committed before first attempt; "
2304  "creating LB call",
2305  chand_, this);
2306  }
2307  PendingBatchClear(pending);
2308  auto* service_config_call_data =
2309  static_cast<ClientChannelServiceConfigCallData*>(
2310  call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
2311  committed_call_ = CreateLoadBalancedCall(
2312  service_config_call_data->call_dispatch_controller(),
2313  /*is_transparent_retry=*/false);
2314  committed_call_->StartTransportStreamOpBatch(batch);
2315  return;
2316  }
2317  // Otherwise, create a call attempt.
2318  // The attempt will automatically start any necessary replays or
2319  // pending batches.
2320  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2321  gpr_log(GPR_INFO, "chand=%p calld=%p: creating call attempt", chand_,
2322  this);
2323  }
2324  retry_codepath_started_ = true;
2325  CreateCallAttempt(/*is_transparent_retry=*/false);
2326  return;
2327  }
2328  // Send batches to call attempt.
2329  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2330  gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on attempt=%p", chand_,
2331  this, call_attempt_.get());
2332  }
2333  call_attempt_->StartRetriableBatches();
2334 }
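
// [Editor's example] The fast-path condition from the long comment above,
// restated as a standalone predicate. Not part of retry_filter.cc; names
// hypothetical.
bool CanBypassRetryMachinery(int num_attempts_completed, bool retry_committed,
                             bool has_per_attempt_recv_timeout) {
  // Only safe before the first attempt, once retries can no longer happen,
  // and only when no per-attempt timer needs to be managed.
  return num_attempts_completed == 0 && retry_committed &&
         !has_per_attempt_recv_timeout;
}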
2335 
2336 OrphanablePtr<ClientChannel::LoadBalancedCall>
2337 RetryFilter::CallData::CreateLoadBalancedCall(
2338  ConfigSelector::CallDispatchController* call_dispatch_controller,
2339  bool is_transparent_retry) {
2340  grpc_call_element_args args = {owning_call_, nullptr, call_context_,
2341  path_, /*start_time=*/0, deadline_,
2342  arena_, call_combiner_};
2343  return chand_->client_channel_->CreateLoadBalancedCall(
2344  args, pollent_,
2345  // This callback holds a ref to the CallStackDestructionBarrier
2346  // object until the LB call is destroyed.
2347  call_stack_destruction_barrier_->MakeLbCallDestructionClosure(this),
2348  call_dispatch_controller, is_transparent_retry);
2349 }
2350 
2351 void RetryFilter::CallData::CreateCallAttempt(bool is_transparent_retry) {
2352  call_attempt_ = MakeRefCounted<CallAttempt>(this, is_transparent_retry);
2353  call_attempt_->StartRetriableBatches();
2354 }
2355 
2356 //
2357 // send op data caching
2358 //
2359 
2360 void RetryFilter::CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
2361  if (pending->send_ops_cached) return;
2362  pending->send_ops_cached = true;
2363  grpc_transport_stream_op_batch* batch = pending->batch;
2364  // Save a copy of metadata for send_initial_metadata ops.
2365  if (batch->send_initial_metadata) {
2366  seen_send_initial_metadata_ = true;
2367  grpc_metadata_batch* send_initial_metadata =
2368  batch->payload->send_initial_metadata.send_initial_metadata;
2369  send_initial_metadata_ = send_initial_metadata->Copy();
2370  send_initial_metadata_flags_ =
2371  batch->payload->send_initial_metadata.send_initial_metadata_flags;
2372  peer_string_ = batch->payload->send_initial_metadata.peer_string;
2373  }
2374  // Set up cache for send_message ops.
2375  if (batch->send_message) {
2376  SliceBuffer* cache = arena_->New<SliceBuffer>(std::move(
2377  *absl::exchange(batch->payload->send_message.send_message, nullptr)));
2378  send_messages_.push_back({cache, batch->payload->send_message.flags});
2379  }
2380  // Save metadata batch for send_trailing_metadata ops.
2381  if (batch->send_trailing_metadata) {
2382  seen_send_trailing_metadata_ = true;
2383  grpc_metadata_batch* send_trailing_metadata =
2384  batch->payload->send_trailing_metadata.send_trailing_metadata;
2385  send_trailing_metadata_ = send_trailing_metadata->Copy();
2386  }
2387 }
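
// [Editor's example] A minimal sketch, not part of retry_filter.cc, of the
// replay-cache idea implemented by MaybeCacheSendOpsForBatch() above: copy
// send-side data out of each surface batch so a later attempt can rebuild
// identical send ops even after the original batches complete. Types are
// deliberately simplified and hypothetical.
#include <string>
#include <utility>
#include <vector>

struct CachedSendState {
  std::vector<std::pair<std::string, std::string>> initial_metadata;
  std::vector<std::string> messages;  // One entry per send_message op.
  std::vector<std::pair<std::string, std::string>> trailing_metadata;
};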
2388 
2389 void RetryFilter::CallData::FreeCachedSendInitialMetadata() {
2390  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2391  gpr_log(GPR_INFO, "chand=%p calld=%p: destroying send_initial_metadata",
2392  chand_, this);
2393  }
2394  send_initial_metadata_.Clear();
2395 }
2396 
2397 void RetryFilter::CallData::FreeCachedSendMessage(size_t idx) {
2398  if (send_messages_[idx].slices != nullptr) {
2399  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2400  gpr_log(GPR_INFO,
2401  "chand=%p calld=%p: destroying send_messages[%" PRIuPTR "]",
2402  chand_, this, idx);
2403  }
2404  Destruct(absl::exchange(send_messages_[idx].slices, nullptr));
2405  }
2406 }
2407 
2408 void RetryFilter::CallData::FreeCachedSendTrailingMetadata() {
2409  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2410  gpr_log(GPR_INFO, "chand=%p calld=%p: destroying send_trailing_metadata",
2411  chand_, this);
2412  }
2413  send_trailing_metadata_.Clear();
2414 }
2415 
2416 void RetryFilter::CallData::FreeAllCachedSendOpData() {
2417  if (seen_send_initial_metadata_) {
2418  FreeCachedSendInitialMetadata();
2419  }
2420  for (size_t i = 0; i < send_messages_.size(); ++i) {
2421  FreeCachedSendMessage(i);
2422  }
2423  if (seen_send_trailing_metadata_) {
2424  FreeCachedSendTrailingMetadata();
2425  }
2426 }
2427 
2428 //
2429 // pending_batches management
2430 //
2431 
2432 size_t RetryFilter::CallData::GetBatchIndex(
2433  grpc_transport_stream_op_batch* batch) {
2434  if (batch->send_initial_metadata) return 0;
2435  if (batch->send_message) return 1;
2436  if (batch->send_trailing_metadata) return 2;
2437  if (batch->recv_initial_metadata) return 3;
2438  if (batch->recv_message) return 4;
2439  if (batch->recv_trailing_metadata) return 5;
2440  GPR_UNREACHABLE_CODE(return (size_t)-1);
2441 }
2442 
2443 // This is called via the call combiner, so access to calld is synchronized.
2444 RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchesAdd(
2445  grpc_transport_stream_op_batch* batch) {
2446  const size_t idx = GetBatchIndex(batch);
2447  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2448  gpr_log(GPR_INFO,
2449  "chand=%p calld=%p: adding pending batch at index %" PRIuPTR,
2450  chand_, this, idx);
2451  }
2452  PendingBatch* pending = &pending_batches_[idx];
2453  GPR_ASSERT(pending->batch == nullptr);
2454  pending->batch = batch;
2455  pending->send_ops_cached = false;
2456  // Update state in calld about pending batches.
2457  // Also check if the batch takes us over the retry buffer limit.
2458  // Note: We don't check the size of trailing metadata here, because
2459  // gRPC clients do not send trailing metadata.
2460  if (batch->send_initial_metadata) {
2461  pending_send_initial_metadata_ = true;
2462  bytes_buffered_for_retry_ += batch->payload->send_initial_metadata
2463  .send_initial_metadata->TransportSize();
2464  }
2465  if (batch->send_message) {
2466  pending_send_message_ = true;
2467  bytes_buffered_for_retry_ +=
2468  batch->payload->send_message.send_message->Length();
2469  }
2470  if (batch->send_trailing_metadata) {
2471  pending_send_trailing_metadata_ = true;
2472  }
2473  // TODO(roth): When we implement hedging, if there are currently attempts
2474  // in flight, we will need to pick the one on which the max number of send
2475  // ops have already been sent, and we commit to that attempt.
2476  if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
2477  chand_->per_rpc_retry_buffer_size_)) {
2478  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2479  gpr_log(GPR_INFO,
2480  "chand=%p calld=%p: exceeded retry buffer size, committing",
2481  chand_, this);
2482  }
2483  RetryCommit(call_attempt_.get());
2484  }
2485  return pending;
2486 }
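
// [Editor's example] Sketch of the retry-buffer accounting performed by
// PendingBatchesAdd() above; not part of retry_filter.cc. Once the bytes
// buffered for possible replay exceed the per-RPC limit, the call is
// committed so no further attempts (and no further buffering) occur.
#include <cstddef>

bool ExceedsRetryBuffer(size_t bytes_already_buffered, size_t new_op_bytes,
                        size_t per_rpc_retry_buffer_size) {
  return bytes_already_buffered + new_op_bytes > per_rpc_retry_buffer_size;
}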
2487 
2488 void RetryFilter::CallData::PendingBatchClear(PendingBatch* pending) {
2489  if (pending->batch->send_initial_metadata) {
2490  pending_send_initial_metadata_ = false;
2491  }
2492  if (pending->batch->send_message) {
2493  pending_send_message_ = false;
2494  }
2495  if (pending->batch->send_trailing_metadata) {
2496  pending_send_trailing_metadata_ = false;
2497  }
2498  pending->batch = nullptr;
2499 }
2500 
2501 void RetryFilter::CallData::MaybeClearPendingBatch(PendingBatch* pending) {
2502  grpc_transport_stream_op_batch* batch = pending->batch;
2503  // We clear the pending batch if all of its callbacks have been
2504  // scheduled and reset to nullptr.
2505  if (batch->on_complete == nullptr &&
2506  (!batch->recv_initial_metadata ||
2507  batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
2508  nullptr) &&
2509  (!batch->recv_message ||
2510  batch->payload->recv_message.recv_message_ready == nullptr) &&
2511  (!batch->recv_trailing_metadata ||
2512  batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
2513  nullptr)) {
2514  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2515  gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand_,
2516  this);
2517  }
2518  PendingBatchClear(pending);
2519  }
2520 }
2521 
2522 // This is called via the call combiner, so access to calld is synchronized.
2523 void RetryFilter::CallData::FailPendingBatchInCallCombiner(
2524  void* arg, grpc_error_handle error) {
2525  grpc_transport_stream_op_batch* batch =
2526  static_cast<grpc_transport_stream_op_batch*>(arg);
2527  CallData* call = static_cast<CallData*>(batch->handler_private.extra_arg);
2528  // Note: This will release the call combiner.
2529  grpc_transport_stream_op_batch_finish_with_failure(
2530  batch, GRPC_ERROR_REF(error), call->call_combiner_);
2531 }
2532 
2533 // This is called via the call combiner, so access to calld is synchronized.
2534 void RetryFilter::CallData::PendingBatchesFail(grpc_error_handle error) {
2535  GPR_ASSERT(!GRPC_ERROR_IS_NONE(error));
2536  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2537  size_t num_batches = 0;
2538  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2539  if (pending_batches_[i].batch != nullptr) ++num_batches;
2540  }
2541  gpr_log(GPR_INFO,
2542  "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
2543  chand_, this, num_batches, grpc_error_std_string(error).c_str());
2544  }
2545  CallCombinerClosureList closures;
2546  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2547  PendingBatch* pending = &pending_batches_[i];
2548  grpc_transport_stream_op_batch* batch = pending->batch;
2549  if (batch != nullptr) {
2550  batch->handler_private.extra_arg = this;
2551  GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2552  FailPendingBatchInCallCombiner, batch,
2553  grpc_schedule_on_exec_ctx);
2554  closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
2555  "PendingBatchesFail");
2556  PendingBatchClear(pending);
2557  }
2558  }
2559  closures.RunClosuresWithoutYielding(call_combiner_);
2560  GRPC_ERROR_UNREF(error);
2561 }
2562 
2563 template <typename Predicate>
2564 RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchFind(
2565  const char* log_message, Predicate predicate) {
2566  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2567  PendingBatch* pending = &pending_batches_[i];
2568  grpc_transport_stream_op_batch* batch = pending->batch;
2569  if (batch != nullptr && predicate(batch)) {
2570  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2571  gpr_log(GPR_INFO,
2572  "chand=%p calld=%p: %s pending batch at index %" PRIuPTR,
2573  chand_, this, log_message, i);
2574  }
2575  return pending;
2576  }
2577  }
2578  return nullptr;
2579 }
2580 
2581 //
2582 // retry code
2583 //
2584 
2585 void RetryFilter::CallData::RetryCommit(CallAttempt* call_attempt) {
2586  if (retry_committed_) return;
2587  retry_committed_ = true;
2588  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2589  gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand_, this);
2590  }
2591  if (call_attempt != nullptr) {
2592  // If the call attempt's LB call has been committed, inform the call
2593  // dispatch controller that the call has been committed.
2594  // Note: If call_attempt is null, this is happening before the first
2595  // retry attempt is started, in which case we'll just pass the real
2596  // call dispatch controller down into the LB call, and it won't be
2597  // our problem anymore.
2598  if (call_attempt->lb_call_committed()) {
2599  auto* service_config_call_data =
2600  static_cast<ClientChannelServiceConfigCallData*>(
2601  call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
2602  service_config_call_data->call_dispatch_controller()->Commit();
2603  }
2604  // Free cached send ops.
2605  call_attempt->FreeCachedSendOpDataAfterCommit();
2606  }
2607 }
2608 
2609 void RetryFilter::CallData::StartRetryTimer(
2610  absl::optional<Duration> server_pushback) {
2611  // Reset call attempt.
2612  call_attempt_.reset(DEBUG_LOCATION, "StartRetryTimer");
2613  // Compute backoff delay.
2614  Timestamp next_attempt_time;
2615  if (server_pushback.has_value()) {
2616  GPR_ASSERT(*server_pushback >= Duration::Zero());
2617  next_attempt_time = ExecCtx::Get()->Now() + *server_pushback;
2618  retry_backoff_.Reset();
2619  } else {
2620  next_attempt_time = retry_backoff_.NextAttemptTime();
2621  }
2622  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2623  gpr_log(GPR_INFO,
2624  "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand_,
2625  this, (next_attempt_time - ExecCtx::Get()->Now()).millis());
2626  }
2627  // Schedule retry after computed delay.
2628  GRPC_CLOSURE_INIT(&retry_closure_, OnRetryTimer, this, nullptr);
2629  GRPC_CALL_STACK_REF(owning_call_, "OnRetryTimer");
2630  retry_timer_pending_ = true;
2631  grpc_timer_init(&retry_timer_, next_attempt_time, &retry_closure_);
2632 }
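
// [Editor's example] Self-contained sketch, not part of retry_filter.cc, of
// the delay selection in StartRetryTimer() above: an explicit server
// pushback overrides exponential backoff and resets the backoff schedule;
// otherwise the next delay comes from the schedule. Jitter is omitted here.
#include <algorithm>
#include <optional>

struct FakeBackoff {
  double next_ms, multiplier, max_ms, initial_ms;
  double NextDelayMs() {
    double d = next_ms;
    next_ms = std::min(next_ms * multiplier, max_ms);
    return d;
  }
  void Reset() { next_ms = initial_ms; }
};

double NextAttemptDelayMs(std::optional<double> server_pushback_ms,
                          FakeBackoff* backoff) {
  if (server_pushback_ms.has_value()) {
    backoff->Reset();  // Honor pushback and restart the schedule.
    return *server_pushback_ms;
  }
  return backoff->NextDelayMs();
}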
2633 
2634 void RetryFilter::CallData::OnRetryTimer(void* arg, grpc_error_handle error) {
2635  auto* calld = static_cast<CallData*>(arg);
2636  GRPC_CLOSURE_INIT(&calld->retry_closure_, OnRetryTimerLocked, calld, nullptr);
2637  GRPC_CALL_COMBINER_START(calld->call_combiner_, &calld->retry_closure_,
2638  GRPC_ERROR_REF(error), "retry timer fired");
2639 }
2640 
2641 void RetryFilter::CallData::OnRetryTimerLocked(void* arg,
2642  grpc_error_handle error) {
2643  auto* calld = static_cast<CallData*>(arg);
2644  if (GRPC_ERROR_IS_NONE(error) && calld->retry_timer_pending_) {
2645  calld->retry_timer_pending_ = false;
2646  calld->CreateCallAttempt(/*is_transparent_retry=*/false);
2647  } else {
2648  GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "retry timer cancelled");
2649  }
2650  GRPC_CALL_STACK_UNREF(calld->owning_call_, "OnRetryTimer");
2651 }
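
// [Editor's example] The "lame timer" idiom used above, sketched standalone;
// not part of retry_filter.cc. Cancellation clears the pending flag under
// the call combiner, so if the timer closure fires anyway it sees the flag
// down and becomes a no-op instead of starting a new attempt.
struct FakeRetryState {
  bool retry_timer_pending;
};

void OnFakeRetryTimer(FakeRetryState* s, bool timer_error) {
  if (!timer_error && s->retry_timer_pending) {
    s->retry_timer_pending = false;
    // ... create the next call attempt ...
  }
  // Otherwise: the timer was cancelled or lamed; do nothing.
}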
2652 
2653 void RetryFilter::CallData::AddClosureToStartTransparentRetry(
2654  CallCombinerClosureList* closures) {
2655  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2656  gpr_log(GPR_INFO, "chand=%p calld=%p: scheduling transparent retry", chand_,
2657  this);
2658  }
2659  GRPC_CALL_STACK_REF(owning_call_, "OnRetryTimer");
2660  GRPC_CLOSURE_INIT(&retry_closure_, StartTransparentRetry, this, nullptr);
2661  closures->Add(&retry_closure_, GRPC_ERROR_NONE, "start transparent retry");
2662 }
2663 
2664 void RetryFilter::CallData::StartTransparentRetry(void* arg,
2665  grpc_error_handle /*error*/) {
2666  auto* calld = static_cast<CallData*>(arg);
2667  if (GRPC_ERROR_IS_NONE(calld->cancelled_from_surface_)) {
2668  calld->CreateCallAttempt(/*is_transparent_retry=*/true);
2669  } else {
2670  GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
2671  "call cancelled before transparent retry");
2672  }
2673  GRPC_CALL_STACK_UNREF(calld->owning_call_, "OnRetryTimer");
2674 }
2675 
2676 } // namespace
2677 
2678 const grpc_channel_filter kRetryFilterVtable = {
2679  RetryFilter::CallData::StartTransportStreamOpBatch,
2680  nullptr,
2681  grpc_channel_next_op,
2682  sizeof(RetryFilter::CallData),
2683  RetryFilter::CallData::Init,
2684  RetryFilter::CallData::SetPollent,
2685  RetryFilter::CallData::Destroy,
2686  sizeof(RetryFilter),
2687  RetryFilter::Init,
2688  grpc_channel_stack_no_post_init,
2689  RetryFilter::Destroy,
2690  grpc_channel_next_get_info,
2691  "retry_filter",
2692 };
2693 
2694 } // namespace grpc_core
Definition: retry_filter.cc:461
grpc_transport_stream_op_batch_payload::recv_initial_metadata
grpc_metadata_batch * recv_initial_metadata
Definition: transport.h:390
recv_trailing_metadata_ready_
grpc_closure recv_trailing_metadata_ready_
Definition: retry_filter.cc:473
slices
SliceBuffer * slices
Definition: retry_filter.cc:631
started_recv_message_count_
size_t started_recv_message_count_
Definition: retry_filter.cc:478
grpc_transport_stream_op_batch::send_initial_metadata
bool send_initial_metadata
Definition: transport.h:310
arg
struct arg arg
recv_message_
absl::optional< SliceBuffer > recv_message_
Definition: retry_filter.cc:468
grpc_transport_stream_op_batch_payload::recv_message
absl::optional< grpc_core::SliceBuffer > * recv_message
Definition: transport.h:416
exec_ctx.h
grpc_timer_init
void grpc_timer_init(grpc_timer *timer, grpc_core::Timestamp deadline, grpc_closure *closure)
Definition: iomgr/timer.cc:31
slice_refcount.h
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
pending_batches_
PendingBatch pending_batches_[MAX_PENDING_BATCHES]
Definition: retry_filter.cc:595
ref_counted_ptr.h
config_s
Definition: bloaty/third_party/zlib/deflate.c:120
transport.h
grpc_core::internal::RetryServiceConfigParser::ParserIndex
static size_t ParserIndex()
Definition: retry_service_config.cc:48
pollent_
grpc_polling_entity * pollent_
Definition: retry_filter.cc:563
completed_recv_initial_metadata_
bool completed_recv_initial_metadata_
Definition: retry_filter.cc:485
channel_args.h
timer.h
grpc_transport_stream_op_batch::recv_trailing_metadata
bool recv_trailing_metadata
Definition: transport.h:326
pending_send_trailing_metadata_
bool pending_send_trailing_metadata_
Definition: retry_filter.cc:598
service_config.h
GRPC_ARG_SERVICE_CONFIG_OBJ
#define GRPC_ARG_SERVICE_CONFIG_OBJ
Definition: client_channel.h:95
grpc_error_get_status
void grpc_error_get_status(grpc_error_handle error, grpc_core::Timestamp deadline, grpc_status_code *code, std::string *message, grpc_http2_error_code *http_error, const char **error_string)
Definition: error_utils.cc:67
grpc_channel_element_args
Definition: channel_stack.h:74
context
grpc::ClientContext context
Definition: istio_echo_server_lib.cc:61
Duration
Definition: bloaty/third_party/protobuf/src/google/protobuf/duration.pb.h:69
seen_send_trailing_metadata_
bool seen_send_trailing_metadata_
Definition: retry_filter.cc:636
flags
uint32_t flags
Definition: retry_filter.cc:632
atm.h
grpc_call_final_info
Definition: channel_stack.h:95
grpc_transport_stream_op_batch_payload::recv_trailing_metadata
grpc_metadata_batch * recv_trailing_metadata
Definition: transport.h:425
absl::StatusOr
Definition: abseil-cpp/absl/status/statusor.h:187
uri_parser.h
bytes_buffered_for_retry_
size_t bytes_buffered_for_retry_
Definition: retry_filter.cc:594
absl::InlinedVector
Definition: abseil-cpp/absl/container/inlined_vector.h:69
grpc_error
Definition: error_internal.h:42
recv_initial_metadata_ready_deferred_batch_
RefCountedPtr< BatchData > recv_initial_metadata_ready_deferred_batch_
Definition: retry_filter.cc:490
batch_
grpc_transport_stream_op_batch batch_
Definition: retry_filter.cc:357
recv_initial_metadata_
grpc_metadata_batch recv_initial_metadata_
Definition: retry_filter.cc:463
attempt_dispatch_controller_
AttemptDispatchController attempt_dispatch_controller_
Definition: retry_filter.cc:448
GRPC_CALL_STACK_REF
#define GRPC_CALL_STACK_REF(call_stack, reason)
Definition: channel_stack.h:293
recv_message_flags_
uint32_t recv_message_flags_
Definition: retry_filter.cc:469
grpc_channel_args_find_integer
int grpc_channel_args_find_integer(const grpc_channel_args *args, const char *name, const grpc_integer_options options)
Definition: channel_args.cc:425
testing::Ref
internal::RefMatcher< T & > Ref(T &x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8628
started_send_initial_metadata_
bool started_send_initial_metadata_
Definition: retry_filter.cc:480
grpc_metadata_batch
Definition: metadata_batch.h:1259
on_call_stack_destruction_
grpc_closure * on_call_stack_destruction_
Definition: retry_filter.cc:690
construct_destruct.h
grpc_channel_info
Definition: grpc_types.h:720
grpc_transport_stream_op_batch
Definition: transport.h:284
recv_message_ready_
grpc_closure recv_message_ready_
Definition: retry_filter.cc:467
grpc_closure
Definition: closure.h:56
slice_buffer.h
send_initial_metadata
static void send_initial_metadata(void)
Definition: test/core/fling/server.cc:121
deadline_
Timestamp deadline_
Definition: retry_filter.cc:569
grpc_status_code_to_string
const char * grpc_status_code_to_string(grpc_status_code status)
Returns the string form of status, or "UNKNOWN" if invalid.
Definition: status_util.cc:62
sent_cancel_stream_
bool sent_cancel_stream_
Definition: retry_filter.cc:488
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
GRPC_ERROR_INT_GRPC_STATUS
@ GRPC_ERROR_INT_GRPC_STATUS
grpc status code representing this error
Definition: error.h:66
absl::exchange
T exchange(T &obj, U &&new_value)
Definition: abseil-cpp/absl/utility/utility.h:314
grpc_slice_unref_internal
void grpc_slice_unref_internal(const grpc_slice &slice)
Definition: slice_refcount.h:39
error_utils.h
grpc_transport_stream_op_batch_payload::cancel_stream
struct grpc_transport_stream_op_batch_payload::@46 cancel_stream
retry_timer_
grpc_timer retry_timer_
Definition: retry_filter.cc:606
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
run_interop_tests.server_name
server_name
Definition: run_interop_tests.py:1510
port_platform.h

