connection_delay_injector.cc
Go to the documentation of this file.
1 // Copyright 2016 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <memory>
18 
19 #include "absl/memory/memory.h"
20 #include "absl/utility/utility.h"
21 
24 
25 // defined in tcp_client.cc
27 
28 namespace grpc {
29 namespace testing {
30 
31 //
32 // ConnectionAttemptInjector
33 //
34 
35 namespace {
36 
37 grpc_tcp_client_vtable* g_original_vtable = nullptr;
38 
39 grpc_core::Mutex* g_mu = nullptr;
40 ConnectionAttemptInjector* g_injector ABSL_GUARDED_BY(*g_mu) = nullptr;
41 
42 int64_t TcpConnectWithDelay(grpc_closure* closure, grpc_endpoint** ep,
43  grpc_pollset_set* interested_parties,
44  const grpc_channel_args* channel_args,
46  grpc_core::Timestamp deadline) {
48  if (g_injector == nullptr) {
49  g_original_vtable->connect(closure, ep, interested_parties, channel_args,
50  addr, deadline);
51  return 0;
52  }
53  g_injector->HandleConnection(closure, ep, interested_parties, channel_args,
54  addr, deadline);
55  return 0;
56 }
57 
58 // TODO(vigneshbabu): This method should check whether the connect attempt has
59 // actually been started, and if so, it should call
60 // g_original_vtable->cancel_connect(). If the attempt has not actually been
61 // started, it should mark the connect request as cancelled, so that when the
62 // request is resumed, it will not actually proceed.
63 bool TcpConnectCancel(int64_t /*connection_handle*/) { return false; }
64 
65 grpc_tcp_client_vtable kDelayedConnectVTable = {TcpConnectWithDelay,
66  TcpConnectCancel};
67 
68 } // namespace
69 
71  g_mu = new grpc_core::Mutex();
72  g_original_vtable = grpc_tcp_client_impl;
73  grpc_tcp_client_impl = &kDelayedConnectVTable;
74 }
75 
78  g_injector = nullptr;
79 }
80 
82  // Fail if ConnectionAttemptInjector::Init() was not called after
83  // grpc_init() to inject the vtable.
84  GPR_ASSERT(grpc_tcp_client_impl == &kDelayedConnectVTable);
86  GPR_ASSERT(g_injector == nullptr);
87  g_injector = this;
88 }
89 
92  grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
94  g_original_vtable->connect(closure, ep, interested_parties, channel_args,
95  addr, deadline);
96 }
97 
98 //
99 // ConnectionAttemptInjector::InjectedDelay
100 //
101 
104  grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
106  : attempt_(closure, ep, interested_parties, channel_args, addr, deadline) {
109  duration = std::min(duration, deadline - now);
110  grpc_timer_init(&timer_, now + duration, &timer_callback_);
111 }
112 
114  void* arg, grpc_error_handle /*error*/) {
115  auto* self = static_cast<InjectedDelay*>(arg);
116  self->BeforeResumingAction();
117  self->attempt_.Resume();
118  delete self;
119 }
120 
121 //
122 // ConnectionDelayInjector
123 //
124 
127  grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
129  new InjectedDelay(duration_, closure, ep, interested_parties, channel_args,
130  addr, deadline);
131 }
132 
133 //
134 // ConnectionHoldInjector::Hold
135 //
136 
138  bool intercept_completion)
139  : injector_(injector),
140  port_(port),
141  intercept_completion_(intercept_completion) {}
142 
144  gpr_log(GPR_INFO, "=== WAITING FOR CONNECTION ATTEMPT ON PORT %d ===", port_);
145  grpc_core::MutexLock lock(&injector_->mu_);
146  while (queued_attempt_ == nullptr) {
147  start_cv_.Wait(&injector_->mu_);
148  }
149  gpr_log(GPR_INFO, "=== CONNECTION ATTEMPT STARTED ON PORT %d ===", port_);
150 }
151 
153  gpr_log(GPR_INFO, "=== RESUMING CONNECTION ATTEMPT ON PORT %d ===", port_);
155  std::unique_ptr<QueuedAttempt> attempt;
156  {
157  grpc_core::MutexLock lock(&injector_->mu_);
158  attempt = std::move(queued_attempt_);
159  }
160  attempt->Resume();
161 }
162 
164  gpr_log(GPR_INFO, "=== FAILING CONNECTION ATTEMPT ON PORT %d ===", port_);
166  std::unique_ptr<QueuedAttempt> attempt;
167  {
168  grpc_core::MutexLock lock(&injector_->mu_);
169  attempt = std::move(queued_attempt_);
170  }
171  attempt->Fail(error);
172 }
173 
176  "=== WAITING FOR CONNECTION COMPLETION ON PORT %d ===", port_);
177  grpc_core::MutexLock lock(&injector_->mu_);
178  while (original_on_complete_ != nullptr) {
179  complete_cv_.Wait(&injector_->mu_);
180  }
181  gpr_log(GPR_INFO, "=== CONNECTION COMPLETED ON PORT %d ===", port_);
182 }
183 
185  grpc_core::MutexLock lock(&injector_->mu_);
186  return !start_cv_.WaitWithDeadline(&injector_->mu_, absl::Now());
187 }
188 
191  auto* self = static_cast<Hold*>(arg);
192  grpc_closure* on_complete;
193  {
194  grpc_core::MutexLock lock(&self->injector_->mu_);
195  on_complete = self->original_on_complete_;
196  self->original_on_complete_ = nullptr;
197  self->complete_cv_.Signal();
198  }
200 }
201 
202 //
203 // ConnectionHoldInjector
204 //
205 
206 std::unique_ptr<ConnectionHoldInjector::Hold> ConnectionHoldInjector::AddHold(
207  int port, bool intercept_completion) {
208  grpc_core::MutexLock lock(&mu_);
209  auto hold = absl::make_unique<Hold>(this, port, intercept_completion);
210  holds_.push_back(hold.get());
211  return hold;
212 }
213 
216  grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
218  const int port = grpc_sockaddr_get_port(addr);
219  gpr_log(GPR_INFO, "==> HandleConnection(): port=%d", port);
220  {
221  grpc_core::MutexLock lock(&mu_);
222  for (auto it = holds_.begin(); it != holds_.end(); ++it) {
223  Hold* hold = *it;
224  if (port == hold->port_) {
225  gpr_log(GPR_INFO, "*** INTERCEPTING CONNECTION ATTEMPT");
226  if (hold->intercept_completion_) {
229  hold, nullptr);
230  }
231  hold->queued_attempt_ = absl::make_unique<QueuedAttempt>(
232  closure, ep, interested_parties, channel_args, addr, deadline);
233  hold->start_cv_.Signal();
234  holds_.erase(it);
235  return;
236  }
237  }
238  }
239  // Anything we're not holding should proceed normally.
240  AttemptConnection(closure, ep, interested_parties, channel_args, addr,
241  deadline);
242 }
243 
244 } // namespace testing
245 } // namespace grpc
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
grpc::testing::ConnectionHoldInjector::Hold::Resume
void Resume()
Definition: connection_delay_injector.cc:152
connection_delay_injector.h
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
testing
Definition: aws_request_signer_test.cc:25
now
static double now(void)
Definition: test/core/fling/client.cc:130
grpc::testing::ConnectionHoldInjector::mu_
grpc_core::Mutex mu_
Definition: connection_delay_injector.h:159
regen-readme.it
it
Definition: regen-readme.py:15
sockaddr_utils.h
grpc
Definition: grpcpp/alarm.h:33
grpc::testing::ConnectionHoldInjector::HandleConnection
void HandleConnection(grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, grpc_core::Timestamp deadline) override
Definition: connection_delay_injector.cc:214
grpc::testing::ConnectionAttemptInjector::Init
static void Init()
Definition: connection_delay_injector.cc:70
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
grpc_pollset_set
struct grpc_pollset_set grpc_pollset_set
Definition: iomgr_fwd.h:23
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
grpc_tcp_client_impl
grpc_tcp_client_vtable * grpc_tcp_client_impl
Definition: tcp_client.cc:23
grpc::testing::ConnectionAttemptInjector::InjectedDelay::InjectedDelay
InjectedDelay(grpc_core::Duration duration, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, grpc_core::Timestamp deadline)
Definition: connection_delay_injector.cc:102
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc::testing::ConnectionHoldInjector::Hold::WaitForCompletion
void WaitForCompletion()
Definition: connection_delay_injector.cc:174
grpc_resolved_address
Definition: resolved_address.h:34
grpc::testing::ConnectionHoldInjector
Definition: connection_delay_injector.h:157
ABSL_GUARDED_BY
#define ABSL_GUARDED_BY(x)
Definition: abseil-cpp/absl/base/thread_annotations.h:62
grpc::testing::ConnectionDelayInjector::HandleConnection
void HandleConnection(grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, grpc_core::Timestamp deadline) override
Definition: connection_delay_injector.cc:125
grpc_channel_args
Definition: grpc_types.h:132
grpc::testing::ConnectionHoldInjector::Hold::port_
const int port_
Definition: connection_delay_injector.h:191
grpc::testing::ConnectionAttemptInjector::InjectedDelay::TimerCallback
static void TimerCallback(void *arg, grpc_error_handle)
Definition: connection_delay_injector.cc:113
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
grpc::testing::ConnectionAttemptInjector::InjectedDelay::timer_callback_
grpc_closure timer_callback_
Definition: connection_delay_injector.h:129
grpc::testing::ConnectionHoldInjector::Hold::Hold
Hold(ConnectionHoldInjector *injector, int port, bool intercept_completion)
Definition: connection_delay_injector.cc:137
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
int64_t
signed __int64 int64_t
Definition: stdint-msvc2008.h:89
grpc::testing::ConnectionAttemptInjector::InjectedDelay::timer_
grpc_timer timer_
Definition: connection_delay_injector.h:128
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
closure
grpc_closure closure
Definition: src/core/lib/surface/server.cc:466
grpc_core::CondVar::Signal
void Signal()
Definition: src/core/lib/gprpp/sync.h:134
grpc::testing::ConnectionHoldInjector::Hold::Wait
void Wait()
Definition: connection_delay_injector.cc:143
arg
Definition: cmdline.cc:40
grpc::testing::ConnectionAttemptInjector::~ConnectionAttemptInjector
virtual ~ConnectionAttemptInjector()
Definition: connection_delay_injector.cc:76
grpc::testing::ConnectionAttemptInjector::InjectedDelay::BeforeResumingAction
virtual void BeforeResumingAction()
Definition: connection_delay_injector.h:123
min
#define min(a, b)
Definition: qsort.h:83
grpc::testing::ConnectionHoldInjector::Hold::IsStarted
bool IsStarted()
Definition: connection_delay_injector.cc:184
grpc::testing::ConnectionAttemptInjector::AttemptConnection
static void AttemptConnection(grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, grpc_core::Timestamp deadline)
Definition: connection_delay_injector.cc:90
grpc_core::ExecCtx
Definition: exec_ctx.h:97
tests.unit._exit_scenarios.port
port
Definition: _exit_scenarios.py:179
grpc::testing::ConnectionHoldInjector::Hold
Definition: connection_delay_injector.h:162
grpc::testing::ConnectionAttemptInjector::InjectedDelay
Definition: connection_delay_injector.h:111
g_mu
static gpr_mu g_mu
Definition: iomgr.cc:55
grpc::testing::ConnectionHoldInjector::Hold::start_cv_
grpc_core::CondVar start_cv_
Definition: connection_delay_injector.h:195
grpc_sockaddr_get_port
int grpc_sockaddr_get_port(const grpc_resolved_address *resolved_addr)
Definition: sockaddr_utils.cc:303
absl::Now
ABSL_NAMESPACE_BEGIN Time Now()
Definition: abseil-cpp/absl/time/clock.cc:39
grpc_core::Mutex
Definition: src/core/lib/gprpp/sync.h:61
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
grpc::testing::ConnectionAttemptInjector::Start
void Start()
Definition: connection_delay_injector.cc:81
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
grpc::testing::ConnectionHoldInjector::Hold::original_on_complete_
grpc_closure * original_on_complete_
Definition: connection_delay_injector.h:197
grpc_tcp_client_vtable::connect
int64_t(* connect)(grpc_closure *on_connect, grpc_endpoint **endpoint, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, grpc_core::Timestamp deadline)
Definition: tcp_client.h:33
google::protobuf.internal::Mutex
WrappedMutex Mutex
Definition: bloaty/third_party/protobuf/src/google/protobuf/stubs/mutex.h:113
arg
struct arg arg
grpc_timer_init
void grpc_timer_init(grpc_timer *timer, grpc_core::Timestamp deadline, grpc_closure *closure)
Definition: iomgr/timer.cc:31
closure
Definition: proxy.cc:59
grpc::testing::ConnectionHoldInjector::Hold::intercept_completion_
const bool intercept_completion_
Definition: connection_delay_injector.h:192
grpc::testing::ConnectionHoldInjector::holds_
std::vector< Hold * > holds_
Definition: connection_delay_injector.h:214
grpc::testing::ConnectionHoldInjector::Hold::Fail
void Fail(grpc_error_handle error)
Definition: connection_delay_injector.cc:163
grpc_tcp_client_vtable
Definition: tcp_client.h:32
grpc_core::ExecCtx::Now
Timestamp Now()
Definition: exec_ctx.cc:90
grpc::testing::ConnectionHoldInjector::Hold::on_complete_
grpc_closure on_complete_
Definition: connection_delay_injector.h:196
grpc_error
Definition: error_internal.h:42
grpc_core::Duration
Definition: src/core/lib/gprpp/time.h:122
self
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END intern self
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/map.c:543
port_
int port_
Definition: streams_not_seen_test.cc:377
grpc::testing::ConnectionHoldInjector::AddHold
std::unique_ptr< Hold > AddHold(int port, bool intercept_completion=false)
Definition: connection_delay_injector.cc:206
grpc::testing::ConnectionHoldInjector::Hold::OnComplete
static void OnComplete(void *arg, grpc_error_handle error)
Definition: connection_delay_injector.cc:189
grpc_closure
Definition: closure.h:56
grpc_endpoint
Definition: endpoint.h:105
grpc_core::Closure::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: closure.h:250
addr
struct sockaddr_in addr
Definition: libuv/docs/code/tcp-echo-server/main.c:10
sync.h
grpc_core::ExecCtx::Get
static ExecCtx * Get()
Definition: exec_ctx.h:205


grpc
Author(s):
autogenerated on Fri May 16 2025 02:58:01