xds_server.cc
Go to the documentation of this file.
1 //
2 // Copyright 2017 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
18 
19 #include <deque>
20 #include <set>
21 #include <string>
22 #include <thread>
23 #include <vector>
24 
25 #include <gmock/gmock.h>
26 #include <gtest/gtest.h>
27 
28 #include "absl/types/optional.h"
29 
30 #include <grpc/support/log.h>
31 
34 #include "src/proto/grpc/testing/xds/ads_for_test.grpc.pb.h"
35 #include "src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.h"
36 #include "src/proto/grpc/testing/xds/v3/ads.grpc.pb.h"
37 #include "src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h"
38 #include "src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h"
39 
40 namespace grpc {
41 namespace testing {
42 
43 //
44 // AdsServiceImpl
45 //
46 
48  const std::string& type_url,
49  const std::string& name) {
51  ResourceTypeState& resource_type_state = resource_map_[type_url];
52  ++resource_type_state.resource_type_version;
53  ResourceState& resource_state = resource_type_state.resource_name_map[name];
54  resource_state.resource_type_version =
55  resource_type_state.resource_type_version;
56  resource_state.resource = std::move(resource);
58  "ADS[%p]: Updating %s resource %s; resource_type_version now %u",
59  this, type_url.c_str(), name.c_str(),
60  resource_type_state.resource_type_version);
61  for (SubscriptionState* subscription : resource_state.subscriptions) {
62  subscription->update_queue->emplace_back(type_url, name);
63  }
64 }
65 
67  const std::string& name) {
69  ResourceTypeState& resource_type_state = resource_map_[type_url];
70  ++resource_type_state.resource_type_version;
71  ResourceState& resource_state = resource_type_state.resource_name_map[name];
72  resource_state.resource_type_version =
73  resource_type_state.resource_type_version;
74  resource_state.resource.reset();
76  "ADS[%p]: Unsetting %s resource %s; resource_type_version now %u",
77  this, type_url.c_str(), name.c_str(),
78  resource_type_state.resource_type_version);
79  for (SubscriptionState* subscription : resource_state.subscriptions) {
80  subscription->update_queue->emplace_back(type_url, name);
81  }
82 }
83 
84 // Checks whether the client needs to receive a newer version of
85 // the resource.
87  const ResourceTypeState& resource_type_state,
88  const ResourceState& resource_state, int client_resource_type_version) {
89  return client_resource_type_version <
90  resource_type_state.resource_type_version &&
91  resource_state.resource_type_version <=
92  resource_type_state.resource_type_version;
93 }
94 
95 // Subscribes to a resource if not already subscribed:
96 // 1. Sets the update_queue field in subscription_state.
97 // 2. Adds subscription_state to resource_state->subscriptions.
98 bool AdsServiceImpl::MaybeSubscribe(const std::string& resource_type,
99  const std::string& resource_name,
100  SubscriptionState* subscription_state,
101  ResourceState* resource_state,
102  UpdateQueue* update_queue) {
103  // The update_queue will be null if we were not previously subscribed.
104  if (subscription_state->update_queue != nullptr) return false;
105  subscription_state->update_queue = update_queue;
106  resource_state->subscriptions.emplace(subscription_state);
107  gpr_log(GPR_INFO, "ADS[%p]: subscribe to resource type %s name %s state %p",
108  this, resource_type.c_str(), resource_name.c_str(),
109  &subscription_state);
110  return true;
111 }
112 
113 // Removes subscriptions for resources no longer present in the
114 // current request.
116  const std::string& resource_type,
117  const std::set<std::string>& resources_in_current_request,
118  SubscriptionNameMap* subscription_name_map,
119  ResourceNameMap* resource_name_map) {
120  for (auto it = subscription_name_map->begin();
121  it != subscription_name_map->end();) {
122  const std::string& resource_name = it->first;
123  SubscriptionState& subscription_state = it->second;
124  if (resources_in_current_request.find(resource_name) !=
125  resources_in_current_request.end()) {
126  ++it;
127  continue;
128  }
129  gpr_log(GPR_INFO, "ADS[%p]: Unsubscribe to type=%s name=%s state=%p", this,
130  resource_type.c_str(), resource_name.c_str(), &subscription_state);
131  auto resource_it = resource_name_map->find(resource_name);
132  GPR_ASSERT(resource_it != resource_name_map->end());
133  auto& resource_state = resource_it->second;
134  resource_state.subscriptions.erase(&subscription_state);
135  if (resource_state.subscriptions.empty() &&
136  !resource_state.resource.has_value()) {
137  resource_name_map->erase(resource_it);
138  }
139  it = subscription_name_map->erase(it);
140  }
141 }
142 
145  ads_done_ = false;
146 }
147 
149  {
151  if (!ads_done_) {
152  ads_done_ = true;
154  }
155  resource_type_response_state_.clear();
156  }
157  gpr_log(GPR_INFO, "ADS[%p]: shut down", this);
158 }
159 
160 //
161 // LrsServiceImpl::ClientStats
162 //
163 
165  uint64_t sum = 0;
166  for (auto& p : locality_stats_) {
167  sum += p.second.total_successful_requests;
168  }
169  return sum;
170 }
171 
173  uint64_t sum = 0;
174  for (auto& p : locality_stats_) {
175  sum += p.second.total_requests_in_progress;
176  }
177  return sum;
178 }
179 
181  uint64_t sum = 0;
182  for (auto& p : locality_stats_) {
183  sum += p.second.total_error_requests;
184  }
185  return sum;
186 }
187 
189  uint64_t sum = 0;
190  for (auto& p : locality_stats_) {
191  sum += p.second.total_issued_requests;
192  }
193  return sum;
194 }
195 
197  const std::string& category) const {
198  auto iter = dropped_requests_.find(category);
199  GPR_ASSERT(iter != dropped_requests_.end());
200  return iter->second;
201 }
202 
204  const ClientStats& other) {
205  for (const auto& p : other.locality_stats_) {
206  locality_stats_[p.first] += p.second;
207  }
208  total_dropped_requests_ += other.total_dropped_requests_;
209  for (const auto& p : other.dropped_requests_) {
210  dropped_requests_[p.first] += p.second;
211  }
212  return *this;
213 }
214 
215 //
216 // LrsServiceImpl
217 //
218 
220  {
222  lrs_done_ = false;
223  }
224  {
226  result_queue_.clear();
227  }
228 }
229 
231  {
233  if (!lrs_done_) {
234  lrs_done_ = true;
235  lrs_cv_.SignalAll();
236  }
237  }
238  gpr_log(GPR_INFO, "LRS[%p]: shut down", this);
239 }
240 
241 std::vector<LrsServiceImpl::ClientStats> LrsServiceImpl::WaitForLoadReport() {
244  if (result_queue_.empty()) {
246  while (result_queue_.empty()) {
247  cv.Wait(&load_report_mu_);
248  }
249  load_report_cond_ = nullptr;
250  }
251  std::vector<ClientStats> result = std::move(result_queue_.front());
252  result_queue_.pop_front();
253  return result;
254 }
255 
256 } // namespace testing
257 } // namespace grpc
grpc::testing::LrsServiceImpl::ClientStats::locality_stats_
std::map< std::string, LocalityStats > locality_stats_
Definition: xds_server.h:776
Any
struct Any Any
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/protobuf.h:633
_gevent_test_main.result
result
Definition: _gevent_test_main.py:96
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
testing
Definition: aws_request_signer_test.cc:25
grpc_core::CondVar
Definition: src/core/lib/gprpp/sync.h:126
grpc::testing::AdsServiceImpl::SetResource
void SetResource(google::protobuf::Any resource, const std::string &type_url, const std::string &name)
Definition: xds_server.cc:47
regen-readme.it
it
Definition: regen-readme.py:15
grpc::testing::AdsServiceImpl::SubscriptionState::update_queue
UpdateQueue * update_queue
Definition: xds_server.h:199
log.h
grpc::testing::LrsServiceImpl::lrs_cv_
grpc_core::CondVar lrs_cv_
Definition: xds_server.h:901
grpc::testing::LrsServiceImpl::ClientStats
Definition: xds_server.h:702
grpc::testing::AdsServiceImpl::ads_mu_
grpc_core::Mutex ads_mu_
Definition: xds_server.h:677
grpc
Definition: grpcpp/alarm.h:33
grpc::testing::AdsServiceImpl::ClientNeedsResourceUpdate
static bool ClientNeedsResourceUpdate(const ResourceTypeState &resource_type_state, const ResourceState &resource_state, int client_resource_type_version)
Definition: xds_server.cc:86
grpc::testing::LrsServiceImpl::ClientStats::operator+=
ClientStats & operator+=(const ClientStats &other)
Definition: xds_server.cc:203
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
grpc::testing::LrsServiceImpl::ClientStats::total_successful_requests
uint64_t total_successful_requests() const
Definition: xds_server.cc:164
grpc::testing::sum
double sum(const T &container, F functor)
Definition: test/cpp/qps/stats.h:30
locality_stats_
RefCountedPtr< XdsClusterLocalityStats > locality_stats_
Definition: xds_cluster_impl.cc:208
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc::testing::LrsServiceImpl::ClientStats::total_requests_in_progress
uint64_t total_requests_in_progress() const
Definition: xds_server.cc:172
grpc::testing::LrsServiceImpl::WaitForLoadReport
std::vector< ClientStats > WaitForLoadReport()
Definition: xds_server.cc:241
setup.name
name
Definition: setup.py:542
grpc::testing::AdsServiceImpl::ads_cond_
grpc_core::CondVar ads_cond_
Definition: xds_server.h:676
load_report_cond_
grpc::internal::CondVar load_report_cond_
Definition: grpclb_end2end_test.cc:340
grpc::testing::AdsServiceImpl::ProcessUnsubscriptions
void ProcessUnsubscriptions(const std::string &resource_type, const std::set< std::string > &resources_in_current_request, SubscriptionNameMap *subscription_name_map, ResourceNameMap *resource_name_map)
Definition: xds_server.cc:115
grpc::testing::LrsServiceImpl::ClientStats::dropped_requests_
std::map< std::string, uint64_t > dropped_requests_
Definition: xds_server.h:778
grpc::testing::LrsServiceImpl::ClientStats::total_dropped_requests_
uint64_t total_dropped_requests_
Definition: xds_server.h:777
grpc::testing::AdsServiceImpl::MaybeSubscribe
bool MaybeSubscribe(const std::string &resource_type, const std::string &resource_name, SubscriptionState *subscription_state, ResourceState *resource_state, UpdateQueue *update_queue)
Definition: xds_server.cc:98
grpc::testing::AdsServiceImpl::Start
void Start()
Definition: xds_server.cc:143
grpc::testing::AdsServiceImpl::SubscriptionState
Definition: xds_server.h:197
grpc_core::CondVar::SignalAll
void SignalAll()
Definition: src/core/lib/gprpp/sync.h:135
parse_address.h
grpc::testing::AdsServiceImpl::ResourceState
Definition: xds_server.h:215
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc::testing::AdsServiceImpl::ResourceState::resource_type_version
int resource_type_version
Definition: xds_server.h:219
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4, 5)
grpc::testing::AdsServiceImpl::Shutdown
void Shutdown()
Definition: xds_server.cc:148
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
grpc::testing::AdsServiceImpl::UnsetResource
void UnsetResource(const std::string &type_url, const std::string &name)
Definition: xds_server.cc:66
grpc::testing::LrsServiceImpl::Start
void Start() ABSL_LOCKS_EXCLUDED(lrs_mu_)
Definition: xds_server.cc:219
grpc::testing::AdsServiceImpl::UpdateQueue
std::deque< std::pair< std::string, std::string > > UpdateQueue
Definition: xds_server.h:194
grpc::testing::LrsServiceImpl::ClientStats::total_issued_requests
uint64_t total_issued_requests() const
Definition: xds_server.cc:188
grpc::testing::AdsServiceImpl::ResourceNameMap
std::map< std::string, ResourceState > ResourceNameMap
Definition: xds_server.h:226
grpc::testing::AdsServiceImpl::ResourceState::resource
absl::optional< google::protobuf::Any > resource
Definition: xds_server.h:217
grpc::testing::AdsServiceImpl::SubscriptionNameMap
std::map< std::string, SubscriptionState > SubscriptionNameMap
Definition: xds_server.h:204
xds_server.h
grpc::testing::cv
static gpr_cv cv
Definition: bm_cq.cc:163
grpc::testing::AdsServiceImpl::ResourceTypeState
Definition: xds_server.h:228
absl::optional::reset
ABSL_ATTRIBUTE_REINITIALIZES void reset() noexcept
Definition: abseil-cpp/absl/types/optional.h:342
grpc::testing::LrsServiceImpl::ClientStats::total_error_requests
uint64_t total_error_requests() const
Definition: xds_server.cc:180
type_url
string * type_url
Definition: bloaty/third_party/protobuf/conformance/conformance_cpp.cc:72
grpc::testing::LrsServiceImpl::load_report_mu_
grpc_core::Mutex load_report_mu_
Definition: xds_server.h:815
grpc::testing::AdsServiceImpl::ResourceTypeState::resource_type_version
int resource_type_version
Definition: xds_server.h:229
grpc::testing::LrsServiceImpl::ClientStats::dropped_requests
uint64_t dropped_requests(const std::string &category) const
Definition: xds_server.cc:196
iter
Definition: test_winkernel.cpp:47
grpc::testing::AdsServiceImpl::ResourceState::subscriptions
std::set< SubscriptionState * > subscriptions
Definition: xds_server.h:221
grpc::testing::LrsServiceImpl::lrs_mu_
grpc_core::Mutex lrs_mu_
Definition: xds_server.h:902
grpc::testing::AdsServiceImpl::ResourceTypeState::resource_name_map
ResourceNameMap resource_name_map
Definition: xds_server.h:230
sync.h
grpc::testing::LrsServiceImpl::Shutdown
void Shutdown()
Definition: xds_server.cc:230


grpc
Author(s):
autogenerated on Fri May 16 2025 03:01:00