connection_delay_injector.h
Go to the documentation of this file.
1 // Copyright 2016 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_TEST_CPP_END2END_CONNECTION_DELAY_INJECTOR_H
16 #define GRPC_TEST_CPP_END2END_CONNECTION_DELAY_INJECTOR_H
17 
18 #include <memory>
19 
24 
25 namespace grpc {
26 namespace testing {
27 
28 // Allows injecting connection-establishment delays into C-core.
29 // Typical usage:
30 //
31 // // At grpc_init() time.
32 // ConnectionAttemptInjector::Init();
33 //
34 // // When an injection is desired.
35 // ConnectionDelayInjector delay_injector(grpc_core::Duration::Seconds(10));
36 // delay_injector.Start();
37 //
38 // The injection is global, so there must be only one ConnectionAttemptInjector
39 // object at any one time.
41  public:
42  // Global initializer. Replaces the iomgr TCP client vtable.
43  // Must be called exactly once after grpc_init() but before any TCP
44  // connections are established.
45  static void Init();
46 
48 
49  // Must be called after instantiation.
50  void Start();
51 
52  // Invoked for every TCP connection attempt.
53  // Implementations must eventually either invoke the closure
54  // themselves or delegate to the iomgr implementation by calling
55  // AttemptConnection(). QueuedAttempt may be used to queue an attempt
56  // for asynchronous processing.
58  grpc_pollset_set* interested_parties,
59  const grpc_channel_args* channel_args,
61  grpc_core::Timestamp deadline) = 0;
62 
63  protected:
64  // Represents a queued attempt.
65  // The caller must invoke either Resume() or Fail() before destroying.
66  class QueuedAttempt {
67  public:
69  grpc_pollset_set* interested_parties,
70  const grpc_channel_args* channel_args,
72  grpc_core::Timestamp deadline)
73  : closure_(closure),
74  endpoint_(ep),
75  interested_parties_(interested_parties),
77  deadline_(deadline) {
78  memcpy(&address_, addr, sizeof(address_));
79  }
80 
82  GPR_ASSERT(closure_ == nullptr);
84  }
85 
86  // Caller must invoke this from a thread with an ExecCtx.
87  void Resume() {
88  GPR_ASSERT(closure_ != nullptr);
91  closure_ = nullptr;
92  }
93 
94  // Caller must invoke this from a thread with an ExecCtx.
96  GPR_ASSERT(closure_ != nullptr);
98  closure_ = nullptr;
99  }
100 
101  private:
108  };
109 
110  // Injects a delay before continuing a connection attempt.
112  public:
113  virtual ~InjectedDelay() = default;
114 
116  grpc_endpoint** ep, grpc_pollset_set* interested_parties,
117  const grpc_channel_args* channel_args,
119  grpc_core::Timestamp deadline);
120 
121  private:
122  // Subclasses can override to perform an action when the attempt resumes.
123  virtual void BeforeResumingAction() {}
124 
125  static void TimerCallback(void* arg, grpc_error_handle /*error*/);
126 
130  };
131 
133  grpc_pollset_set* interested_parties,
134  const grpc_channel_args* channel_args,
136  grpc_core::Timestamp deadline);
137 };
138 
139 // A concrete implementation that injects a fixed delay.
141  public:
143  : duration_(duration) {}
144 
146  grpc_pollset_set* interested_parties,
147  const grpc_channel_args* channel_args,
149  grpc_core::Timestamp deadline) override;
150 
151  private:
153 };
154 
155 // A concrete implementation that allows injecting holds for individual
156 // connection attempts, one at a time.
158  private:
159  grpc_core::Mutex mu_; // Needs to be declared up front.
160 
161  public:
162  class Hold {
163  public:
164  // Do not instantiate directly -- must be created via AddHold().
165  Hold(ConnectionHoldInjector* injector, int port, bool intercept_completion);
166 
167  // Waits for the connection attempt to start.
168  // After this returns, exactly one of Resume() or Fail() must be called.
169  void Wait();
170 
171  // Resumes a connection attempt. Must be called after Wait().
172  void Resume();
173 
174  // Fails a connection attempt. Must be called after Wait().
176 
177  // If the hold was created with intercept_completion=true, then this
178  // can be called after Resume() to wait for the connection attempt
179  // to complete.
180  void WaitForCompletion();
181 
182  // Returns true if the connection attempt has been started.
183  bool IsStarted();
184 
185  private:
187 
188  static void OnComplete(void* arg, grpc_error_handle error);
189 
191  const int port_;
193  std::unique_ptr<QueuedAttempt> queued_attempt_
199  };
200 
201  // Adds a hold for a given port. The caller may then use Wait() on
202  // the resulting Hold object to wait for the connection attempt to start.
203  // If intercept_completion is true, the caller can use WaitForCompletion()
204  // on the resulting Hold object.
205  std::unique_ptr<Hold> AddHold(int port, bool intercept_completion = false);
206 
207  private:
209  grpc_pollset_set* interested_parties,
210  const grpc_channel_args* channel_args,
212  grpc_core::Timestamp deadline) override;
213 
214  std::vector<Hold*> holds_;
215 };
216 
217 } // namespace testing
218 } // namespace grpc
219 
220 #endif // GRPC_TEST_CPP_END2END_CONNECTION_DELAY_INJECTOR_H
grpc::testing::ConnectionHoldInjector::Hold::Resume
void Resume()
Definition: connection_delay_injector.cc:152
testing
Definition: aws_request_signer_test.cc:25
grpc::testing::ConnectionHoldInjector::mu_
grpc_core::Mutex mu_
Definition: connection_delay_injector.h:159
grpc_core::CondVar
Definition: src/core/lib/gprpp/sync.h:126
grpc
Definition: grpcpp/alarm.h:33
grpc::testing::ConnectionHoldInjector::HandleConnection
void HandleConnection(grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, grpc_core::Timestamp deadline) override
Definition: connection_delay_injector.cc:214
grpc::testing::ConnectionAttemptInjector::Init
static void Init()
Definition: connection_delay_injector.cc:70
grpc::testing::ConnectionAttemptInjector::QueuedAttempt
Definition: connection_delay_injector.h:66
grpc_pollset_set
struct grpc_pollset_set grpc_pollset_set
Definition: iomgr_fwd.h:23
grpc::testing::ConnectionAttemptInjector::QueuedAttempt::closure_
grpc_closure * closure_
Definition: connection_delay_injector.h:102
grpc::testing::ConnectionDelayInjector::duration_
grpc_core::Duration duration_
Definition: connection_delay_injector.h:152
grpc::testing::ConnectionHoldInjector::Hold::complete_cv_
grpc_core::CondVar complete_cv_
Definition: connection_delay_injector.h:198
grpc::testing::ConnectionAttemptInjector::QueuedAttempt::~QueuedAttempt
~QueuedAttempt()
Definition: connection_delay_injector.h:81
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
grpc::testing::ConnectionAttemptInjector::QueuedAttempt::channel_args_
const grpc_channel_args * channel_args_
Definition: connection_delay_injector.h:105
grpc::testing::ConnectionAttemptInjector::InjectedDelay::InjectedDelay
InjectedDelay(grpc_core::Duration duration, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, grpc_core::Timestamp deadline)
Definition: connection_delay_injector.cc:102
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc::testing::ConnectionHoldInjector::Hold::injector_
ConnectionHoldInjector * injector_
Definition: connection_delay_injector.h:190
grpc::testing::ConnectionHoldInjector::Hold::WaitForCompletion
void WaitForCompletion()
Definition: connection_delay_injector.cc:174
grpc_resolved_address
Definition: resolved_address.h:34
grpc::testing::ConnectionHoldInjector
Definition: connection_delay_injector.h:157
ABSL_GUARDED_BY
#define ABSL_GUARDED_BY(x)
Definition: abseil-cpp/absl/base/thread_annotations.h:62
grpc::testing::ConnectionAttemptInjector::QueuedAttempt::QueuedAttempt
QueuedAttempt(grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, grpc_core::Timestamp deadline)
Definition: connection_delay_injector.h:68
grpc::testing::ConnectionAttemptInjector::InjectedDelay::attempt_
QueuedAttempt attempt_
Definition: connection_delay_injector.h:127
grpc_timer
Definition: iomgr/timer.h:33
grpc::testing::ConnectionDelayInjector::HandleConnection
void HandleConnection(grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, grpc_core::Timestamp deadline) override
Definition: connection_delay_injector.cc:125
grpc_channel_args
Definition: grpc_types.h:132
grpc::testing::ConnectionAttemptInjector::InjectedDelay::~InjectedDelay
virtual ~InjectedDelay()=default
grpc::testing::ConnectionHoldInjector::Hold::port_
const int port_
Definition: connection_delay_injector.h:191
grpc::testing::ConnectionAttemptInjector::InjectedDelay::TimerCallback
static void TimerCallback(void *arg, grpc_error_handle)
Definition: connection_delay_injector.cc:113
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
memcpy
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))
grpc::testing::ConnectionAttemptInjector::InjectedDelay::timer_callback_
grpc_closure timer_callback_
Definition: connection_delay_injector.h:129
grpc::testing::ConnectionHoldInjector::Hold::Hold
Hold(ConnectionHoldInjector *injector, int port, bool intercept_completion)
Definition: connection_delay_injector.cc:137
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc::testing::ConnectionAttemptInjector::QueuedAttempt::address_
grpc_resolved_address address_
Definition: connection_delay_injector.h:106
grpc::testing::ConnectionDelayInjector::ConnectionDelayInjector
ConnectionDelayInjector(grpc_core::Duration duration)
Definition: connection_delay_injector.h:142
grpc::testing::ConnectionAttemptInjector::InjectedDelay::timer_
grpc_timer timer_
Definition: connection_delay_injector.h:128
grpc::testing::ConnectionAttemptInjector::QueuedAttempt::endpoint_
grpc_endpoint ** endpoint_
Definition: connection_delay_injector.h:103
grpc::testing::ConnectionHoldInjector::Hold::Wait
void Wait()
Definition: connection_delay_injector.cc:143
grpc::testing::ConnectionAttemptInjector::QueuedAttempt::Fail
void Fail(grpc_error_handle error)
Definition: connection_delay_injector.h:95
grpc_channel_args_destroy
void grpc_channel_args_destroy(grpc_channel_args *a)
Definition: channel_args.cc:360
arg
Definition: cmdline.cc:40
grpc::testing::ConnectionAttemptInjector::QueuedAttempt::deadline_
grpc_core::Timestamp deadline_
Definition: connection_delay_injector.h:107
time.h
grpc_channel_args_copy
grpc_channel_args * grpc_channel_args_copy(const grpc_channel_args *src)
Definition: channel_args.cc:285
grpc::testing::ConnectionAttemptInjector::QueuedAttempt::Resume
void Resume()
Definition: connection_delay_injector.h:87
grpc::testing::ConnectionAttemptInjector::~ConnectionAttemptInjector
virtual ~ConnectionAttemptInjector()
Definition: connection_delay_injector.cc:76
grpc::testing::ConnectionAttemptInjector::InjectedDelay::BeforeResumingAction
virtual void BeforeResumingAction()
Definition: connection_delay_injector.h:123
grpc::testing::ConnectionHoldInjector::Hold::IsStarted
bool IsStarted()
Definition: connection_delay_injector.cc:184
grpc::testing::ConnectionAttemptInjector::AttemptConnection
static void AttemptConnection(grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, grpc_core::Timestamp deadline)
Definition: connection_delay_injector.cc:90
tcp_client.h
grpc::testing::ConnectionAttemptInjector
Definition: connection_delay_injector.h:40
grpc::testing::ConnectionAttemptInjector::HandleConnection
virtual void HandleConnection(grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, grpc_core::Timestamp deadline)=0
tests.unit._exit_scenarios.port
port
Definition: _exit_scenarios.py:179
grpc::testing::ConnectionHoldInjector::Hold
Definition: connection_delay_injector.h:162
grpc::testing::ConnectionAttemptInjector::InjectedDelay
Definition: connection_delay_injector.h:111
grpc::testing::ConnectionHoldInjector::Hold::start_cv_
grpc_core::CondVar start_cv_
Definition: connection_delay_injector.h:195
grpc_core::Mutex
Definition: src/core/lib/gprpp/sync.h:61
grpc::testing::ConnectionAttemptInjector::Start
void Start()
Definition: connection_delay_injector.cc:81
grpc::testing::ConnectionHoldInjector::Hold::original_on_complete_
grpc_closure * original_on_complete_
Definition: connection_delay_injector.h:197
grpc::testing::ConnectionAttemptInjector::QueuedAttempt::interested_parties_
grpc_pollset_set * interested_parties_
Definition: connection_delay_injector.h:104
closure
Definition: proxy.cc:59
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
channel_args.h
grpc::testing::ConnectionHoldInjector::Hold::intercept_completion_
const bool intercept_completion_
Definition: connection_delay_injector.h:192
timer.h
grpc::testing::ConnectionHoldInjector::holds_
std::vector< Hold * > holds_
Definition: connection_delay_injector.h:214
grpc::testing::ConnectionHoldInjector::Hold::Fail
void Fail(grpc_error_handle error)
Definition: connection_delay_injector.cc:163
grpc::testing::ConnectionHoldInjector::Hold::on_complete_
grpc_closure on_complete_
Definition: connection_delay_injector.h:196
grpc_error
Definition: error_internal.h:42
grpc_core::Duration
Definition: src/core/lib/gprpp/time.h:122
grpc::testing::ConnectionHoldInjector::AddHold
std::unique_ptr< Hold > AddHold(int port, bool intercept_completion=false)
Definition: connection_delay_injector.cc:206
grpc::testing::ConnectionHoldInjector::Hold::OnComplete
static void OnComplete(void *arg, grpc_error_handle error)
Definition: connection_delay_injector.cc:189
grpc_closure
Definition: closure.h:56
grpc_endpoint
Definition: endpoint.h:105
addr
struct sockaddr_in addr
Definition: libuv/docs/code/tcp-echo-server/main.c:10
grpc::testing::ConnectionDelayInjector
Definition: connection_delay_injector.h:140


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:54