port_sharing_end2end_test.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <mutex>
20 #include <thread>
21 
22 #include <gtest/gtest.h>
23 
24 #include <grpc/grpc.h>
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/time.h>
28 #include <grpcpp/channel.h>
29 #include <grpcpp/client_context.h>
30 #include <grpcpp/create_channel.h>
33 #include <grpcpp/server.h>
34 #include <grpcpp/server_builder.h>
35 #include <grpcpp/server_context.h>
36 
37 #include "src/core/lib/gpr/env.h"
44 #include "src/proto/grpc/testing/echo.grpc.pb.h"
45 #include "test/core/util/port.h"
50 
51 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER
52 
54 
55 namespace grpc {
56 namespace testing {
57 namespace {
58 
59 class TestScenario {
60  public:
61  TestScenario(bool server_port, bool pending_data,
62  const std::string& creds_type)
63  : server_has_port(server_port),
64  queue_pending_data(pending_data),
65  credentials_type(creds_type) {}
66  void Log() const;
67  // server has its own port or not
68  bool server_has_port;
69  // whether tcp server should read some data before handoff
70  bool queue_pending_data;
72 };
73 
74 std::ostream& operator<<(std::ostream& out, const TestScenario& scenario) {
75  return out << "TestScenario{server_has_port="
76  << (scenario.server_has_port ? "true" : "false")
77  << ", queue_pending_data="
78  << (scenario.queue_pending_data ? "true" : "false")
79  << ", credentials='" << scenario.credentials_type << "'}";
80 }
81 
82 void TestScenario::Log() const {
83  std::ostringstream out;
84  out << *this;
85  gpr_log(GPR_ERROR, "%s", out.str().c_str());
86 }
87 
88 // Set up a test tcp server which is in charge of accepting connections and
89 // handing off the connections as fds.
90 class TestTcpServer {
91  public:
92  TestTcpServer()
93  : shutdown_(false),
94  queue_data_(false),
96  std::ostringstream server_address;
97  server_address << "localhost:" << port_;
98  address_ = server_address.str();
99  test_tcp_server_init(&tcp_server_, &TestTcpServer::OnConnect, this);
100  GRPC_CLOSURE_INIT(&on_fd_released_, &TestTcpServer::OnFdReleased, this,
101  grpc_schedule_on_exec_ctx);
102  }
103 
104  ~TestTcpServer() {
105  running_thread_.join();
108  }
109 
110  // Read some data before handing off the connection.
111  void SetQueueData() { queue_data_ = true; }
112 
113  void Start() {
115  gpr_log(GPR_INFO, "Test TCP server started at %s", address_.c_str());
116  }
117 
118  const std::string& address() { return address_; }
119 
120  void SetAcceptor(
121  std::unique_ptr<experimental::ExternalConnectionAcceptor> acceptor) {
122  connection_acceptor_ = std::move(acceptor);
123  }
124 
125  void Run() {
126  running_thread_ = std::thread([this]() {
127  while (true) {
128  {
129  std::lock_guard<std::mutex> lock(mu_);
130  if (shutdown_) {
131  return;
132  }
133  }
135  }
136  });
137  }
138 
139  void Shutdown() {
140  std::lock_guard<std::mutex> lock(mu_);
141  shutdown_ = true;
142  }
143 
144  static void OnConnect(void* arg, grpc_endpoint* tcp,
145  grpc_pollset* accepting_pollset,
146  grpc_tcp_server_acceptor* acceptor) {
147  auto* self = static_cast<TestTcpServer*>(arg);
148  self->OnConnect(tcp, accepting_pollset, acceptor);
149  }
150 
151  static void OnFdReleased(void* arg, grpc_error_handle err) {
152  auto* self = static_cast<TestTcpServer*>(arg);
153  self->OnFdReleased(err);
154  }
155 
156  private:
157  void OnConnect(grpc_endpoint* tcp, grpc_pollset* /*accepting_pollset*/,
158  grpc_tcp_server_acceptor* acceptor) {
160  gpr_log(GPR_INFO, "Got incoming connection! from %s", peer.c_str());
162  listener_fd_ = grpc_tcp_server_port_fd(
163  acceptor->from_server, acceptor->port_index, acceptor->fd_index);
164  gpr_free(acceptor);
165  grpc_tcp_destroy_and_release_fd(tcp, &fd_, &on_fd_released_);
166  }
167 
168  void OnFdReleased(grpc_error_handle err) {
170  experimental::ExternalConnectionAcceptor::NewConnectionParameters p;
171  p.listener_fd = listener_fd_;
172  p.fd = fd_;
173  if (queue_data_) {
174  char buf[1024];
175  ssize_t read_bytes = 0;
176  while (read_bytes <= 0) {
177  read_bytes = read(fd_, buf, 1024);
178  }
179  Slice data(buf, read_bytes);
180  p.read_buffer = ByteBuffer(&data, 1);
181  }
182  gpr_log(GPR_INFO, "Handing off fd %d with data size %d from listener fd %d",
183  fd_, static_cast<int>(p.read_buffer.Length()), listener_fd_);
184  connection_acceptor_->HandleNewConnection(&p);
185  }
186 
187  std::mutex mu_;
188  bool shutdown_;
189 
190  int listener_fd_ = -1;
191  int fd_ = -1;
192  bool queue_data_ = false;
193 
194  grpc_closure on_fd_released_;
195  std::thread running_thread_;
196  int port_ = -1;
198  std::unique_ptr<experimental::ExternalConnectionAcceptor>
199  connection_acceptor_;
201 };
202 
203 class PortSharingEnd2endTest : public ::testing::TestWithParam<TestScenario> {
204  protected:
205  PortSharingEnd2endTest() : is_server_started_(false), first_picked_port_(0) {
206  GetParam().Log();
207  }
208 
209  void SetUp() override {
210  if (GetParam().queue_pending_data) {
211  tcp_server1_.SetQueueData();
212  tcp_server2_.SetQueueData();
213  }
214  tcp_server1_.Start();
215  tcp_server2_.Start();
216  ServerBuilder builder;
217  if (GetParam().server_has_port) {
220  server_address_ << "localhost:" << port;
222  GetParam().credentials_type);
223  builder.AddListeningPort(server_address_.str(), creds);
224  gpr_log(GPR_INFO, "gRPC server listening on %s",
225  server_address_.str().c_str());
226  }
227  auto server_creds = GetCredentialsProvider()->GetServerCredentials(
228  GetParam().credentials_type);
229  auto acceptor1 = builder.experimental().AddExternalConnectionAcceptor(
231  server_creds);
232  tcp_server1_.SetAcceptor(std::move(acceptor1));
233  auto acceptor2 = builder.experimental().AddExternalConnectionAcceptor(
235  server_creds);
236  tcp_server2_.SetAcceptor(std::move(acceptor2));
237  builder.RegisterService(&service_);
238  server_ = builder.BuildAndStart();
239  is_server_started_ = true;
240 
241  tcp_server1_.Run();
242  tcp_server2_.Run();
243  }
244 
245  void TearDown() override {
246  tcp_server1_.Shutdown();
247  tcp_server2_.Shutdown();
248  if (is_server_started_) {
249  server_->Shutdown();
250  }
251  if (first_picked_port_ > 0) {
253  }
254  }
255 
256  void ResetStubs() {
258  ChannelArguments args;
260  auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
261  GetParam().credentials_type, &args);
262  channel_handoff1_ =
263  CreateCustomChannel(tcp_server1_.address(), channel_creds, args);
264  stub_handoff1_ = EchoTestService::NewStub(channel_handoff1_);
265  channel_handoff2_ =
266  CreateCustomChannel(tcp_server2_.address(), channel_creds, args);
267  stub_handoff2_ = EchoTestService::NewStub(channel_handoff2_);
268  if (GetParam().server_has_port) {
269  ChannelArguments direct_args;
270  direct_args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
271  auto direct_creds = GetCredentialsProvider()->GetChannelCredentials(
272  GetParam().credentials_type, &direct_args);
273  channel_direct_ =
274  CreateCustomChannel(server_address_.str(), direct_creds, direct_args);
275  stub_direct_ = EchoTestService::NewStub(channel_direct_);
276  }
277  }
278 
279  bool is_server_started_;
280  // channel/stub to the test tcp server, the connection will be handed to the
281  // grpc server.
282  std::shared_ptr<Channel> channel_handoff1_;
283  std::unique_ptr<EchoTestService::Stub> stub_handoff1_;
284  std::shared_ptr<Channel> channel_handoff2_;
285  std::unique_ptr<EchoTestService::Stub> stub_handoff2_;
286  // channel/stub to talk to the grpc server directly, if applicable.
287  std::shared_ptr<Channel> channel_direct_;
288  std::unique_ptr<EchoTestService::Stub> stub_direct_;
289  std::unique_ptr<Server> server_;
290  std::ostringstream server_address_;
292  TestTcpServer tcp_server1_;
293  TestTcpServer tcp_server2_;
294  int first_picked_port_;
295 };
296 
297 void SendRpc(EchoTestService::Stub* stub, int num_rpcs) {
298  EchoRequest request;
299  EchoResponse response;
300  request.set_message("Hello hello hello hello");
301 
302  for (int i = 0; i < num_rpcs; ++i) {
303  ClientContext context;
304  Status s = stub->Echo(&context, request, &response);
305  EXPECT_EQ(response.message(), request.message());
306  EXPECT_TRUE(s.ok());
307  }
308 }
309 
310 std::vector<TestScenario> CreateTestScenarios() {
311  std::vector<TestScenario> scenarios;
312  std::vector<std::string> credentials_types;
313 
314 #if TARGET_OS_IPHONE
315  // Workaround Apple CFStream bug
316  gpr_setenv("grpc_cfstream", "0");
317 #endif
318 
319  credentials_types = GetCredentialsProvider()->GetSecureCredentialsTypeList();
320  // Only allow insecure credentials type when it is registered with the
321  // provider. User may create providers that do not have insecure.
322  if (GetCredentialsProvider()->GetChannelCredentials(kInsecureCredentialsType,
323  nullptr) != nullptr) {
324  credentials_types.push_back(kInsecureCredentialsType);
325  }
326 
327  GPR_ASSERT(!credentials_types.empty());
328  for (const auto& cred : credentials_types) {
329  for (auto server_has_port : {true, false}) {
330  for (auto queue_pending_data : {true, false}) {
331  scenarios.emplace_back(server_has_port, queue_pending_data, cred);
332  }
333  }
334  }
335  return scenarios;
336 }
337 
338 TEST_P(PortSharingEnd2endTest, HandoffAndDirectCalls) {
339  ResetStubs();
340  SendRpc(stub_handoff1_.get(), 5);
341  if (GetParam().server_has_port) {
342  SendRpc(stub_direct_.get(), 5);
343  }
344 }
345 
346 TEST_P(PortSharingEnd2endTest, MultipleHandoff) {
347  for (int i = 0; i < 3; i++) {
348  ResetStubs();
349  SendRpc(stub_handoff2_.get(), 1);
350  }
351 }
352 
353 TEST_P(PortSharingEnd2endTest, TwoHandoffPorts) {
354  for (int i = 0; i < 3; i++) {
355  ResetStubs();
356  SendRpc(stub_handoff1_.get(), 5);
357  SendRpc(stub_handoff2_.get(), 5);
358  }
359 }
360 
361 INSTANTIATE_TEST_SUITE_P(PortSharingEnd2end, PortSharingEnd2endTest,
363 
364 } // namespace
365 } // namespace testing
366 } // namespace grpc
367 
368 #endif // GRPC_POSIX_SOCKET_TCP_SERVER
369 
370 int main(int argc, char** argv) {
371  grpc::testing::TestEnvironment env(&argc, argv);
372  ::testing::InitGoogleTest(&argc, argv);
373  return RUN_ALL_TESTS();
374 }
main
int main(int argc, char **argv)
Definition: port_sharing_end2end_test.cc:370
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
EXPECT_FALSE
#define EXPECT_FALSE(condition)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1970
test_credentials_provider.h
grpc::gpr_setenv
gpr_setenv("STS_CREDENTIALS", creds_file_name)
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
testing
Definition: aws_request_signer_test.cc:25
test_tcp_server_init
void test_tcp_server_init(test_tcp_server *server, grpc_tcp_server_cb on_connect, void *user_data)
Definition: test_tcp_server.cc:43
gen_build_yaml.out
dictionary out
Definition: src/benchmark/gen_build_yaml.py:24
pollset.h
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
grpc::testing::CredentialsProvider::GetChannelCredentials
virtual std::shared_ptr< ChannelCredentials > GetChannelCredentials(const std::string &type, ChannelArguments *args)=0
port.h
grpc::ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD
@ FROM_FD
test_tcp_server_start
void test_tcp_server_start(test_tcp_server *server, int port)
Definition: test_tcp_server.cc:57
generate.env
env
Definition: generate.py:37
grpc::gpr_free
gpr_free(creds_file_name)
grpc_tcp_destroy_and_release_fd
void grpc_tcp_destroy_and_release_fd(grpc_endpoint *ep, int *fd, grpc_closure *done)
grpc
Definition: grpcpp/alarm.h:33
false
#define false
Definition: setup_once.h:323
testing::internal::Log
GTEST_API_ void Log(LogSeverity severity, const std::string &message, int stack_frames_to_skip)
Definition: bloaty/third_party/googletest/googlemock/src/gmock-internal-utils.cc:149
mutex
static uv_mutex_t mutex
Definition: threadpool.c:34
benchmark.request
request
Definition: benchmark.py:77
buf
voidpf void * buf
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
server_port
static int server_port
Definition: bad_server_response_test.cc:86
grpc_recycle_unused_port
void grpc_recycle_unused_port(int port)
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
address_
ServerAddress address_
Definition: ring_hash.cc:194
grpc_endpoint_get_peer
absl::string_view grpc_endpoint_get_peer(grpc_endpoint *ep)
Definition: endpoint.cc:55
error_ref_leak.err
err
Definition: error_ref_leak.py:35
tcp
static uv_tcp_t tcp
Definition: test-connection-fail.c:29
grpc_tcp_server_acceptor::fd_index
unsigned fd_index
Definition: tcp_server.h:42
GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL
#define GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL
Definition: grpc_types.h:443
env.h
absl::FormatConversionChar::s
@ s
time.h
xds_manager.p
p
Definition: xds_manager.py:60
async_greeter_client.stub
stub
Definition: hellostreamingworld/async_greeter_client.py:26
test_tcp_server
Definition: test_tcp_server.h:30
credentials.h
server_address
std::string server_address("0.0.0.0:10000")
mu_
Mutex mu_
Definition: oob_backend_metric.cc:115
server_
Server *const server_
Definition: chttp2_server.cc:260
test_service_impl.h
scenario
Definition: test/core/fling/client.cc:135
testing::TestWithParam
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1883
test_tcp_server_destroy
void test_tcp_server_destroy(test_tcp_server *server)
Definition: test_tcp_server.cc:103
grpc::testing::kInsecureCredentialsType
const char kInsecureCredentialsType[]
Definition: test_credentials_provider.h:31
test_tcp_server.h
grpc::testing::CredentialsProvider::GetServerCredentials
virtual std::shared_ptr< ServerCredentials > GetServerCredentials(const std::string &type)=0
profile_analyzer.builder
builder
Definition: profile_analyzer.py:159
ssize_t
intptr_t ssize_t
Definition: win.h:27
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
tcp_posix.h
grpc.h
test_tcp_server_poll
void test_tcp_server_poll(test_tcp_server *server, int milliseconds)
Definition: test_tcp_server.cc:87
channel.h
arg
Definition: cmdline.cc:40
read_bytes
static int read_bytes(int fd, char *buf, size_t read_size, int spin)
Definition: low_level_ping_pong.cc:71
grpc::testing::INSTANTIATE_TEST_SUITE_P
INSTANTIATE_TEST_SUITE_P(HistogramTestCases, HistogramTest, ::testing::Range< int >(0, GRPC_STATS_HISTOGRAM_COUNT))
scenarios
static const scenario scenarios[]
Definition: test/core/fling/client.cc:141
data
char data[kBufferLength]
Definition: abseil-cpp/absl/strings/internal/str_format/float_conversion.cc:1006
grpc_tcp_server_port_fd
int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index, unsigned fd_index)
Definition: tcp_server.cc:53
benchmark::Shutdown
void Shutdown()
Definition: benchmark/src/benchmark.cc:607
RUN_ALL_TESTS
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2471
grpc_tcp_server_acceptor::from_server
grpc_tcp_server * from_server
Definition: tcp_server.h:39
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
grpc_pick_unused_port_or_die
int grpc_pick_unused_port_or_die(void)
first_picked_port_
int first_picked_port_
Definition: end2end_test.cc:478
server_credentials.h
tests.unit._exit_scenarios.port
port
Definition: _exit_scenarios.py:179
test_config.h
grpc_tcp_server_acceptor::port_index
unsigned port_index
Definition: tcp_server.h:41
is_server_started_
bool is_server_started_
Definition: client_callback_end2end_test.cc:335
shutdown_
bool shutdown_
Definition: pick_first.cc:173
read
int read(izstream &zs, T *x, Items items)
Definition: bloaty/third_party/zlib/contrib/iostream2/zstream.h:115
client_context.h
testing::InitGoogleTest
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
Definition: bloaty/third_party/googletest/googletest/src/gtest.cc:6106
credentials.h
grpc_tcp_server_acceptor::external_connection
bool external_connection
Definition: tcp_server.h:44
grpc::testing::GetCredentialsProvider
CredentialsProvider * GetCredentialsProvider()
Definition: test_credentials_provider.cc:169
grpc_tcp_server_acceptor
Definition: tcp_server.h:36
port.h
alloc.h
credentials_type
const std::string credentials_type
Definition: async_end2end_test.cc:237
server_context.h
asyncio_get_stats.response
response
Definition: asyncio_get_stats.py:28
grpc::testing::TestEnvironment
Definition: test/core/util/test_config.h:54
grpc::CreateCustomChannel
std::shared_ptr< Channel > CreateCustomChannel(const grpc::string &target, const std::shared_ptr< ChannelCredentials > &creds, const ChannelArguments &args)
grpc::protobuf::util::Status
GRPC_CUSTOM_UTIL_STATUS Status
Definition: include/grpcpp/impl/codegen/config_protobuf.h:93
CreateTestScenarios
std::vector< std::string > CreateTestScenarios()
Definition: time_jump_test.cc:84
arg
struct arg arg
exec_ctx.h
server_address_
const char * server_address_
Definition: settings_timeout_test.cc:231
grpc::testing::EXPECT_EQ
EXPECT_EQ(options.token_exchange_service_uri, "https://foo/exchange")
grpc::testing::CredentialsProvider::GetSecureCredentialsTypeList
virtual std::vector< std::string > GetSecureCredentialsTypeList()=0
tcp_server_
grpc_tcp_server * tcp_server_
Definition: chttp2_server.cc:261
context
grpc::ClientContext context
Definition: istio_echo_server_lib.cc:61
grpc::testing::TEST_P
TEST_P(HistogramTest, IncHistogram)
Definition: stats_test.cc:87
grpc::testing::EXPECT_TRUE
EXPECT_TRUE(grpc::experimental::StsCredentialsOptionsFromJson(minimum_valid_json, &options) .ok())
testing::ValuesIn
internal::ParamGenerator< typename std::iterator_traits< ForwardIterator >::value_type > ValuesIn(ForwardIterator begin, ForwardIterator end)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest-param-test.h:297
server.h
endpoint.h
googletest-break-on-failure-unittest.Run
def Run(command)
Definition: bloaty/third_party/googletest/googletest/test/googletest-break-on-failure-unittest.py:76
grpc_error
Definition: error_internal.h:42
grpc::operator<<
std::ostream & operator<<(std::ostream &out, const string_ref &string)
Definition: grpcpp/impl/codegen/string_ref.h:145
port_
int port_
Definition: streams_not_seen_test.cc:377
tcp_server.h
grpc_pollset
Definition: bm_cq_multiple_threads.cc:37
TestServiceImpl
Definition: interop_server.cc:139
grpc::testing::SendRpc
static void SendRpc(grpc::testing::EchoTestService::Stub *stub, int num_rpcs, bool allow_exhaustion, gpr_atm *errors)
Definition: thread_stress_test.cc:277
grpc_closure
Definition: closure.h:56
grpc_endpoint
Definition: endpoint.h:105
thread
static uv_thread_t thread
Definition: test-async-null-cb.c:29
server_builder.h
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
create_channel.h
service_
std::unique_ptr< grpc::testing::TestServiceImpl > service_
Definition: end2end_binder_transport_test.cc:71


grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:44