22 #include <netinet/in.h>
25 #include <sys/socket.h>
27 #include <sys/types.h>
34 #include <gmock/gmock.h>
36 #include "absl/memory/memory.h"
37 #include "absl/strings/str_cat.h"
65 const int kFakeHandshakeServerMaxConcurrentStreams = 40;
76 const char* server_addr,
const char* fake_handshake_server_addr,
77 int reconnect_backoff_ms) {
82 fake_handshake_server_addr,
87 std::vector<grpc_arg> new_args;
90 if (reconnect_backoff_ms != 0) {
92 const_cast<char*
>(
"grpc.testing.fixed_reconnect_backoff_ms"),
93 reconnect_backoff_ms));
104 class FakeHandshakeServer {
106 explicit FakeHandshakeServer(
bool check_num_concurrent_rpcs) {
109 if (check_num_concurrent_rpcs) {
127 ~FakeHandshakeServer() {
131 const char* address() {
return address_.c_str(); }
135 std::unique_ptr<grpc::Service>
service_;
136 std::unique_ptr<grpc::Server>
server_;
141 explicit TestServer()
142 : fake_handshake_server_(
true ) {
147 alts_options, fake_handshake_server_.address(),
160 server_addr_.c_str());
161 server_thd_ = absl::make_unique<std::thread>(PollUntilShutdown,
this);
174 const char* address() {
return server_addr_.c_str(); }
176 static void PollUntilShutdown(
const TestServer*
self) {
187 std::unique_ptr<std::thread> server_thd_;
197 FakeHandshakeServer fake_handshake_server_;
200 class ConnectLoopRunner {
202 explicit ConnectLoopRunner(
203 const char*
server_address,
const char* fake_handshake_server_addr,
204 int per_connect_deadline_seconds,
size_t loops,
206 int reconnect_backoff_ms)
208 fake_handshake_server_addr_(
210 per_connect_deadline_seconds_(per_connect_deadline_seconds),
212 expected_connectivity_states_(expected_connectivity_states),
213 reconnect_backoff_ms_(reconnect_backoff_ms) {
214 thd_ = absl::make_unique<std::thread>(ConnectLoop,
this);
217 ~ConnectLoopRunner() { thd_->join(); }
219 static void ConnectLoop(
const ConnectLoopRunner*
self) {
220 for (
size_t i = 0;
i <
self->loops_;
i++) {
225 self->server_address_.get(),
self->fake_handshake_server_addr_.get(),
226 self->reconnect_backoff_ms_);
233 while (
state !=
self->expected_connectivity_states_) {
234 if (
self->expected_connectivity_states_ ==
245 <<
"connect_loop runner:" << std::hex <<
self
246 <<
" got ev.type:" << ev.
type <<
" i:" <<
i;
250 if (
self->expected_connectivity_states_ ==
276 int per_connect_deadline_seconds_;
279 std::unique_ptr<std::thread> thd_;
280 int reconnect_backoff_ms_;
285 TEST(AltsConcurrentConnectivityTest, TestBasicClientServerHandshakes) {
286 FakeHandshakeServer fake_handshake_server(
291 test_server.address(), fake_handshake_server.address(),
300 TEST(AltsConcurrentConnectivityTest, TestConcurrentClientServerHandshakes) {
301 FakeHandshakeServer fake_handshake_server(
307 size_t num_concurrent_connects = 50;
308 std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;
310 "start performing concurrent expected-to-succeed connects");
311 for (
size_t i = 0;
i < num_concurrent_connects;
i++) {
312 connect_loop_runners.push_back(absl::make_unique<ConnectLoopRunner>(
313 test_server.address(), fake_handshake_server.address(),
318 connect_loop_runners.clear();
320 "done performing concurrent expected-to-succeed connects");
333 TEST(AltsConcurrentConnectivityTest,
334 TestHandshakeFailsFastWhenPeerEndpointClosesConnectionAfterAccepting) {
342 FakeHandshakeServer fake_handshake_server(
348 kWaitForClientToSendFirstBytes,
350 CloseSocketUponReceivingBytesFromPeer);
353 std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;
354 size_t num_concurrent_connects = 100;
355 gpr_log(
GPR_DEBUG,
"start performing concurrent expected-to-fail connects");
356 for (
size_t i = 0;
i < num_concurrent_connects;
i++) {
357 connect_loop_runners.push_back(absl::make_unique<ConnectLoopRunner>(
358 fake_backend_server.address(), fake_handshake_server.address(),
363 connect_loop_runners.clear();
367 "Exceeded test deadline. ALTS handshakes might not be failing "
368 "fast when the peer endpoint closes the connection abruptly");
376 TEST(AltsConcurrentConnectivityTest,
377 TestHandshakeFailsFastWhenHandshakeServerClosesConnectionAfterAccepting) {
383 CloseSocketUponReceivingBytesFromPeer);
388 kWaitForClientToSendFirstBytes,
392 std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;
393 size_t num_concurrent_connects = 100;
394 gpr_log(
GPR_DEBUG,
"start performing concurrent expected-to-fail connects");
395 for (
size_t i = 0;
i < num_concurrent_connects;
i++) {
396 connect_loop_runners.push_back(absl::make_unique<ConnectLoopRunner>(
397 fake_backend_server.address(), fake_handshake_server.address(),
402 connect_loop_runners.clear();
406 "Exceeded test deadline. ALTS handshakes might not be failing "
407 "fast when the handshake server closes new connections");
416 TEST(AltsConcurrentConnectivityTest,
417 TestHandshakeFailsFastWhenHandshakeServerHangsAfterAccepting) {
427 kWaitForClientToSendFirstBytes,
431 std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;
432 size_t num_concurrent_connects = 100;
433 gpr_log(
GPR_DEBUG,
"start performing concurrent expected-to-fail connects");
434 for (
size_t i = 0;
i < num_concurrent_connects;
i++) {
435 connect_loop_runners.push_back(absl::make_unique<ConnectLoopRunner>(
436 fake_backend_server.address(), fake_handshake_server.address(),
441 connect_loop_runners.clear();
445 "Exceeded test deadline. ALTS handshakes might not be failing "
446 "fast when the handshake server is non-response timeout occurs");
454 int main(
int argc,
char** argv) {