xds_server.h
Go to the documentation of this file.
1 //
2 // Copyright 2017 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H
18 #define GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H
19 
20 #include <deque>
21 #include <set>
22 #include <string>
23 #include <thread>
24 #include <vector>
25 
26 #include <gmock/gmock.h>
27 #include <gtest/gtest.h>
28 
29 #include "absl/types/optional.h"
30 
31 #include <grpc/support/log.h>
32 
35 #include "src/proto/grpc/testing/xds/ads_for_test.grpc.pb.h"
36 #include "src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.h"
37 #include "src/proto/grpc/testing/xds/v3/ads.grpc.pb.h"
38 #include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h"
39 #include "src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h"
40 #include "src/proto/grpc/testing/xds/v3/discovery.pb.h"
41 #include "src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.h"
42 #include "src/proto/grpc/testing/xds/v3/listener.grpc.pb.h"
43 #include "src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h"
44 #include "src/proto/grpc/testing/xds/v3/route.grpc.pb.h"
47 
48 namespace grpc {
49 namespace testing {
50 
51 constexpr char kLdsTypeUrl[] =
52  "type.googleapis.com/envoy.config.listener.v3.Listener";
53 constexpr char kRdsTypeUrl[] =
54  "type.googleapis.com/envoy.config.route.v3.RouteConfiguration";
55 constexpr char kCdsTypeUrl[] =
56  "type.googleapis.com/envoy.config.cluster.v3.Cluster";
57 constexpr char kEdsTypeUrl[] =
58  "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
59 
60 constexpr char kLdsV2TypeUrl[] = "type.googleapis.com/envoy.api.v2.Listener";
61 constexpr char kRdsV2TypeUrl[] =
62  "type.googleapis.com/envoy.api.v2.RouteConfiguration";
63 constexpr char kCdsV2TypeUrl[] = "type.googleapis.com/envoy.api.v2.Cluster";
64 constexpr char kEdsV2TypeUrl[] =
65  "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
66 
67 // An ADS service implementation.
68 class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
69  public:
70  // State for a given xDS resource type.
71  struct ResponseState {
72  enum State {
73  ACKED, // ACK received.
74  NACKED, // NACK received; error_message will contain the error.
75  };
78  };
79 
81  : v2_rpc_service_(this, /*is_v2=*/true),
82  v3_rpc_service_(this, /*is_v2=*/false) {}
83 
84  bool seen_v2_client() const { return seen_v2_client_; }
85  bool seen_v3_client() const { return seen_v3_client_; }
86 
87  ::envoy::service::discovery::v2::AggregatedDiscoveryService::Service*
89  return &v2_rpc_service_;
90  }
91 
92  ::envoy::service::discovery::v3::AggregatedDiscoveryService::Service*
94  return &v3_rpc_service_;
95  }
96 
97  void set_wrap_resources(bool wrap_resources) {
99  wrap_resources_ = wrap_resources;
100  }
101 
102  // Sets a resource to a particular value, overwriting any previous value.
104  const std::string& name);
105 
106  // Removes a resource from the server's state.
107  void UnsetResource(const std::string& type_url, const std::string& name);
108 
110  google::protobuf::Any resource;
111  resource.PackFrom(listener);
112  SetResource(std::move(resource), kLdsTypeUrl, listener.name());
113  }
114 
116  const ::envoy::config::route::v3::RouteConfiguration& route) {
117  google::protobuf::Any resource;
118  resource.PackFrom(route);
119  SetResource(std::move(resource), kRdsTypeUrl, route.name());
120  }
121 
122  void SetCdsResource(const ::envoy::config::cluster::v3::Cluster& cluster) {
123  google::protobuf::Any resource;
124  resource.PackFrom(cluster);
125  SetResource(std::move(resource), kCdsTypeUrl, cluster.name());
126  }
127 
129  const ::envoy::config::endpoint::v3::ClusterLoadAssignment& assignment) {
130  google::protobuf::Any resource;
131  resource.PackFrom(assignment);
132  SetResource(std::move(resource), kEdsTypeUrl, assignment.cluster_name());
133  }
134 
135  // Tells the server to ignore requests from the client for a given
136  // resource type.
139  resource_types_to_ignore_.emplace(type_url);
140  }
141 
142  // Sets the minimum version that the server will accept for a given
143  // resource type. Will cause a gmock expectation failure if we see a
144  // lower version.
147  resource_type_min_versions_[type_url] = version;
148  }
149 
150  // Get the list of response state for each resource type.
153  if (resource_type_response_state_[type_url].empty()) {
154  return absl::nullopt;
155  }
156  auto response = resource_type_response_state_[type_url].front();
157  resource_type_response_state_[type_url].pop_front();
158  return response;
159  }
162  }
165  }
168  }
171  }
172 
173  // Starts the service.
174  void Start();
175 
176  // Shuts down the service.
177  void Shutdown();
178 
179  // Returns the peer names of clients currently connected to the service.
180  std::set<std::string> clients() {
182  return clients_;
183  }
184 
187  forced_ads_failure_ = std::move(status);
188  }
189 
190  private:
191  // A queue of resource type/name pairs that have changed since the client
192  // subscribed to them.
193  using UpdateQueue = std::deque<
194  std::pair<std::string /* type url */, std::string /* resource name */>>;
195 
196  // A struct representing a client's subscription to a particular resource.
198  // The queue upon which to place updates when the resource is updated.
200  };
201 
202  // A struct representing the a client's subscription to all the resources.
203  using SubscriptionNameMap =
204  std::map<std::string /* resource_name */, SubscriptionState>;
205  using SubscriptionMap =
207 
208  // Sent state for a given resource type.
209  struct SentState {
210  int nonce = 0;
212  };
213 
214  // A struct representing the current state for an individual resource.
215  struct ResourceState {
216  // The resource itself, if present.
218  // The resource type version that this resource was last updated in.
220  // A list of subscriptions to this resource.
221  std::set<SubscriptionState*> subscriptions;
222  };
223 
224  // The current state for all individual resources of a given type.
225  using ResourceNameMap =
226  std::map<std::string /* resource_name */, ResourceState>;
227 
231  };
232 
234 
235  // Templated RPC service implementation, works for both v2 and v3.
236  template <class RpcApi, class DiscoveryRequest, class DiscoveryResponse>
237  class RpcService : public RpcApi::Service {
238  public:
240 
241  RpcService(AdsServiceImpl* parent, bool is_v2)
242  : parent_(parent), is_v2_(is_v2) {}
243 
245  Stream* stream) override {
246  gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources starts", this);
247  {
249  if (parent_->forced_ads_failure_.has_value()) {
251  "ADS[%p]: StreamAggregatedResources forcing early failure "
252  "with status code: %d, message: %s",
253  this, parent_->forced_ads_failure_.value().error_code(),
254  parent_->forced_ads_failure_.value().error_message().c_str());
255  return parent_->forced_ads_failure_.value();
256  }
257  }
259  if (is_v2_) {
260  parent_->seen_v2_client_ = true;
261  } else {
262  parent_->seen_v3_client_ = true;
263  }
264  // Take a reference of the AdsServiceImpl object, which will go
265  // out of scope when this request handler returns. This ensures
266  // that the parent won't be destroyed until this stream is complete.
267  std::shared_ptr<AdsServiceImpl> ads_service_impl =
268  parent_->shared_from_this();
269  // Resources (type/name pairs) that have changed since the client
270  // subscribed to them.
271  UpdateQueue update_queue;
272  // Resources that the client will be subscribed to keyed by resource type
273  // url.
274  SubscriptionMap subscription_map;
275  // Sent state for each resource type.
276  std::map<std::string /*type_url*/, SentState> sent_state_map;
277  // Spawn a thread to read requests from the stream.
278  // Requests will be delivered to this thread in a queue.
279  std::deque<DiscoveryRequest> requests;
280  bool stream_closed = false;
282  &requests, &stream_closed));
283  // Main loop to process requests and updates.
284  while (true) {
285  // Boolean to keep track if the loop received any work to do: a
286  // request or an update; regardless whether a response was actually
287  // sent out.
288  bool did_work = false;
289  // Look for new requests and and decide what to handle.
291  {
293  // If the stream has been closed or our parent is being shut
294  // down, stop immediately.
295  if (stream_closed || parent_->ads_done_) break;
296  // Otherwise, see if there's a request to read from the queue.
297  if (!requests.empty()) {
298  DiscoveryRequest request = std::move(requests.front());
299  requests.pop_front();
300  did_work = true;
302  "ADS[%p]: Received request for type %s with content %s",
303  this, request.type_url().c_str(),
304  request.DebugString().c_str());
305  const std::string v3_resource_type =
306  TypeUrlToV3(request.type_url());
307  SentState& sent_state = sent_state_map[v3_resource_type];
308  // Process request.
309  ProcessRequest(request, v3_resource_type, &update_queue,
310  &subscription_map, &sent_state, &response);
311  }
312  }
313  if (response.has_value()) {
314  gpr_log(GPR_INFO, "ADS[%p]: Sending response: %s", this,
315  response->DebugString().c_str());
316  stream->Write(response.value());
317  }
318  response.reset();
319  // Look for updates and decide what to handle.
320  {
322  if (!update_queue.empty()) {
323  const std::string resource_type =
324  std::move(update_queue.front().first);
325  const std::string resource_name =
326  std::move(update_queue.front().second);
327  update_queue.pop_front();
328  did_work = true;
329  SentState& sent_state = sent_state_map[resource_type];
330  ProcessUpdate(resource_type, resource_name, &subscription_map,
331  &sent_state, &response);
332  }
333  }
334  if (response.has_value()) {
335  gpr_log(GPR_INFO, "ADS[%p]: Sending update response: %s", this,
336  response->DebugString().c_str());
337  stream->Write(response.value());
338  }
339  {
341  if (parent_->ads_done_) {
342  break;
343  }
344  }
345  // If we didn't find anything to do, delay before the next loop
346  // iteration; otherwise, check whether we should exit and then
347  // immediately continue.
349  grpc_timeout_milliseconds_to_deadline(did_work ? 0 : 10));
350  }
351  // Done with main loop. Clean up before returning.
352  // Join reader thread.
353  reader.join();
354  // Clean up any subscriptions that were still active when the call
355  // finished.
356  {
358  for (auto& p : subscription_map) {
359  const std::string& type_url = p.first;
360  SubscriptionNameMap& subscription_name_map = p.second;
361  for (auto& q : subscription_name_map) {
362  const std::string& resource_name = q.first;
363  SubscriptionState& subscription_state = q.second;
364  ResourceNameMap& resource_name_map =
365  parent_->resource_map_[type_url].resource_name_map;
366  ResourceState& resource_state = resource_name_map[resource_name];
367  resource_state.subscriptions.erase(&subscription_state);
368  }
369  }
370  }
371  gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources done", this);
373  return Status::OK;
374  }
375 
376  private:
377  // NB: clang's annotalysis is confused by the use of inner template
378  // classes here and *ignores* the exclusive lock annotation on some
379  // functions. See https://bugs.llvm.org/show_bug.cgi?id=51368.
380  //
381  // This class is used for a dual purpose:
382  // - it convinces clang that the lock is held in a given scope
383  // - when used in a function that is annotated to require the inner lock it
384  // will cause compilation to fail if the upstream bug is fixed!
385  //
386  // If you arrive here because of a compilation failure, that might mean the
387  // clang bug is fixed! Please report that on the ticket.
388  //
389  // Since the buggy compiler will still need to be supported, consider
390  // wrapping this class in a compiler version #if and replace its usage
391  // with a macro whose expansion is conditional on the compiler version. In
392  // time (years? decades?) this code can be deleted altogether.
394  public:
398  };
399  // Processes a response read from the client.
400  // Populates response if needed.
401  void ProcessRequest(const DiscoveryRequest& request,
402  const std::string& v3_resource_type,
403  UpdateQueue* update_queue,
404  SubscriptionMap* subscription_map,
405  SentState* sent_state,
408  NoopMutexLock mu(parent_->ads_mu_);
409  // Check the nonce sent by the client, if any.
410  // (This will be absent on the first request on a stream.)
411  if (request.response_nonce().empty()) {
412  int client_resource_type_version = 0;
413  if (!request.version_info().empty()) {
414  GPR_ASSERT(absl::SimpleAtoi(request.version_info(),
415  &client_resource_type_version));
416  }
417  EXPECT_GE(client_resource_type_version,
418  parent_->resource_type_min_versions_[v3_resource_type])
419  << "resource_type: " << v3_resource_type;
420  } else {
421  int client_nonce;
422  GPR_ASSERT(absl::SimpleAtoi(request.response_nonce(), &client_nonce));
423  // Check for ACK or NACK.
424  ResponseState response_state;
425  if (!request.has_error_detail()) {
426  response_state.state = ResponseState::ACKED;
427  gpr_log(GPR_INFO, "ADS[%p]: client ACKed resource_type=%s version=%s",
428  this, request.type_url().c_str(),
429  request.version_info().c_str());
430  } else {
431  response_state.state = ResponseState::NACKED;
432  EXPECT_EQ(request.error_detail().code(),
434  response_state.error_message = request.error_detail().message();
436  "ADS[%p]: client NACKed resource_type=%s version=%s: %s",
437  this, request.type_url().c_str(),
438  request.version_info().c_str(),
439  response_state.error_message.c_str());
440  }
441  parent_->resource_type_response_state_[v3_resource_type].emplace_back(
442  std::move(response_state));
443  // Ignore requests with stale nonces.
444  if (client_nonce < sent_state->nonce) return;
445  }
446  // Ignore resource types as requested by tests.
447  if (parent_->resource_types_to_ignore_.find(v3_resource_type) !=
448  parent_->resource_types_to_ignore_.end()) {
449  return;
450  }
451  // Look at all the resource names in the request.
452  auto& subscription_name_map = (*subscription_map)[v3_resource_type];
453  auto& resource_type_state = parent_->resource_map_[v3_resource_type];
454  auto& resource_name_map = resource_type_state.resource_name_map;
455  std::set<std::string> resources_in_current_request;
456  std::set<std::string> resources_added_to_response;
457  for (const std::string& resource_name : request.resource_names()) {
458  resources_in_current_request.emplace(resource_name);
459  auto& subscription_state = subscription_name_map[resource_name];
460  auto& resource_state = resource_name_map[resource_name];
461  // Subscribe if needed.
462  // Send the resource in the response if either (a) this is
463  // a new subscription or (b) there is an updated version of
464  // this resource to send.
465  if (parent_->MaybeSubscribe(v3_resource_type, resource_name,
466  &subscription_state, &resource_state,
467  update_queue) ||
468  ClientNeedsResourceUpdate(resource_type_state, resource_state,
469  sent_state->resource_type_version)) {
470  gpr_log(GPR_INFO, "ADS[%p]: Sending update for type=%s name=%s", this,
471  request.type_url().c_str(), resource_name.c_str());
472  resources_added_to_response.emplace(resource_name);
473  if (!response->has_value()) response->emplace();
474  if (resource_state.resource.has_value()) {
475  auto* resource = (*response)->add_resources();
476  resource->CopyFrom(resource_state.resource.value());
477  if (is_v2_) {
478  resource->set_type_url(request.type_url());
479  }
480  if (parent_->wrap_resources_) {
482  *resource_wrapper.mutable_resource() = std::move(*resource);
483  resource->PackFrom(resource_wrapper);
484  }
485  }
486  } else {
488  "ADS[%p]: client does not need update for type=%s name=%s",
489  this, request.type_url().c_str(), resource_name.c_str());
490  }
491  }
492  // Process unsubscriptions for any resource no longer
493  // present in the request's resource list.
495  v3_resource_type, resources_in_current_request,
496  &subscription_name_map, &resource_name_map);
497  // Construct response if needed.
498  if (!resources_added_to_response.empty()) {
500  v3_resource_type, request.type_url(),
501  resource_type_state.resource_type_version, subscription_name_map,
502  resources_added_to_response, sent_state, &response->value());
503  }
504  }
505 
506  // Processes a resource update from the test.
507  // Populates response if needed.
508  void ProcessUpdate(const std::string& resource_type,
509  const std::string& resource_name,
510  SubscriptionMap* subscription_map, SentState* sent_state,
513  NoopMutexLock mu(parent_->ads_mu_);
514  const std::string v2_resource_type = TypeUrlToV2(resource_type);
515  gpr_log(GPR_INFO, "ADS[%p]: Received update for type=%s name=%s", this,
516  resource_type.c_str(), resource_name.c_str());
517  auto& subscription_name_map = (*subscription_map)[resource_type];
518  auto& resource_type_state = parent_->resource_map_[resource_type];
519  auto& resource_name_map = resource_type_state.resource_name_map;
520  auto it = subscription_name_map.find(resource_name);
521  if (it != subscription_name_map.end()) {
522  ResourceState& resource_state = resource_name_map[resource_name];
523  if (ClientNeedsResourceUpdate(resource_type_state, resource_state,
524  sent_state->resource_type_version)) {
525  gpr_log(GPR_INFO, "ADS[%p]: Sending update for type=%s name=%s", this,
526  resource_type.c_str(), resource_name.c_str());
527  response->emplace();
528  if (resource_state.resource.has_value()) {
529  auto* resource = (*response)->add_resources();
530  resource->CopyFrom(resource_state.resource.value());
531  if (is_v2_) {
532  resource->set_type_url(v2_resource_type);
533  }
534  }
536  resource_type, v2_resource_type,
537  resource_type_state.resource_type_version, subscription_name_map,
538  {resource_name}, sent_state, &response->value());
539  }
540  }
541  }
542 
543  // Starting a thread to do blocking read on the stream until cancel.
544  void BlockingRead(Stream* stream, std::deque<DiscoveryRequest>* requests,
545  bool* stream_closed) {
546  DiscoveryRequest request;
547  bool seen_first_request = false;
548  while (stream->Read(&request)) {
549  if (!seen_first_request) {
550  EXPECT_TRUE(request.has_node());
551  EXPECT_THAT(request.node().client_features(),
553  "envoy.lb.does_not_support_overprovisioning",
554  "xds.config.resource-in-sotw"));
556  seen_first_request = true;
557  }
558  {
560  requests->emplace_back(std::move(request));
561  }
562  }
563  gpr_log(GPR_INFO, "ADS[%p]: Null read, stream closed", this);
565  *stream_closed = true;
566  }
567 
568  // Completing the building a DiscoveryResponse by adding common information
569  // for all resources and by adding all subscribed resources for LDS and CDS.
571  const std::string& resource_type, const std::string& v2_resource_type,
572  const int version, const SubscriptionNameMap& subscription_name_map,
573  const std::set<std::string>& resources_added_to_response,
574  SentState* sent_state, DiscoveryResponse* response)
576  NoopMutexLock mu(parent_->ads_mu_);
577  response->set_type_url(is_v2_ ? v2_resource_type : resource_type);
578  response->set_version_info(std::to_string(version));
579  response->set_nonce(std::to_string(++sent_state->nonce));
580  if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl) {
581  // For LDS and CDS we must send back all subscribed resources
582  // (even the unchanged ones)
583  for (const auto& p : subscription_name_map) {
584  const std::string& resource_name = p.first;
585  if (resources_added_to_response.find(resource_name) ==
586  resources_added_to_response.end()) {
587  ResourceNameMap& resource_name_map =
588  parent_->resource_map_[resource_type].resource_name_map;
589  const ResourceState& resource_state =
590  resource_name_map[resource_name];
591  if (resource_state.resource.has_value()) {
592  auto* resource = response->add_resources();
593  resource->CopyFrom(resource_state.resource.value());
594  if (is_v2_) {
595  resource->set_type_url(v2_resource_type);
596  }
597  }
598  }
599  }
600  }
601  sent_state->resource_type_version = version;
602  }
603 
604  static std::string TypeUrlToV2(const std::string& resource_type) {
605  if (resource_type == kLdsTypeUrl) return kLdsV2TypeUrl;
606  if (resource_type == kRdsTypeUrl) return kRdsV2TypeUrl;
607  if (resource_type == kCdsTypeUrl) return kCdsV2TypeUrl;
608  if (resource_type == kEdsTypeUrl) return kEdsV2TypeUrl;
609  return resource_type;
610  }
611 
612  static std::string TypeUrlToV3(const std::string& resource_type) {
613  if (resource_type == kLdsV2TypeUrl) return kLdsTypeUrl;
614  if (resource_type == kRdsV2TypeUrl) return kRdsTypeUrl;
615  if (resource_type == kCdsV2TypeUrl) return kCdsTypeUrl;
616  if (resource_type == kEdsV2TypeUrl) return kEdsTypeUrl;
617  return resource_type;
618  }
619 
620  static void CheckBuildVersion(
621  const ::envoy::api::v2::DiscoveryRequest& request) {
622  EXPECT_FALSE(request.node().build_version().empty());
623  }
624 
625  static void CheckBuildVersion(
626  const ::envoy::service::discovery::v3::DiscoveryRequest& /*request*/) {}
627 
629  const bool is_v2_;
630  };
631 
632  // Checks whether the client needs to receive a newer version of
633  // the resource.
634  static bool ClientNeedsResourceUpdate(
635  const ResourceTypeState& resource_type_state,
636  const ResourceState& resource_state, int client_resource_type_version);
637 
638  // Subscribes to a resource if not already subscribed:
639  // 1. Sets the update_queue field in subscription_state.
640  // 2. Adds subscription_state to resource_state->subscriptions.
641  bool MaybeSubscribe(const std::string& resource_type,
642  const std::string& resource_name,
643  SubscriptionState* subscription_state,
644  ResourceState* resource_state, UpdateQueue* update_queue);
645 
646  // Removes subscriptions for resources no longer present in the
647  // current request.
649  const std::string& resource_type,
650  const std::set<std::string>& resources_in_current_request,
651  SubscriptionNameMap* subscription_name_map,
652  ResourceNameMap* resource_name_map);
653 
654  void AddClient(const std::string& client) {
656  clients_.insert(client);
657  }
658 
661  clients_.erase(client);
662  }
663 
664  RpcService<::envoy::service::discovery::v2::AggregatedDiscoveryService,
665  ::envoy::api::v2::DiscoveryRequest,
666  ::envoy::api::v2::DiscoveryResponse>
668  RpcService<::envoy::service::discovery::v3::AggregatedDiscoveryService,
669  ::envoy::service::discovery::v3::DiscoveryRequest,
670  ::envoy::service::discovery::v3::DiscoveryResponse>
672 
673  std::atomic_bool seen_v2_client_{false};
674  std::atomic_bool seen_v3_client_{false};
675 
678  bool ads_done_ ABSL_GUARDED_BY(ads_mu_) = false;
679  std::map<std::string /* type_url */, std::deque<ResponseState>>
680  resource_type_response_state_ ABSL_GUARDED_BY(ads_mu_);
681  std::set<std::string /*resource_type*/> resource_types_to_ignore_
683  std::map<std::string /*resource_type*/, int> resource_type_min_versions_
685  // An instance data member containing the current state of all resources.
686  // Note that an entry will exist whenever either of the following is true:
687  // - The resource exists (i.e., has been created by SetResource() and has not
688  // yet been destroyed by UnsetResource()).
689  // - There is at least one subscription for the resource.
690  ResourceMap resource_map_ ABSL_GUARDED_BY(ads_mu_);
691  absl::optional<Status> forced_ads_failure_ ABSL_GUARDED_BY(ads_mu_);
692  bool wrap_resources_ ABSL_GUARDED_BY(ads_mu_) = false;
693 
695  std::set<std::string> clients_ ABSL_GUARDED_BY(clients_mu_);
696 };
697 
698 // An LRS service implementation.
699 class LrsServiceImpl : public std::enable_shared_from_this<LrsServiceImpl> {
700  public:
701  // Stats reported by client.
702  class ClientStats {
703  public:
704  // Stats for a given locality.
705  struct LocalityStats {
707 
708  // Converts from proto message class.
709  template <class UpstreamLocalityStats>
710  explicit LocalityStats(
711  const UpstreamLocalityStats& upstream_locality_stats)
713  upstream_locality_stats.total_successful_requests()),
715  upstream_locality_stats.total_requests_in_progress()),
717  upstream_locality_stats.total_error_requests()),
719  upstream_locality_stats.total_issued_requests()) {}
720 
726  return *this;
727  }
728 
733  };
734 
736 
737  // Converts from proto message class.
738  template <class ClusterStats>
739  explicit ClientStats(const ClusterStats& cluster_stats)
740  : cluster_name_(cluster_stats.cluster_name()),
741  eds_service_name_(cluster_stats.cluster_service_name()),
743  for (const auto& input_locality_stats :
744  cluster_stats.upstream_locality_stats()) {
745  locality_stats_.emplace(input_locality_stats.locality().sub_zone(),
746  LocalityStats(input_locality_stats));
747  }
748  for (const auto& input_dropped_requests :
749  cluster_stats.dropped_requests()) {
750  dropped_requests_.emplace(input_dropped_requests.category(),
751  input_dropped_requests.dropped_count());
752  }
753  }
754 
755  const std::string& cluster_name() const { return cluster_name_; }
756  const std::string& eds_service_name() const { return eds_service_name_; }
757 
758  const std::map<std::string, LocalityStats>& locality_stats() const {
759  return locality_stats_;
760  }
761 
766 
768 
769  uint64_t dropped_requests(const std::string& category) const;
770 
771  ClientStats& operator+=(const ClientStats& other);
772 
773  private:
776  std::map<std::string, LocalityStats> locality_stats_;
778  std::map<std::string, uint64_t> dropped_requests_;
779  };
780 
781  LrsServiceImpl(int client_load_reporting_interval_seconds,
782  std::set<std::string> cluster_names)
783  : v2_rpc_service_(this),
784  v3_rpc_service_(this),
786  client_load_reporting_interval_seconds),
787  cluster_names_(std::move(cluster_names)) {}
788 
789  ::envoy::service::load_stats::v2::LoadReportingService::Service*
791  return &v2_rpc_service_;
792  }
793 
794  ::envoy::service::load_stats::v3::LoadReportingService::Service*
796  return &v3_rpc_service_;
797  }
798 
799  size_t request_count() {
801  }
802 
803  size_t response_count() {
805  }
806 
807  // Must be called before the LRS call is started.
808  void set_send_all_clusters(bool send_all_clusters) {
809  send_all_clusters_ = send_all_clusters;
810  }
811  void set_cluster_names(const std::set<std::string>& cluster_names) {
812  cluster_names_ = cluster_names;
813  }
814 
816 
817  void Shutdown();
818 
819  std::vector<ClientStats> WaitForLoadReport();
820 
821  private:
822  // Templated RPC service implementation, works for both v2 and v3.
823  template <class RpcApi, class LoadStatsRequest, class LoadStatsResponse>
824  class RpcService : public CountedService<typename RpcApi::Service> {
825  public:
827 
828  explicit RpcService(LrsServiceImpl* parent) : parent_(parent) {}
829 
831  Stream* stream) override {
832  gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats starts", this);
833  EXPECT_GT(parent_->client_load_reporting_interval_seconds_, 0);
834  // Take a reference of the LrsServiceImpl object, reference will go
835  // out of scope after this method exits.
836  std::shared_ptr<LrsServiceImpl> lrs_service_impl =
837  parent_->shared_from_this();
838  // Read initial request.
839  LoadStatsRequest request;
840  if (stream->Read(&request)) {
842  // Verify client features.
843  EXPECT_THAT(
844  request.node().client_features(),
845  ::testing::Contains("envoy.lrs.supports_send_all_clusters"));
846  // Send initial response.
847  LoadStatsResponse response;
848  if (parent_->send_all_clusters_) {
849  response.set_send_all_clusters(true);
850  } else {
851  for (const std::string& cluster_name : parent_->cluster_names_) {
852  response.add_clusters(cluster_name);
853  }
854  }
855  response.mutable_load_reporting_interval()->set_seconds(
856  parent_->client_load_reporting_interval_seconds_);
857  stream->Write(response);
859  // Wait for report.
860  request.Clear();
861  while (stream->Read(&request)) {
862  gpr_log(GPR_INFO, "LRS[%p]: received client load report message: %s",
863  this, request.DebugString().c_str());
864  std::vector<ClientStats> stats;
865  for (const auto& cluster_stats : request.cluster_stats()) {
866  stats.emplace_back(cluster_stats);
867  }
868  grpc_core::MutexLock lock(&parent_->load_report_mu_);
869  parent_->result_queue_.emplace_back(std::move(stats));
870  if (parent_->load_report_cond_ != nullptr) {
871  parent_->load_report_cond_->Signal();
872  }
873  }
874  // Wait until notified done.
875  grpc_core::MutexLock lock(&parent_->lrs_mu_);
876  while (!parent_->lrs_done_) {
877  parent_->lrs_cv_.Wait(&parent_->lrs_mu_);
878  }
879  }
880  gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats done", this);
881  return Status::OK;
882  }
883 
884  private:
886  };
887 
888  RpcService<::envoy::service::load_stats::v2::LoadReportingService,
889  ::envoy::service::load_stats::v2::LoadStatsRequest,
890  ::envoy::service::load_stats::v2::LoadStatsResponse>
892  RpcService<::envoy::service::load_stats::v3::LoadReportingService,
893  ::envoy::service::load_stats::v3::LoadStatsRequest,
894  ::envoy::service::load_stats::v3::LoadStatsResponse>
896 
898  bool send_all_clusters_ = false;
899  std::set<std::string> cluster_names_;
900 
903  bool lrs_done_ ABSL_GUARDED_BY(lrs_mu_) = false;
904 
907  nullptr;
908  std::deque<std::vector<ClientStats>> result_queue_
910 };
911 
912 } // namespace testing
913 } // namespace grpc
914 
915 #endif // GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H
grpc::EXPECT_THAT
EXPECT_THAT(status.error_message(), ::testing::HasSubstr("subject_token_type"))
EXPECT_FALSE
#define EXPECT_FALSE(condition)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1970
grpc::testing::LrsServiceImpl::ClientStats::locality_stats_
std::map< std::string, LocalityStats > locality_stats_
Definition: xds_server.h:776
grpc::testing::LrsServiceImpl::ClientStats::LocalityStats::total_successful_requests
uint64_t total_successful_requests
Definition: xds_server.h:729
grpc::testing::CountedService::request_count
size_t request_count()
Definition: counted_service.h:30
grpc::testing::AdsServiceImpl::RpcService
Definition: xds_server.h:237
grpc::ClientContext::peer
std::string peer() const
Definition: client_context.cc:174
Any
struct Any Any
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/protobuf.h:633
grpc::testing::AdsServiceImpl::ResponseState::State
State
Definition: xds_server.h:72
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
testing
Definition: aws_request_signer_test.cc:25
grpc::status
auto status
Definition: cpp/client/credentials_test.cc:200
grpc::testing::LrsServiceImpl::v2_rpc_service_
RpcService<::envoy::service::load_stats::v2::LoadReportingService, ::envoy::service::load_stats::v2::LoadStatsRequest, ::envoy::service::load_stats::v2::LoadStatsResponse > v2_rpc_service_
Definition: xds_server.h:891
grpc_core::CondVar
Definition: src/core/lib/gprpp/sync.h:126
grpc::testing::AdsServiceImpl::SetResource
void SetResource(google::protobuf::Any resource, const std::string &type_url, const std::string &name)
Definition: xds_server.cc:47
regen-readme.it
it
Definition: regen-readme.py:15
grpc::testing::AdsServiceImpl::SubscriptionState::update_queue
UpdateQueue * update_queue
Definition: xds_server.h:199
grpc::ServerContext
Definition: grpcpp/impl/codegen/server_context.h:566
log.h
grpc::testing::LrsServiceImpl::RpcService::parent_
LrsServiceImpl * parent_
Definition: xds_server.h:885
grpc::testing::LrsServiceImpl::lrs_cv_
grpc_core::CondVar lrs_cv_
Definition: xds_server.h:901
grpc::testing::LrsServiceImpl::ClientStats::LocalityStats::operator+=
LocalityStats & operator+=(const LocalityStats &other)
Definition: xds_server.h:721
grpc::testing::LrsServiceImpl::ClientStats
Definition: xds_server.h:702
grpc::testing::AdsServiceImpl::ads_mu_
grpc_core::Mutex ads_mu_
Definition: xds_server.h:677
grpc::testing::kCdsTypeUrl
constexpr char kCdsTypeUrl[]
Definition: xds_server.h:55
grpc::testing::AdsServiceImpl::SubscriptionMap
std::map< std::string, SubscriptionNameMap > SubscriptionMap
Definition: xds_server.h:206
grpc::testing::LrsServiceImpl::send_all_clusters_
bool send_all_clusters_
Definition: xds_server.h:898
grpc
Definition: grpcpp/alarm.h:33
clients_
std::set< std::string > clients_
Definition: client_lb_end2end_test.cc:137
grpc::testing::AdsServiceImpl::ClientNeedsResourceUpdate
static bool ClientNeedsResourceUpdate(const ResourceTypeState &resource_type_state, const ResourceState &resource_state, int client_resource_type_version)
Definition: xds_server.cc:86
route
XdsRouteConfigResource::Route route
Definition: xds_resolver.cc:337
grpc::testing::kRdsTypeUrl
constexpr char kRdsTypeUrl[]
Definition: xds_server.h:53
grpc::testing::AdsServiceImpl::GetResponseState
absl::optional< ResponseState > GetResponseState(const std::string &type_url)
Definition: xds_server.h:151
grpc::testing::LrsServiceImpl::client_load_reporting_interval_seconds_
const int client_load_reporting_interval_seconds_
Definition: xds_server.h:897
false
#define false
Definition: setup_once.h:323
grpc::testing::LrsServiceImpl::ClientStats::operator+=
ClientStats & operator+=(const ClientStats &other)
Definition: xds_server.cc:203
Listener
::grpc_event_engine::experimental::EventEngine::Listener Listener
Definition: event_engine_test_utils.cc:42
grpc::testing::LrsServiceImpl::ClientStats::ClientStats
ClientStats()
Definition: xds_server.h:735
grpc::testing::AdsServiceImpl::RpcService::RpcService
RpcService(AdsServiceImpl *parent, bool is_v2)
Definition: xds_server.h:241
cluster_name
std::string cluster_name
Definition: xds_cluster_resolver.cc:91
client
Definition: examples/python/async_streaming/client.py:1
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
grpc::testing::LrsServiceImpl
Definition: xds_server.h:699
grpc::testing::LrsServiceImpl::ClientStats::total_successful_requests
uint64_t total_successful_requests() const
Definition: xds_server.cc:164
benchmark.request
request
Definition: benchmark.py:77
EXPECT_GT
#define EXPECT_GT(val1, val2)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2036
grpc::testing::AdsServiceImpl::seen_v3_client_
std::atomic_bool seen_v3_client_
Definition: xds_server.h:674
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc::testing::AdsServiceImpl::RpcService::ProcessUpdate
void ProcessUpdate(const std::string &resource_type, const std::string &resource_name, SubscriptionMap *subscription_map, SentState *sent_state, absl::optional< DiscoveryResponse > *response) ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_ -> ads_mu_)
Definition: xds_server.h:508
grpc::testing::LrsServiceImpl::request_count
size_t request_count()
Definition: xds_server.h:799
grpc::testing::LrsServiceImpl::ClientStats::total_requests_in_progress
uint64_t total_requests_in_progress() const
Definition: xds_server.cc:172
grpc::testing::AdsServiceImpl::RpcService::CompleteBuildingDiscoveryResponse
void CompleteBuildingDiscoveryResponse(const std::string &resource_type, const std::string &v2_resource_type, const int version, const SubscriptionNameMap &subscription_name_map, const std::set< std::string > &resources_added_to_response, SentState *sent_state, DiscoveryResponse *response) ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_ -> ads_mu_)
Definition: xds_server.h:570
grpc::ServerReaderWriter
Definition: grpcpp/impl/codegen/sync_stream.h:786
grpc::testing::AdsServiceImpl::RpcService::StreamAggregatedResources
Status StreamAggregatedResources(ServerContext *context, Stream *stream) override
Definition: xds_server.h:244
grpc::testing::LrsServiceImpl::WaitForLoadReport
std::vector< ClientStats > WaitForLoadReport()
Definition: xds_server.cc:241
setup.name
name
Definition: setup.py:542
grpc::testing::AdsServiceImpl::seen_v2_client
bool seen_v2_client() const
Definition: xds_server.h:84
version
Definition: version.py:1
grpc::testing::LrsServiceImpl::ClientStats::LocalityStats::total_error_requests
uint64_t total_error_requests
Definition: xds_server.h:731
grpc::testing::AdsServiceImpl::ads_cond_
grpc_core::CondVar ads_cond_
Definition: xds_server.h:676
grpc::testing::AdsServiceImpl::SentState::resource_type_version
int resource_type_version
Definition: xds_server.h:211
grpc::testing::AdsServiceImpl::ResponseState::error_message
std::string error_message
Definition: xds_server.h:77
grpc::testing::LrsServiceImpl::ClientStats::LocalityStats
Definition: xds_server.h:705
load_report_cond_
grpc::internal::CondVar load_report_cond_
Definition: grpclb_end2end_test.cc:340
GRPC_STATUS_INVALID_ARGUMENT
@ GRPC_STATUS_INVALID_ARGUMENT
Definition: include/grpc/impl/codegen/status.h:46
map
zval * map
Definition: php/ext/google/protobuf/encode_decode.c:480
grpc::testing::kCdsV2TypeUrl
constexpr char kCdsV2TypeUrl[]
Definition: xds_server.h:63
grpc::testing::CountedService::IncreaseResponseCount
void IncreaseResponseCount()
Definition: counted_service.h:40
grpc::testing::AdsServiceImpl::ProcessUnsubscriptions
void ProcessUnsubscriptions(const std::string &resource_type, const std::set< std::string > &resources_in_current_request, SubscriptionNameMap *subscription_name_map, ResourceNameMap *resource_name_map)
Definition: xds_server.cc:115
grpc::testing::AdsServiceImpl::ResponseState::NACKED
@ NACKED
Definition: xds_server.h:74
true
#define true
Definition: setup_once.h:324
grpc::testing::AdsServiceImpl::RpcService::CheckBuildVersion
static void CheckBuildVersion(const ::envoy::api::v2::DiscoveryRequest &request)
Definition: xds_server.h:620
grpc::testing::LrsServiceImpl::ClientStats::LocalityStats::LocalityStats
LocalityStats(const UpstreamLocalityStats &upstream_locality_stats)
Definition: xds_server.h:710
grpc::testing::AdsServiceImpl::eds_response_state
absl::optional< ResponseState > eds_response_state()
Definition: xds_server.h:169
grpc::Service
Desriptor of an RPC service and its various RPC methods.
Definition: grpcpp/impl/codegen/service_type.h:58
grpc::testing::LrsServiceImpl::ClientStats::dropped_requests_
std::map< std::string, uint64_t > dropped_requests_
Definition: xds_server.h:778
grpc::testing::LrsServiceImpl::ClientStats::total_dropped_requests_
uint64_t total_dropped_requests_
Definition: xds_server.h:777
grpc::testing::LrsServiceImpl::ClientStats::cluster_name
const std::string & cluster_name() const
Definition: xds_server.h:755
grpc::testing::AdsServiceImpl::MaybeSubscribe
bool MaybeSubscribe(const std::string &resource_type, const std::string &resource_name, SubscriptionState *subscription_state, ResourceState *resource_state, UpdateQueue *update_queue)
Definition: xds_server.cc:98
grpc::testing::AdsServiceImpl::seen_v2_client_
std::atomic_bool seen_v2_client_
Definition: xds_server.h:673
grpc::testing::kEdsTypeUrl
constexpr char kEdsTypeUrl[]
Definition: xds_server.h:57
grpc::testing::AdsServiceImpl::set_wrap_resources
void set_wrap_resources(bool wrap_resources)
Definition: xds_server.h:97
grpc::testing::LrsServiceImpl::ClientStats::LocalityStats::total_issued_requests
uint64_t total_issued_requests
Definition: xds_server.h:732
grpc::testing::AdsServiceImpl::clients_mu_
grpc_core::Mutex clients_mu_
Definition: xds_server.h:694
grpc::testing::AdsServiceImpl::Start
void Start()
Definition: xds_server.cc:143
grpc::testing::AdsServiceImpl::RemoveClient
void RemoveClient(const std::string &client)
Definition: xds_server.h:659
grpc::testing::AdsServiceImpl::SubscriptionState
Definition: xds_server.h:197
grpc::testing::kEdsV2TypeUrl
constexpr char kEdsV2TypeUrl[]
Definition: xds_server.h:64
grpc::testing::LrsServiceImpl::response_count
size_t response_count()
Definition: xds_server.h:803
absl::SimpleAtoi
ABSL_NAMESPACE_BEGIN ABSL_MUST_USE_RESULT bool SimpleAtoi(absl::string_view str, int_type *out)
Definition: abseil-cpp/absl/strings/numbers.h:271
grpc::testing::AdsServiceImpl::SetCdsResource
void SetCdsResource(const ::envoy::config::cluster::v3::Cluster &cluster)
Definition: xds_server.h:122
parse_address.h
grpc::testing::LrsServiceImpl::RpcService::StreamLoadStats
Status StreamLoadStats(ServerContext *, Stream *stream) override
Definition: xds_server.h:830
grpc::testing::AdsServiceImpl::ResourceState
Definition: xds_server.h:215
grpc::testing::LrsServiceImpl::set_cluster_names
void set_cluster_names(const std::set< std::string > &cluster_names)
Definition: xds_server.h:811
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc::testing::AdsServiceImpl::ResourceState::resource_type_version
int resource_type_version
Definition: xds_server.h:219
grpc::testing::AdsServiceImpl::RpcService::TypeUrlToV3
static std::string TypeUrlToV3(const std::string &resource_type)
Definition: xds_server.h:612
cluster
absl::string_view cluster
Definition: xds_resolver.cc:331
absl::optional::has_value
constexpr bool has_value() const noexcept
Definition: abseil-cpp/absl/types/optional.h:461
grpc::testing::AdsServiceImpl::AdsServiceImpl
AdsServiceImpl()
Definition: xds_server.h:80
grpc::testing::LrsServiceImpl::LrsServiceImpl
LrsServiceImpl(int client_load_reporting_interval_seconds, std::set< std::string > cluster_names)
Definition: xds_server.h:781
grpc_timeout_milliseconds_to_deadline
gpr_timespec grpc_timeout_milliseconds_to_deadline(int64_t time_ms)
Definition: test/core/util/test_config.cc:89
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc::testing::AdsServiceImpl::Shutdown
void Shutdown()
Definition: xds_server.cc:148
ABSL_EXCLUSIVE_LOCKS_REQUIRED
#define ABSL_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: abseil-cpp/absl/base/thread_annotations.h:145
grpc::testing::AdsServiceImpl::ResourceMap
std::map< std::string, ResourceTypeState > ResourceMap
Definition: xds_server.h:233
grpc::testing::AdsServiceImpl::RpcService::TypeUrlToV2
static std::string TypeUrlToV2(const std::string &resource_type)
Definition: xds_server.h:604
gpr_sleep_until
GPRAPI void gpr_sleep_until(gpr_timespec until)
conf.version
string version
Definition: doc/python/sphinx/conf.py:36
grpc::testing::LrsServiceImpl::ClientStats::LocalityStats::total_requests_in_progress
uint64_t total_requests_in_progress
Definition: xds_server.h:730
grpc::testing::LrsServiceImpl::ClientStats::ClientStats
ClientStats(const ClusterStats &cluster_stats)
Definition: xds_server.h:739
grpc::testing::AdsServiceImpl::cds_response_state
absl::optional< ResponseState > cds_response_state()
Definition: xds_server.h:166
grpc::testing::LrsServiceImpl::ClientStats::cluster_name_
std::string cluster_name_
Definition: xds_server.h:774
gen_stats_data.stats
list stats
Definition: gen_stats_data.py:58
grpc::testing::AdsServiceImpl::v3_rpc_service
::envoy::service::discovery::v3::AggregatedDiscoveryService::Service * v3_rpc_service()
Definition: xds_server.h:93
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
grpc::testing::AdsServiceImpl::UnsetResource
void UnsetResource(const std::string &type_url, const std::string &name)
Definition: xds_server.cc:66
grpc::testing::LrsServiceImpl::ClientStats::eds_service_name
const std::string & eds_service_name() const
Definition: xds_server.h:756
grpc::testing::LrsServiceImpl::Start
void Start() ABSL_LOCKS_EXCLUDED(lrs_mu_
Definition: xds_server.cc:219
absl::optional
Definition: abseil-cpp/absl/types/internal/optional.h:61
grpc::testing::AdsServiceImpl::ResponseState::state
State state
Definition: xds_server.h:76
grpc::testing::AdsServiceImpl::SetResourceMinVersion
void SetResourceMinVersion(const std::string &type_url, int version)
Definition: xds_server.h:145
grpc::testing::AdsServiceImpl::UpdateQueue
std::deque< std::pair< std::string, std::string > > UpdateQueue
Definition: xds_server.h:194
grpc::testing::LrsServiceImpl::RpcService
Definition: xds_server.h:824
grpc::testing::kLdsTypeUrl
constexpr char kLdsTypeUrl[]
Definition: xds_server.h:51
grpc::testing::LrsServiceImpl::v2_rpc_service
::envoy::service::load_stats::v2::LoadReportingService::Service * v2_rpc_service()
Definition: xds_server.h:790
grpc::testing::AdsServiceImpl::RpcService::NoopMutexLock
Definition: xds_server.h:393
grpc::testing::AdsServiceImpl
Definition: xds_server.h:68
grpc::Status::OK
static const Status & OK
An OK pre-defined instance.
Definition: include/grpcpp/impl/codegen/status.h:113
grpc::testing::LrsServiceImpl::v3_rpc_service_
RpcService<::envoy::service::load_stats::v3::LoadReportingService, ::envoy::service::load_stats::v3::LoadStatsRequest, ::envoy::service::load_stats::v3::LoadStatsResponse > v3_rpc_service_
Definition: xds_server.h:895
grpc::testing::AdsServiceImpl::RpcService::is_v2_
const bool is_v2_
Definition: xds_server.h:629
grpc::testing::LrsServiceImpl::ClientStats::total_issued_requests
uint64_t total_issued_requests() const
Definition: xds_server.cc:188
grpc::testing::AdsServiceImpl::ResourceNameMap
std::map< std::string, ResourceState > ResourceNameMap
Definition: xds_server.h:226
grpc::testing::LrsServiceImpl::set_send_all_clusters
void set_send_all_clusters(bool send_all_clusters)
Definition: xds_server.h:808
grpc::testing::LrsServiceImpl::ClientStats::total_dropped_requests
uint64_t total_dropped_requests() const
Definition: xds_server.h:767
grpc::testing::CountedService::response_count
size_t response_count()
Definition: counted_service.h:35
grpc::testing::AdsServiceImpl::ABSL_GUARDED_BY
bool ads_done_ ABSL_GUARDED_BY(ads_mu_)
grpc::testing::LrsServiceImpl::ClientStats::eds_service_name_
std::string eds_service_name_
Definition: xds_server.h:775
grpc::testing::CountedService::IncreaseRequestCount
void IncreaseRequestCount()
Definition: counted_service.h:44
google_benchmark.example.empty
def empty(state)
Definition: example.py:31
grpc::testing::AdsServiceImpl::RpcService::parent_
AdsServiceImpl * parent_
Definition: xds_server.h:628
test_config.h
grpc::testing::AdsServiceImpl::v2_rpc_service
::envoy::service::discovery::v2::AggregatedDiscoveryService::Service * v2_rpc_service()
Definition: xds_server.h:88
grpc::testing::LrsServiceImpl::load_report_mu_
grpc_core::Mutex load_report_mu_
Definition: xds_server.h:905
grpc::testing::AdsServiceImpl::ResourceState::resource
absl::optional< google::protobuf::Any > resource
Definition: xds_server.h:217
grpc::testing::AdsServiceImpl::SubscriptionNameMap
std::map< std::string, SubscriptionState > SubscriptionNameMap
Definition: xds_server.h:204
grpc::testing::AdsServiceImpl::SentState::nonce
int nonce
Definition: xds_server.h:210
absl::optional::value
constexpr const T & value() const &
Definition: abseil-cpp/absl/types/optional.h:475
grpc::testing::AdsServiceImpl::ResponseState::ACKED
@ ACKED
Definition: xds_server.h:73
grpc_core::Mutex
Definition: src/core/lib/gprpp/sync.h:61
grpc::testing::AdsServiceImpl::RpcService::BlockingRead
void BlockingRead(Stream *stream, std::deque< DiscoveryRequest > *requests, bool *stream_closed)
Definition: xds_server.h:544
grpc::testing::LrsServiceImpl::ClientStats::locality_stats
const std::map< std::string, LocalityStats > & locality_stats() const
Definition: xds_server.h:758
grpc::testing::AdsServiceImpl::rds_response_state
absl::optional< ResponseState > rds_response_state()
Definition: xds_server.h:163
ABSL_EXCLUSIVE_LOCK_FUNCTION
#define ABSL_EXCLUSIVE_LOCK_FUNCTION(...)
Definition: abseil-cpp/absl/base/thread_annotations.h:207
grpc::testing::AdsServiceImpl::SentState
Definition: xds_server.h:209
grpc::testing::AdsServiceImpl::RpcService::NoopMutexLock::~NoopMutexLock
~NoopMutexLock() ABSL_UNLOCK_FUNCTION()
Definition: xds_server.h:397
grpc::testing::AdsServiceImpl::ResourceTypeState
Definition: xds_server.h:228
grpc::testing::AdsServiceImpl::RpcService::NoopMutexLock::NoopMutexLock
NoopMutexLock(grpc_core::Mutex &mu) ABSL_EXCLUSIVE_LOCK_FUNCTION(mu)
Definition: xds_server.h:395
ABSL_LOCKS_EXCLUDED
#define ABSL_LOCKS_EXCLUDED(...)
Definition: abseil-cpp/absl/base/thread_annotations.h:163
grpc::testing::AdsServiceImpl::lds_response_state
absl::optional< ResponseState > lds_response_state()
Definition: xds_server.h:160
grpc::testing::AdsServiceImpl::SetRdsResource
void SetRdsResource(const ::envoy::config::route::v3::RouteConfiguration &route)
Definition: xds_server.h:115
opencensus.proto.resource.v1.resource_pb2.Resource
Resource
Definition: resource_pb2.py:107
grpc::testing::AdsServiceImpl::AddClient
void AddClient(const std::string &client)
Definition: xds_server.h:654
grpc::testing::kRdsV2TypeUrl
constexpr char kRdsV2TypeUrl[]
Definition: xds_server.h:61
asyncio_get_stats.response
response
Definition: asyncio_get_stats.py:28
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc::testing::LrsServiceImpl::ClientStats::total_error_requests
uint64_t total_error_requests() const
Definition: xds_server.cc:180
grpc::testing::CountedService
Definition: counted_service.h:28
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
testing::UnorderedElementsAre
internal::UnorderedElementsAreMatcher< ::testing::tuple<> > UnorderedElementsAre()
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:13255
type_url
string * type_url
Definition: bloaty/third_party/protobuf/conformance/conformance_cpp.cc:72
grpc::Status
Definition: include/grpcpp/impl/codegen/status.h:35
grpc::testing::AdsServiceImpl::v3_rpc_service_
RpcService<::envoy::service::discovery::v3::AggregatedDiscoveryService, ::envoy::service::discovery::v3::DiscoveryRequest, ::envoy::service::discovery::v3::DiscoveryResponse > v3_rpc_service_
Definition: xds_server.h:671
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
grpc::testing::EXPECT_EQ
EXPECT_EQ(options.token_exchange_service_uri, "https://foo/exchange")
grpc::testing::LrsServiceImpl::ClientStats::LocalityStats::LocalityStats
LocalityStats()
Definition: xds_server.h:706
grpc::testing::LrsServiceImpl::load_report_mu_
void load_report_mu_
Definition: xds_server.h:815
Any::PackFrom
void PackFrom(const ::PROTOBUF_NAMESPACE_ID::Message &message)
Definition: bloaty/third_party/protobuf/src/google/protobuf/any.pb.cc:88
EXPECT_GE
#define EXPECT_GE(val1, val2)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2034
grpc::testing::kLdsV2TypeUrl
constexpr char kLdsV2TypeUrl[]
Definition: xds_server.h:60
grpc::testing::AdsServiceImpl::ForceADSFailure
void ForceADSFailure(Status status)
Definition: xds_server.h:185
grpc::testing::AdsServiceImpl::RpcService::CheckBuildVersion
static void CheckBuildVersion(const ::envoy::service::discovery::v3::DiscoveryRequest &)
Definition: xds_server.h:625
grpc::testing::AdsServiceImpl::IgnoreResourceType
void IgnoreResourceType(const std::string &type_url)
Definition: xds_server.h:137
grpc::testing::AdsServiceImpl::ResourceTypeState::resource_type_version
int resource_type_version
Definition: xds_server.h:229
grpc::testing::LrsServiceImpl::ClientStats::dropped_requests
uint64_t dropped_requests(const std::string &category) const
Definition: xds_server.cc:196
context
grpc::ClientContext context
Definition: istio_echo_server_lib.cc:61
grpc::testing::AdsServiceImpl::ResourceState::subscriptions
std::set< SubscriptionState * > subscriptions
Definition: xds_server.h:221
grpc::testing::EXPECT_TRUE
EXPECT_TRUE(grpc::experimental::StsCredentialsOptionsFromJson(minimum_valid_json, &options) .ok())
grpc::testing::AdsServiceImpl::v2_rpc_service_
RpcService<::envoy::service::discovery::v2::AggregatedDiscoveryService, ::envoy::api::v2::DiscoveryRequest, ::envoy::api::v2::DiscoveryResponse > v2_rpc_service_
Definition: xds_server.h:667
setup.template
template
Definition: setup.py:47
parent_
RefCountedPtr< GrpcLb > parent_
Definition: grpclb.cc:438
grpc::testing::LrsServiceImpl::RpcService::RpcService
RpcService(LrsServiceImpl *parent)
Definition: xds_server.h:828
grpc::testing::AdsServiceImpl::clients
std::set< std::string > clients()
Definition: xds_server.h:180
grpc::testing::LrsServiceImpl::lrs_mu_
grpc_core::Mutex lrs_mu_
Definition: xds_server.h:902
grpc::testing::AdsServiceImpl::seen_v3_client
bool seen_v3_client() const
Definition: xds_server.h:85
ABSL_UNLOCK_FUNCTION
#define ABSL_UNLOCK_FUNCTION(...)
Definition: abseil-cpp/absl/base/thread_annotations.h:228
grpc::testing::LrsServiceImpl::cluster_names_
std::set< std::string > cluster_names_
Definition: xds_server.h:899
to_string
static bool to_string(zval *from)
Definition: protobuf/php/ext/google/protobuf/convert.c:333
grpc::testing::AdsServiceImpl::SetEdsResource
void SetEdsResource(const ::envoy::config::endpoint::v3::ClusterLoadAssignment &assignment)
Definition: xds_server.h:128
grpc::testing::LrsServiceImpl::ABSL_GUARDED_BY
bool lrs_done_ ABSL_GUARDED_BY(lrs_mu_)
testing::Contains
internal::ContainsMatcher< M > Contains(M matcher)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:9101
pair
std::pair< std::string, std::string > pair
Definition: abseil-cpp/absl/container/internal/raw_hash_set_benchmark.cc:78
grpc::testing::AdsServiceImpl::ResourceTypeState::resource_name_map
ResourceNameMap resource_name_map
Definition: xds_server.h:230
counted_service.h
grpc::testing::AdsServiceImpl::ResponseState
Definition: xds_server.h:71
thread
static uv_thread_t thread
Definition: test-async-null-cb.c:29
grpc::testing::AdsServiceImpl::SetLdsResource
void SetLdsResource(const ::envoy::config::listener::v3::Listener &listener)
Definition: xds_server.h:109
sync.h
reader
void reader(void *n)
Definition: libuv/docs/code/locks/main.c:8
grpc::testing::LrsServiceImpl::Shutdown
void Shutdown()
Definition: xds_server.cc:230
ABSL_SCOPED_LOCKABLE
#define ABSL_SCOPED_LOCKABLE
Definition: abseil-cpp/absl/base/thread_annotations.h:196
grpc::testing::LrsServiceImpl::v3_rpc_service
::envoy::service::load_stats::v3::LoadReportingService::Service * v3_rpc_service()
Definition: xds_server.h:795
grpc::testing::mu
static gpr_mu mu
Definition: bm_cq.cc:162
grpc::testing::AdsServiceImpl::RpcService::ProcessRequest
void ProcessRequest(const DiscoveryRequest &request, const std::string &v3_resource_type, UpdateQueue *update_queue, SubscriptionMap *subscription_map, SentState *sent_state, absl::optional< DiscoveryResponse > *response) ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_ -> ads_mu_)
Definition: xds_server.h:401
stream
voidpf stream
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:57