17 #ifndef GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H
18 #define GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H
26 #include <gmock/gmock.h>
27 #include <gtest/gtest.h>
29 #include "absl/types/optional.h"
35 #include "src/proto/grpc/testing/xds/ads_for_test.grpc.pb.h"
36 #include "src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.h"
37 #include "src/proto/grpc/testing/xds/v3/ads.grpc.pb.h"
38 #include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h"
39 #include "src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h"
40 #include "src/proto/grpc/testing/xds/v3/discovery.pb.h"
41 #include "src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.h"
42 #include "src/proto/grpc/testing/xds/v3/listener.grpc.pb.h"
43 #include "src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h"
44 #include "src/proto/grpc/testing/xds/v3/route.grpc.pb.h"
52 "type.googleapis.com/envoy.config.listener.v3.Listener";
54 "type.googleapis.com/envoy.config.route.v3.RouteConfiguration";
56 "type.googleapis.com/envoy.config.cluster.v3.Cluster";
58 "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
// Type URL string naming the Listener message in the envoy.api.v2 package.
// NOTE(review): presumably the v2 counterpart of the
// "envoy.config.listener.v3.Listener" URL that appears earlier in this
// header -- confirm against the full file.
60 constexpr
char kLdsV2TypeUrl[] =
"type.googleapis.com/envoy.api.v2.Listener";
62 "type.googleapis.com/envoy.api.v2.RouteConfiguration";
// Type URL string naming the Cluster message in the envoy.api.v2 package.
// NOTE(review): presumably the v2 counterpart of the
// "envoy.config.cluster.v3.Cluster" URL that appears earlier in this
// header -- confirm against the full file.
63 constexpr
char kCdsV2TypeUrl[] =
"type.googleapis.com/envoy.api.v2.Cluster";
65 "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
87 ::envoy::service::discovery::v2::AggregatedDiscoveryService::Service*
92 ::envoy::service::discovery::v3::AggregatedDiscoveryService::Service*
99 wrap_resources_ = wrap_resources;
116 const ::envoy::config::route::v3::RouteConfiguration&
route) {
129 const ::envoy::config::endpoint::v3::ClusterLoadAssignment& assignment) {
139 resource_types_to_ignore_.emplace(
type_url);
154 return absl::nullopt;
157 resource_type_response_state_[
type_url].pop_front();
236 template <
class RpcApi,
class DiscoveryRequest,
class DiscoveryResponse>
249 if (
parent_->forced_ads_failure_.has_value()) {
251 "ADS[%p]: StreamAggregatedResources forcing early failure "
252 "with status code: %d, message: %s",
253 this,
parent_->forced_ads_failure_.value().error_code(),
254 parent_->forced_ads_failure_.value().error_message().c_str());
255 return parent_->forced_ads_failure_.value();
267 std::shared_ptr<AdsServiceImpl> ads_service_impl =
279 std::deque<DiscoveryRequest> requests;
280 bool stream_closed =
false;
282 &requests, &stream_closed));
288 bool did_work =
false;
295 if (stream_closed ||
parent_->ads_done_)
break;
297 if (!requests.empty()) {
299 requests.pop_front();
302 "ADS[%p]: Received request for type %s with content %s",
303 this,
request.type_url().c_str(),
304 request.DebugString().c_str());
307 SentState& sent_state = sent_state_map[v3_resource_type];
310 &subscription_map, &sent_state, &
response);
322 if (!update_queue.empty()) {
327 update_queue.pop_front();
329 SentState& sent_state = sent_state_map[resource_type];
330 ProcessUpdate(resource_type, resource_name, &subscription_map,
358 for (
auto& p : subscription_map) {
361 for (
auto& q : subscription_name_map) {
366 ResourceState& resource_state = resource_name_map[resource_name];
411 if (
request.response_nonce().empty()) {
412 int client_resource_type_version = 0;
413 if (!
request.version_info().empty()) {
415 &client_resource_type_version));
418 parent_->resource_type_min_versions_[v3_resource_type])
419 <<
"resource_type: " << v3_resource_type;
425 if (!
request.has_error_detail()) {
428 this,
request.type_url().c_str(),
429 request.version_info().c_str());
436 "ADS[%p]: client NACKed resource_type=%s version=%s: %s",
437 this,
request.type_url().c_str(),
438 request.version_info().c_str(),
441 parent_->resource_type_response_state_[v3_resource_type].emplace_back(
444 if (client_nonce < sent_state->nonce)
return;
447 if (
parent_->resource_types_to_ignore_.find(v3_resource_type) !=
448 parent_->resource_types_to_ignore_.end()) {
452 auto& subscription_name_map = (*subscription_map)[v3_resource_type];
453 auto& resource_type_state =
parent_->resource_map_[v3_resource_type];
454 auto& resource_name_map = resource_type_state.resource_name_map;
455 std::set<std::string> resources_in_current_request;
456 std::set<std::string> resources_added_to_response;
458 resources_in_current_request.emplace(resource_name);
459 auto& subscription_state = subscription_name_map[resource_name];
460 auto& resource_state = resource_name_map[resource_name];
466 &subscription_state, &resource_state,
469 sent_state->resource_type_version)) {
471 request.type_url().c_str(), resource_name.c_str());
472 resources_added_to_response.emplace(resource_name);
474 if (resource_state.resource.has_value()) {
475 auto* resource = (*response)->add_resources();
476 resource->CopyFrom(resource_state.resource.value());
478 resource->set_type_url(
request.type_url());
480 if (
parent_->wrap_resources_) {
482 *resource_wrapper.mutable_resource() =
std::move(*resource);
483 resource->PackFrom(resource_wrapper);
488 "ADS[%p]: client does not need update for type=%s name=%s",
489 this,
request.type_url().c_str(), resource_name.c_str());
495 v3_resource_type, resources_in_current_request,
496 &subscription_name_map, &resource_name_map);
498 if (!resources_added_to_response.empty()) {
500 v3_resource_type,
request.type_url(),
501 resource_type_state.resource_type_version, subscription_name_map,
502 resources_added_to_response, sent_state, &
response->value());
515 gpr_log(
GPR_INFO,
"ADS[%p]: Received update for type=%s name=%s",
this,
516 resource_type.c_str(), resource_name.c_str());
517 auto& subscription_name_map = (*subscription_map)[resource_type];
518 auto& resource_type_state =
parent_->resource_map_[resource_type];
519 auto& resource_name_map = resource_type_state.resource_name_map;
520 auto it = subscription_name_map.find(resource_name);
521 if (
it != subscription_name_map.end()) {
522 ResourceState& resource_state = resource_name_map[resource_name];
526 resource_type.c_str(), resource_name.c_str());
529 auto* resource = (*response)->add_resources();
532 resource->set_type_url(v2_resource_type);
536 resource_type, v2_resource_type,
537 resource_type_state.resource_type_version, subscription_name_map,
538 {resource_name}, sent_state, &
response->value());
545 bool* stream_closed) {
547 bool seen_first_request =
false;
549 if (!seen_first_request) {
553 "envoy.lb.does_not_support_overprovisioning",
554 "xds.config.resource-in-sotw"));
556 seen_first_request =
true;
565 *stream_closed =
true;
573 const std::set<std::string>& resources_added_to_response,
577 response->set_type_url(
is_v2_ ? v2_resource_type : resource_type);
583 for (
const auto& p : subscription_name_map) {
585 if (resources_added_to_response.find(resource_name) ==
586 resources_added_to_response.end()) {
588 parent_->resource_map_[resource_type].resource_name_map;
590 resource_name_map[resource_name];
592 auto* resource =
response->add_resources();
595 resource->set_type_url(v2_resource_type);
601 sent_state->resource_type_version =
version;
609 return resource_type;
617 return resource_type;
621 const ::envoy::api::v2::DiscoveryRequest&
request) {
626 const ::envoy::service::discovery::v3::DiscoveryRequest& ) {}
636 const ResourceState& resource_state,
int client_resource_type_version);
650 const std::set<std::string>& resources_in_current_request,
664 RpcService<::envoy::service::discovery::v2::AggregatedDiscoveryService,
665 ::envoy::api::v2::DiscoveryRequest,
666 ::envoy::api::v2::DiscoveryResponse>
668 RpcService<::envoy::service::discovery::v3::AggregatedDiscoveryService,
669 ::envoy::service::discovery::v3::DiscoveryRequest,
670 ::envoy::service::discovery::v3::DiscoveryResponse>
709 template <
class UpstreamLocalityStats>
711 const UpstreamLocalityStats& upstream_locality_stats)
738 template <
class ClusterStats>
743 for (
const auto& input_locality_stats :
744 cluster_stats.upstream_locality_stats()) {
748 for (
const auto& input_dropped_requests :
749 cluster_stats.dropped_requests()) {
751 input_dropped_requests.dropped_count());
782 std::set<std::string> cluster_names)
786 client_load_reporting_interval_seconds),
789 ::envoy::service::load_stats::v2::LoadReportingService::Service*
794 ::envoy::service::load_stats::v3::LoadReportingService::Service*
823 template <class RpcApi, class LoadStatsRequest, class LoadStatsResponse>
836 std::shared_ptr<LrsServiceImpl> lrs_service_impl =
844 request.node().client_features(),
848 if (
parent_->send_all_clusters_) {
849 response.set_send_all_clusters(
true);
855 response.mutable_load_reporting_interval()->set_seconds(
856 parent_->client_load_reporting_interval_seconds_);
863 this,
request.DebugString().c_str());
864 std::vector<ClientStats>
stats;
865 for (
const auto& cluster_stats :
request.cluster_stats()) {
866 stats.emplace_back(cluster_stats);
870 if (
parent_->load_report_cond_ !=
nullptr) {
871 parent_->load_report_cond_->Signal();
888 RpcService<::envoy::service::load_stats::v2::LoadReportingService,
889 ::envoy::service::load_stats::v2::LoadStatsRequest,
890 ::envoy::service::load_stats::v2::LoadStatsResponse>
892 RpcService<::envoy::service::load_stats::v3::LoadReportingService,
893 ::envoy::service::load_stats::v3::LoadStatsRequest,
894 ::envoy::service::load_stats::v3::LoadStatsResponse>
908 std::deque<std::vector<ClientStats>> result_queue_
915 #endif // GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H