tcp_connect_handshaker.cc
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2022 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
20 
22 
23 #include <memory>
24 
25 #include "absl/base/thread_annotations.h"
26 #include "absl/memory/memory.h"
27 #include "absl/status/statusor.h"
28 
30 #include <grpc/slice.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/log.h>
33 
56 
57 namespace grpc_core {
58 
59 namespace {
60 
61 class TCPConnectHandshaker : public Handshaker {
62  public:
63  explicit TCPConnectHandshaker(grpc_pollset_set* pollset_set);
64  void Shutdown(grpc_error_handle why) override;
65  void DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/,
67  HandshakerArgs* args) override;
68  const char* name() const override { return "tcp_connect"; }
69 
70  private:
71  ~TCPConnectHandshaker() override;
72  void CleanupArgsForFailureLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
74  static void Connected(void* arg, grpc_error_handle error);
75 
78  // Endpoint and read buffer to destroy after a shutdown.
84  HandshakerArgs* args_ = nullptr;
88 };
89 
90 TCPConnectHandshaker::TCPConnectHandshaker(grpc_pollset_set* pollset_set)
93  // Interested parties might be null for platforms like Apple.
94  // Explicitly check before adding/deleting from pollset_set to handle this
95  // use case.
96  if (interested_parties_ != nullptr) {
98  }
99  GRPC_CLOSURE_INIT(&connected_, Connected, this, grpc_schedule_on_exec_ctx);
100 }
101 
103  // TODO(anramach): After migration to EventEngine, cancel the in-progress
104  // TCP connection attempt.
105  {
106  MutexLock lock(&mu_);
107  if (!shutdown_) {
108  shutdown_ = true;
109  // If we are shutting down while connecting, respond back with
110  // handshake done.
111  // The callback from grpc_tcp_client_connect will perform
112  // the necessary clean up.
113  if (on_handshake_done_ != nullptr) {
114  CleanupArgsForFailureLocked();
115  FinishLocked(
116  GRPC_ERROR_CREATE_FROM_STATIC_STRING("tcp handshaker shutdown"));
117  }
118  }
119  }
120  GRPC_ERROR_UNREF(why);
121 }
122 
123 void TCPConnectHandshaker::DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/,
125  HandshakerArgs* args) {
126  {
127  MutexLock lock(&mu_);
129  }
130  GPR_ASSERT(args->endpoint == nullptr);
131  args_ = args;
132  char* address = grpc_channel_args_find_string(
134  absl::StatusOr<URI> uri = URI::Parse(address);
135  if (!uri.ok() || !grpc_parse_uri(*uri, &addr_)) {
136  MutexLock lock(&mu_);
138  "Resolved address in invalid format"));
139  return;
140  }
143  const char* args_to_remove[] = {
146  // Update args to not contain the args relevant to TCP connect handshaker.
148  args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove));
150  args->args = channel_args;
151  // In some implementations, the closure can be flushed before
152  // grpc_tcp_client_connect() returns, and since the closure requires access
153  // to mu_, this can result in a deadlock (see
154  // https://github.com/grpc/grpc/issues/16427 for details).
155  // grpc_tcp_client_connect() will fill endpoint_ with proper contents, and we
156  // make sure that we still exist at that point by taking a ref.
157  Ref().release(); // Ref held by callback.
158  // As we fake the TCP client connection failure when shutdown is called
159  // we don't want to pass args->endpoint directly.
160  // Instead pass endpoint_ and swap this endpoint to
161  // args endpoint on success.
163  interested_parties_, args->args, &addr_,
164  args->deadline);
165 }
166 
167 void TCPConnectHandshaker::Connected(void* arg, grpc_error_handle error) {
168  RefCountedPtr<TCPConnectHandshaker> self(
169  static_cast<TCPConnectHandshaker*>(arg));
170  {
171  MutexLock lock(&self->mu_);
172  if (!GRPC_ERROR_IS_NONE(error) || self->shutdown_) {
173  if (GRPC_ERROR_IS_NONE(error)) {
174  error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("tcp handshaker shutdown");
175  } else {
177  }
178  if (self->endpoint_to_destroy_ != nullptr) {
179  grpc_endpoint_shutdown(self->endpoint_to_destroy_,
181  }
182  if (!self->shutdown_) {
183  self->CleanupArgsForFailureLocked();
184  self->shutdown_ = true;
185  self->FinishLocked(error);
186  } else {
187  // The on_handshake_done_ is already as part of shutdown when connecting
188  // So nothing to be done here other than unrefing the error.
190  }
191  return;
192  }
193  GPR_ASSERT(self->endpoint_to_destroy_ != nullptr);
194  self->args_->endpoint = self->endpoint_to_destroy_;
195  self->endpoint_to_destroy_ = nullptr;
196  if (self->bind_endpoint_to_pollset_) {
197  grpc_endpoint_add_to_pollset_set(self->args_->endpoint,
198  self->interested_parties_);
199  }
200  self->FinishLocked(GRPC_ERROR_NONE);
201  }
202 }
203 
204 TCPConnectHandshaker::~TCPConnectHandshaker() {
205  if (endpoint_to_destroy_ != nullptr) {
207  }
208  if (read_buffer_to_destroy_ != nullptr) {
211  }
213 }
214 
215 void TCPConnectHandshaker::CleanupArgsForFailureLocked() {
216  read_buffer_to_destroy_ = args_->read_buffer;
217  args_->read_buffer = nullptr;
219  args_->args = nullptr;
220 }
221 
222 void TCPConnectHandshaker::FinishLocked(grpc_error_handle error) {
223  if (interested_parties_ != nullptr) {
225  }
227  on_handshake_done_ = nullptr;
228 }
229 
230 //
231 // TCPConnectHandshakerFactory
232 //
233 
234 class TCPConnectHandshakerFactory : public HandshakerFactory {
235  public:
236  void AddHandshakers(const grpc_channel_args* /*args*/,
237  grpc_pollset_set* interested_parties,
238  HandshakeManager* handshake_mgr) override {
239  handshake_mgr->Add(
240  MakeRefCounted<TCPConnectHandshaker>(interested_parties));
241  }
242  ~TCPConnectHandshakerFactory() override = default;
243 };
244 
245 } // namespace
246 
248  builder->handshaker_registry()->RegisterHandshakerFactory(
249  true /* at_start */, HANDSHAKER_CLIENT,
250  absl::make_unique<TCPConnectHandshakerFactory>());
251 }
252 
253 } // namespace grpc_core
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
grpc_channel_args_find_string
char * grpc_channel_args_find_string(const grpc_channel_args *args, const char *name)
Definition: channel_args.cc:441
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
core_configuration.h
MutexLock
#define MutexLock(x)
Definition: bloaty/third_party/re2/util/mutex.h:125
on_handshake_done_
grpc_closure * on_handshake_done_
Definition: security_handshaker.cc:124
polling_entity.h
read_buffer_to_destroy_
grpc_slice_buffer * read_buffer_to_destroy_
Definition: security_handshaker.cc:120
grpc_channel_args_copy_and_remove
grpc_channel_args * grpc_channel_args_copy_and_remove(const grpc_channel_args *src, const char **to_remove, size_t num_to_remove)
Definition: channel_args.cc:231
slice.h
false
#define false
Definition: setup_once.h:323
mu_
Mutex mu_
Definition: tcp_connect_handshaker.cc:76
grpc_core
Definition: call_metric_recorder.h:31
grpc_core::CoreConfiguration::Builder
Definition: core_configuration.h:41
grpc_pollset_set
struct grpc_pollset_set grpc_pollset_set
Definition: iomgr_fwd.h:23
grpc_pollset_set_create
grpc_pollset_set * grpc_pollset_set_create()
Definition: pollset_set.cc:29
gpr_free
GPRAPI void gpr_free(void *ptr)
Definition: alloc.cc:51
useful.h
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc_resolved_address
Definition: resolved_address.h:34
closure.h
ABSL_GUARDED_BY
#define ABSL_GUARDED_BY(x)
Definition: abseil-cpp/absl/base/thread_annotations.h:62
setup.name
name
Definition: setup.py:542
grpc_tcp_client_connect
int64_t grpc_tcp_client_connect(grpc_closure *on_connect, grpc_endpoint **endpoint, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, grpc_core::Timestamp deadline)
Definition: tcp_client.cc:25
resolved_address.h
bind_endpoint_to_pollset_
bool bind_endpoint_to_pollset_
Definition: tcp_connect_handshaker.cc:85
grpc_core::URI::Parse
static absl::StatusOr< URI > Parse(absl::string_view uri_text)
Definition: uri_parser.cc:209
grpc_channel_args_find_bool
bool grpc_channel_args_find_bool(const grpc_channel_args *args, const char *name, bool default_value)
Definition: channel_args.cc:465
grpc_channel_args
Definition: grpc_types.h:132
grpc_types.h
grpc_parse_uri
bool grpc_parse_uri(const grpc_core::URI &uri, grpc_resolved_address *resolved_addr)
Definition: parse_address.cc:293
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
parse_address.h
profile_analyzer.builder
builder
Definition: profile_analyzer.py:159
grpc_core::HANDSHAKER_CLIENT
@ HANDSHAKER_CLIENT
Definition: handshaker_registry.h:35
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc_core::RegisterTCPConnectHandshaker
void RegisterTCPConnectHandshaker(CoreConfiguration::Builder *builder)
Definition: tcp_connect_handshaker.cc:247
ABSL_EXCLUSIVE_LOCKS_REQUIRED
#define ABSL_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: abseil-cpp/absl/base/thread_annotations.h:145
grpc_pollset_set_destroy
void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set)
Definition: pollset_set.cc:33
handshaker_registry.h
pollset_set.h
grpc_polling_entity_del_from_pollset_set
void grpc_polling_entity_del_from_pollset_set(grpc_polling_entity *pollent, grpc_pollset_set *pss_dst)
Definition: polling_entity.cc:78
grpc_channel_args_destroy
void grpc_channel_args_destroy(grpc_channel_args *a)
Definition: channel_args.cc:360
arg
Definition: cmdline.cc:40
args_
HandshakerArgs * args_
Definition: tcp_connect_handshaker.cc:84
endpoint_to_destroy_
grpc_endpoint * endpoint_to_destroy_
Definition: security_handshaker.cc:119
error.h
grpc_endpoint_shutdown
void grpc_endpoint_shutdown(grpc_endpoint *ep, grpc_error_handle why)
Definition: endpoint.cc:49
grpc_polling_entity
Definition: polling_entity.h:38
benchmark::Shutdown
void Shutdown()
Definition: benchmark/src/benchmark.cc:607
slice_internal.h
grpc_endpoint_destroy
void grpc_endpoint_destroy(grpc_endpoint *ep)
Definition: endpoint.cc:53
tcp_connect_handshaker.h
addr_
grpc_resolved_address addr_
Definition: tcp_connect_handshaker.cc:86
tcp_client.h
GRPC_ERROR_CREATE_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
Definition: error.h:291
absl::StatusOr::ok
ABSL_MUST_USE_RESULT bool ok() const
Definition: abseil-cpp/absl/status/statusor.h:491
GPR_ARRAY_SIZE
#define GPR_ARRAY_SIZE(array)
Definition: useful.h:129
shutdown_
bool shutdown_
Definition: pick_first.cc:173
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
GRPC_ARG_TCP_HANDSHAKER_RESOLVED_ADDRESS
#define GRPC_ARG_TCP_HANDSHAKER_RESOLVED_ADDRESS
Definition: tcp_connect_handshaker.h:25
debug_location.h
pollent_
grpc_polling_entity pollent_
Definition: tcp_connect_handshaker.cc:83
grpc_tcp_server_acceptor
Definition: tcp_server.h:36
grpc_endpoint_add_to_pollset_set
void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set)
Definition: endpoint.cc:39
handshaker_factory.h
alloc.h
google::protobuf.internal::Mutex
WrappedMutex Mutex
Definition: bloaty/third_party/protobuf/src/google/protobuf/stubs/mutex.h:113
grpc_polling_entity_add_to_pollset_set
void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity *pollent, grpc_pollset_set *pss_dst)
Definition: polling_entity.cc:61
GRPC_ARG_TCP_HANDSHAKER_BIND_ENDPOINT_TO_POLLSET
#define GRPC_ARG_TCP_HANDSHAKER_BIND_ENDPOINT_TO_POLLSET
Definition: tcp_connect_handshaker.h:29
arg
struct arg arg
exec_ctx.h
handshaker.h
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
ref_counted_ptr.h
connected_
grpc_closure connected_
Definition: tcp_connect_handshaker.cc:87
channel_args.h
grpc_slice_buffer_destroy_internal
void grpc_slice_buffer_destroy_internal(grpc_slice_buffer *sb)
Definition: slice/slice_buffer.cc:123
interested_parties_
grpc_pollset_set * interested_parties_
Definition: tcp_connect_handshaker.cc:82
grpc_slice_buffer
Definition: include/grpc/impl/codegen/slice.h:83
absl::StatusOr
Definition: abseil-cpp/absl/status/statusor.h:187
uri_parser.h
iomgr_fwd.h
endpoint.h
grpc_error
Definition: error_internal.h:42
grpc_polling_entity_create_from_pollset_set
grpc_polling_entity grpc_polling_entity_create_from_pollset_set(grpc_pollset_set *pollset_set)
Definition: polling_entity.cc:26
self
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END intern self
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/map.c:543
testing::Ref
internal::RefMatcher< T & > Ref(T &x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8628
tcp_server.h
grpc_closure
Definition: closure.h:56
grpc_endpoint
Definition: endpoint.h:105
on_handshake_done
static void on_handshake_done(void *arg, grpc_error_handle error)
Definition: ssl_server_fuzzer.cc:45
sync.h
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:25