26 #include <gmock/gmock.h>
27 #include <gtest/gtest.h>
29 #include "absl/memory/memory.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/str_format.h"
32 #include "absl/strings/str_join.h"
33 #include "absl/strings/str_replace.h"
34 #include "absl/strings/string_view.h"
35 #include "absl/types/optional.h"
47 #include "src/proto/grpc/testing/xds/v3/router.grpc.pb.h"
54 using ::envoy::config::endpoint::v3::ClusterLoadAssignment;
55 using ::envoy::config::endpoint::v3::HealthStatus;
57 using ::envoy::extensions::filters::network::http_connection_manager::v3::
58 HttpConnectionManager;
60 using ::grpc::experimental::ExternalCertificateVerifier;
61 using ::grpc::experimental::IdentityKeyCertPair;
62 using ::grpc::experimental::StaticDataCertificateProvider;
71 status_map[uri] =
update.status;
80 while ((
it = status_map.find(uri)) == status_map.end() ||
81 it->second.error_code() != expected_status) {
94 void* ChannelArgsArgCopy(
void* p) {
98 void ChannelArgsArgDestroy(
void* p) {
102 int ChannelArgsArgCmp(
void* a,
void*
b) {
108 ChannelArgsArgCopy, ChannelArgsArgDestroy, ChannelArgsArgCmp};
121 args->SetPointerWithVtable(
127 std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* )
148 thread_ = absl::make_unique<std::thread>(
183 if (
GetParam().bootstrap_source() ==
186 absl::make_unique<XdsChannelArgsServerBuilderOption>(
test_obj_));
189 builder.experimental().set_drain_grace_time(
224 std::shared_ptr<ServerCredentials>
226 if (
GetParam().use_xds_credentials()) {
227 if (use_xds_enabled_server()) {
237 auto certificate_provider =
238 std::make_shared<grpc::experimental::StaticDataCertificateProvider>(
241 certificate_provider);
243 options.watch_identity_key_cert_pairs();
254 builder->RegisterService(&backend_service_);
255 builder->RegisterService(&backend_service1_);
256 builder->RegisterService(&backend_service2_);
260 backend_service_.Start();
261 backend_service1_.Start();
262 backend_service2_.Start();
266 backend_service_.Shutdown();
267 backend_service1_.Shutdown();
268 backend_service2_.Shutdown();
286 builder->RegisterService(ads_service_->v2_rpc_service());
287 builder->RegisterService(ads_service_->v3_rpc_service());
288 builder->RegisterService(lrs_service_->v2_rpc_service());
289 builder->RegisterService(lrs_service_->v3_rpc_service());
293 ads_service_->Start();
294 lrs_service_->Start();
298 ads_service_->Shutdown();
299 lrs_service_->Shutdown();
307 std::vector<std::string>
fields;
308 fields.push_back(MakeXdsServersText(top_server_));
309 if (!client_default_listener_resource_name_template_.empty()) {
311 absl::StrCat(
" \"client_default_listener_resource_name_template\": \"",
312 client_default_listener_resource_name_template_,
"\""));
314 fields.push_back(MakeNodeText());
315 if (!server_listener_resource_name_template_.empty()) {
317 absl::StrCat(
" \"server_listener_resource_name_template\": \"",
318 server_listener_resource_name_template_,
"\""));
320 fields.push_back(MakeCertificateProviderText());
321 fields.push_back(MakeAuthorityText());
327 constexpr
char kXdsServerTemplate[] =
328 " \"xds_servers\": [\n"
330 " \"server_uri\": \"<SERVER_URI>\",\n"
331 " \"channel_creds\": [\n"
333 " \"type\": \"fake\"\n"
336 " \"server_features\": [<SERVER_FEATURES>]\n"
339 std::vector<std::string> server_features;
340 if (!v2_) server_features.push_back(
"\"xds_v3\"");
341 if (ignore_resource_deletion_) {
342 server_features.push_back(
"\"ignore_resource_deletion\"");
347 {
"<SERVER_FEATURES>",
absl::StrJoin(server_features,
", ")}});
351 constexpr
char kXdsNode[] =
353 " \"id\": \"xds_end2end_test\",\n"
354 " \"cluster\": \"test\",\n"
356 " \"foo\": \"bar\"\n"
359 " \"region\": \"corp\",\n"
360 " \"zone\": \"svl\",\n"
361 " \"sub_zone\": \"mp3\"\n"
368 std::vector<std::string> entries;
369 for (
const auto& p : plugins_) {
372 std::vector<std::string>
fields;
391 std::vector<std::string> entries;
392 for (
const auto& p : authorities_) {
395 std::vector<std::string>
fields = {
396 MakeXdsServersText(authority_info.
server)};
399 "\"client_listener_resource_name_template\": \"",
424 request->mutable_param()->mutable_expected_error()->set_code(
427 if (server_sleep_us != 0) {
428 request->mutable_param()->set_server_sleep_us(server_sleep_us);
430 if (client_cancel_after_us != 0) {
431 request->mutable_param()->set_client_cancel_after_us(
432 client_cancel_after_us);
434 if (skip_cancelled_check) {
435 request->mutable_param()->set_skip_cancelled_check(
true);
444 "xds_default_locality_region";
453 "default_server_route_config_name";
457 "src/core/tsi/test_creds/server1.pem";
459 "src/core/tsi/test_creds/server1.key";
464 bool localhost_resolves_to_ipv4 =
false;
465 bool localhost_resolves_to_ipv6 =
false;
467 &localhost_resolves_to_ipv6);
468 ipv6_only_ = !localhost_resolves_to_ipv4 && localhost_resolves_to_ipv6;
474 auto* filter = http_connection_manager.add_http_filters();
475 filter->set_name(
"router");
476 filter->mutable_typed_config()->PackFrom(
477 envoy::extensions::filters::http::router::v3::Router());
480 http_connection_manager);
484 virtual_host->add_domains(
"*");
485 auto*
route = virtual_host->add_routes();
486 route->mutable_match()->set_prefix(
"");
492 eds_config->mutable_eds_config()->mutable_self();
495 if (
GetParam().enable_load_reporting()) {
505 virtual_host->add_domains(
"*");
506 route = virtual_host->add_routes();
507 route->mutable_match()->set_prefix(
"");
508 route->mutable_non_forwarding_action();
511 ->mutable_socket_address()
512 ->set_address(
ipv6_only_ ?
"::1" :
"127.0.0.1");
515 ->mutable_typed_config()
516 ->PackFrom(http_connection_manager);
533 std::unique_ptr<XdsEnd2endTest::BalancerServerThread>
535 std::unique_ptr<BalancerServerThread> balancer =
536 absl::make_unique<BalancerServerThread>(
this);
542 return absl::StrCat(
"grpc/server?xds.resource.listening_address=",
548 Listener listener = listener_template;
550 listener.mutable_address()->mutable_socket_address()->set_port_value(
port);
557 listener.api_listener().api_listener().UnpackTo(&http_connection_manager);
558 return http_connection_manager;
563 auto* api_listener = listener->mutable_api_listener()->mutable_api_listener();
564 api_listener->PackFrom(hcm);
570 listener.default_filter_chain().filters().at(0).typed_config().UnpackTo(
571 &http_connection_manager);
572 return http_connection_manager;
577 listener->mutable_default_filter_chain()
580 .mutable_typed_config()
588 if (
GetParam().enable_rds_testing()) {
589 auto* rds = http_connection_manager.mutable_rds();
590 rds->set_route_config_name(route_config.name());
591 rds->mutable_config_source()->mutable_self();
594 *http_connection_manager.mutable_route_config() = route_config;
596 hcm_accessor.
Pack(http_connection_manager, &listener);
603 if (
GetParam().enable_rds_testing()) {
607 : *listener_to_copy);
609 listener.mutable_api_listener()->mutable_api_listener()->UnpackTo(
610 &http_connection_manager);
611 *(http_connection_manager.mutable_route_config()) = route_config;
612 listener.mutable_api_listener()->mutable_api_listener()->PackFrom(
613 http_connection_manager);
618 std::vector<XdsEnd2endTest::EdsResourceArgs::Endpoint>
621 HealthStatus health_status,
623 if (stop_index == 0) stop_index =
backends_.size();
624 std::vector<EdsResourceArgs::Endpoint> endpoints;
625 for (
size_t i = start_index;
i < stop_index; ++
i) {
635 for (
const auto& locality :
args.locality_list) {
636 auto* endpoints = assignment.add_endpoints();
637 endpoints->mutable_load_balancing_weight()->set_value(locality.lb_weight);
638 endpoints->set_priority(locality.priority);
641 endpoints->mutable_locality()->set_sub_zone(locality.sub_zone);
642 for (
size_t i = 0;
i < locality.endpoints.size(); ++
i) {
643 const int&
port = locality.endpoints[
i].port;
644 auto* lb_endpoints = endpoints->add_lb_endpoints();
645 if (locality.endpoints.size() >
i &&
646 locality.endpoints[
i].health_status != HealthStatus::UNKNOWN) {
647 lb_endpoints->set_health_status(locality.endpoints[
i].health_status);
649 if (locality.endpoints.size() >
i &&
650 locality.endpoints[
i].lb_weight >= 1) {
651 lb_endpoints->mutable_load_balancing_weight()->set_value(
652 locality.endpoints[
i].lb_weight);
654 auto* endpoint = lb_endpoints->mutable_endpoint();
655 auto* address = endpoint->mutable_address();
656 auto* socket_address = address->mutable_socket_address();
657 socket_address->set_address(
ipv6_only_ ?
"::1" :
"127.0.0.1");
658 socket_address->set_port_value(
port);
661 if (!
args.drop_categories.empty()) {
662 auto* policy = assignment.mutable_policy();
663 for (
const auto& p :
args.drop_categories) {
665 const uint32_t parts_per_million = p.second;
666 auto* drop_overload = policy->add_drop_overloads();
667 drop_overload->set_category(
name);
668 auto* drop_percentage = drop_overload->mutable_drop_percentage();
669 drop_percentage->set_numerator(parts_per_million);
670 drop_percentage->set_denominator(
args.drop_denominator);
678 if (stop_index == 0) stop_index =
backends_.size();
679 for (
size_t i = start_index;
i < stop_index; ++
i) {
680 backends_[
i]->backend_service()->ResetCounters();
681 backends_[
i]->backend_service1()->ResetCounters();
682 backends_[
i]->backend_service2()->ResetCounters();
688 switch (rpc_service) {
695 if (
backends_[backend_idx]->backend_service1()->request_count() == 0) {
700 if (
backends_[backend_idx]->backend_service2()->request_count() == 0) {
710 if (stop_index == 0) stop_index =
backends_.size();
711 for (
size_t i = start_index;
i < stop_index; ++
i) {
720 size_t stop_index)
const {
721 if (stop_index == 0) stop_index =
backends_.size();
722 std::vector<int> backend_ports;
723 for (
size_t i = start_index;
i < stop_index; ++
i) {
726 return backend_ports;
731 int xds_resource_does_not_exist_timeout_ms) {
732 if (xds_resource_does_not_exist_timeout_ms > 0) {
735 xds_resource_does_not_exist_timeout_ms));
737 if (!lb_expected_authority.empty()) {
738 constexpr
char authority_const[] =
"localhost:%d";
739 if (lb_expected_authority == authority_const) {
740 lb_expected_authority =
745 const_cast<char*
>(lb_expected_authority.c_str())));
787 int failover_timeout_ms,
const char*
server_name,
const char* xds_authority,
790 if (
args ==
nullptr)
args = &local_args;
793 if (failover_timeout_ms > 0) {
802 args->SetPointerWithVtable(
807 std::shared_ptr<ChannelCredentials> channel_creds =
810 : std::make_shared<SecureChannelCredentials>(
817 EchoResponse local_response;
822 auto*
error =
request.mutable_param()->mutable_expected_error();
856 if (!continue_predicate(
result))
return;
858 << debug_location.
file() <<
":" << debug_location.
line();
870 <<
"code=" <<
result.status.error_code()
871 <<
" message=" <<
result.status.error_message() <<
" at "
872 << debug_location.
file() <<
":" << debug_location.
line();
884 << debug_location.
file() <<
":" << debug_location.
line();
886 << debug_location.
file() <<
":" << debug_location.
line();
889 << debug_location.
file() <<
":" << debug_location.
line();
896 size_t num_failed = 0;
900 if (!
result.status.ok()) {
901 EXPECT_EQ(result.status.error_code(), expected_status)
902 << debug_location.file() <<
":" << debug_location.line();
903 EXPECT_THAT(result.status.error_message(),
904 ::testing::StartsWith(expected_message_prefix))
905 << debug_location.file() <<
":" << debug_location.line();
908 return ++
n < num_rpcs;
915 grpc::testing::EchoTestService::Stub*
stub,
const RpcOptions& rpc_options) {
926 if (sender_thread_.joinable()) sender_thread_.join();
930 if (sender_thread_.joinable()) sender_thread_.join();
936 grpc::testing::EchoTestService::Stub*
stub,
size_t num_rpcs,
939 std::vector<ConcurrentRpc> rpcs(num_rpcs);
944 size_t completed = 0;
946 for (
size_t i = 0;
i < num_rpcs;
i++) {
953 rpc->elapsed_time = NowFromCycleCounter() - t0;
956 grpc_core::MutexLock lock(&mu);
957 done = (++completed) == num_rpcs;
967 <<
" at " << debug_location.
file() <<
":" << debug_location.
line();
975 if (check_status ==
nullptr) {
978 <<
"code=" <<
result.status.error_code()
979 <<
" message=" <<
result.status.error_message() <<
" at "
980 << debug_location.
file() <<
":" << debug_location.
line();
984 "========= WAITING FOR BACKENDS [%" PRIuPTR
", %" PRIuPTR
986 start_index, stop_index);
1008 auto continue_predicate = [&]() {
1012 response_state = get_state();
1019 <<
"code=" <<
status.error_code()
1020 <<
" message=" <<
status.error_message() <<
" at "
1021 << debug_location.
file() <<
":" << debug_location.
line();
1022 }
while (continue_predicate());
1023 return response_state;
1032 return file_contents;
1036 const char* key_path,
const char* cert_path) {
1041 std::shared_ptr<ChannelCredentials>
1043 IdentityKeyCertPair key_cert_pair;
1048 auto certificate_provider = std::make_shared<StaticDataCertificateProvider>(
1053 options.watch_identity_key_cert_pairs();
1055 ExternalCertificateVerifier::Create<SyncCertificateVerifier>(
true);
1057 options.set_verify_server_certs(
true);
1058 options.set_check_call_host(
false);
1061 return channel_creds;