load_reporter.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
21 
23 
24 #include <stddef.h>
25 #include <stdint.h>
26 
27 #include <atomic>
28 #include <chrono>
29 #include <cstdint>
30 #include <deque>
31 #include <memory>
32 #include <string>
33 #include <unordered_map>
34 #include <utility>
35 #include <vector>
36 
38 
39 #include "opencensus/stats/stats.h"
40 #include "opencensus/tags/tag_key.h"
41 
42 #include <grpcpp/support/config.h>
43 
46 #include "src/proto/grpc/lb/v1/load_reporter.pb.h"
47 
48 namespace grpc {
49 namespace load_reporter {
50 
51 // The interface to get the Census stats. Abstracted for mocking.
53  public:
54  // Maps from the view name to the view data.
55  using ViewDataMap =
56  std::unordered_map<std::string, ::opencensus::stats::ViewData>;
57  // Maps from the view name to the view descriptor.
58  using ViewDescriptorMap =
59  std::unordered_map<std::string, ::opencensus::stats::ViewDescriptor>;
60 
62  virtual ~CensusViewProvider() = default;
63 
64  // Fetches the view data accumulated since the last fetch, and returns it as a
65  // map from the view name to the view data.
66  virtual ViewDataMap FetchViewData() = 0;
67 
68  // A helper function that gets a row with the input tag values from the view
69  // data. Only used when we know that row must exist because we have seen a row
70  // with the same tag values in a related view data. ViewData objects are
71  // considered related if their views are based on measures that are always
72  // recorded at the same time.
73  static double GetRelatedViewDataRowDouble(
74  const ViewDataMap& view_data_map, const char* view_name,
75  size_t view_name_len, const std::vector<std::string>& tag_values);
77  const ViewDataMap& view_data_map, const char* view_name,
78  size_t view_name_len, const std::vector<std::string>& tag_values);
79 
80  protected:
82  return view_descriptor_map_;
83  }
84 
85  private:
87  // Tag keys.
88  ::opencensus::tags::TagKey tag_key_token_;
89  ::opencensus::tags::TagKey tag_key_host_;
90  ::opencensus::tags::TagKey tag_key_user_id_;
91  ::opencensus::tags::TagKey tag_key_status_;
92  ::opencensus::tags::TagKey tag_key_metric_name_;
93 };
94 
95 // The default implementation fetches the real stats from Census.
97  public:
99 
100  ViewDataMap FetchViewData() override;
101 
102  private:
103  std::unordered_map<std::string, ::opencensus::stats::View> view_map_;
104 };
105 
106 // The interface to get the CPU stats. Abstracted for mocking.
108  public:
109  // The used and total amounts of CPU usage.
110  using CpuStatsSample = std::pair<uint64_t, uint64_t>;
111 
112  virtual ~CpuStatsProvider() = default;
113 
114  // Gets the cumulative used CPU and total CPU resource.
115  virtual CpuStatsSample GetCpuStats() = 0;
116 };
117 
118 // The default implementation reads CPU jiffies from the system to calculate CPU
119 // utilization.
121  public:
122  CpuStatsSample GetCpuStats() override;
123 };
124 
125 // Maintains all the load data and load reporting streams.
127  public:
128  // TODO(juanlishen): Allow config for providers from users.
129  LoadReporter(uint32_t feedback_sample_window_seconds,
130  std::unique_ptr<CensusViewProvider> census_view_provider,
131  std::unique_ptr<CpuStatsProvider> cpu_stats_provider)
132  : feedback_sample_window_seconds_(feedback_sample_window_seconds),
135  // Append the initial record so that the next real record can have a base.
137  }
138 
139  // Fetches the latest data from Census and merges it into the data store.
140  // Also adds a new sample to the LB feedback sliding window.
141  // Not thread-safe. (1) Access to the load data store and feedback records
142  // is guarded by locks. (2) Access to the Census view provider and CPU stats
143  // provider is not locked, but these two members are only accessed in this
144  // method (and, in tests, when setting up expectations). Therefore,
145  // invocations of this method must be serialized.
146  void FetchAndSample();
147 
148  // Generates a report for the given host and balancer. The report contains
149  // all the stats data accumulated between the last report (i.e., the last
150  // consumption) and the last fetch from Census (i.e., the last production).
151  // Thread-safe.
153  const std::string& hostname, const std::string& lb_id);
154 
155  // The feedback is calculated from the stats data recorded in the sliding
156  // window. Outdated records are discarded.
157  // Thread-safe.
158  grpc::lb::v1::LoadBalancingFeedback GenerateLoadBalancingFeedback();
159 
160  // Wrapper around LoadDataStore::ReportStreamCreated.
161  // Thread-safe.
162  void ReportStreamCreated(const std::string& hostname,
163  const std::string& lb_id,
164  const std::string& load_key);
165 
166  // Wrapper around LoadDataStore::ReportStreamClosed.
167  // Thread-safe.
168  void ReportStreamClosed(const std::string& hostname,
169  const std::string& lb_id);
170 
171  // Generates a unique LB ID of length kLbIdLength. Returns an empty string
172  // upon failure. Thread-safe.
174 
175  // Accessors only for testing.
177  return census_view_provider_.get();
178  }
180 
181  private:
188 
192  : end_time(end_time),
193  rpcs(rpcs),
194  errors(errors),
196  cpu_limit(cpu_limit) {}
197  };
198 
199  // Finds the view data about call starts in the view_data_map and merges
200  // that data into the load data store.
202  const CensusViewProvider::ViewDataMap& view_data_map);
203  // Finds the view data about call ends in the view_data_map and merges that
204  // data into the load data store.
206  const CensusViewProvider::ViewDataMap& view_data_map);
207  // Finds the view data about the customized call metrics from the
208  // view_data_map and merges the data to the load data store.
210  const CensusViewProvider::ViewDataMap& view_data_map);
211 
214  return record.end_time > now - feedback_sample_window_seconds_;
215  }
216 
218 
219  // Extracts an OrphanedLoadIdentifier from the per-balancer store and attaches
220  // it to the load.
221  void AttachOrphanLoadId(grpc::lb::v1::Load* load,
222  const PerBalancerStore& per_balancer_store);
223 
224  std::atomic<int64_t> next_lb_id_{0};
227  std::deque<LoadBalancingFeedbackRecord> feedback_records_;
228  // TODO(juanlishen): Use finer-grained locking. Locking the whole store may
229  // be too expensive.
232  std::unique_ptr<CensusViewProvider> census_view_provider_;
233  std::unique_ptr<CpuStatsProvider> cpu_stats_provider_;
234 };
235 
236 } // namespace load_reporter
237 } // namespace grpc
238 
239 #endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
grpc::load_reporter::CensusViewProviderDefaultImpl
Definition: load_reporter.h:96
grpc::load_reporter::PerBalancerStore
Definition: load_data_store.h:197
absl::time_internal::cctz::seconds
std::chrono::duration< std::int_fast64_t > seconds
Definition: abseil-cpp/absl/time/internal/cctz/include/cctz/time_zone.h:40
grpc::load_reporter::CensusViewProvider::tag_key_metric_name_
::opencensus::tags::TagKey tag_key_metric_name_
Definition: load_reporter.h:92
google::protobuf::RepeatedPtrField
Definition: bloaty/third_party/protobuf/src/google/protobuf/compiler/command_line_interface.h:62
now
static double now(void)
Definition: test/core/fling/client.cc:130
grpc::load_reporter::LoadReporter::feedback_records_
std::deque< LoadBalancingFeedbackRecord > feedback_records_
Definition: load_reporter.h:227
grpc::load_reporter::LoadReporter::FetchAndSample
void FetchAndSample()
Definition: load_reporter.cc:506
grpc
Definition: grpcpp/alarm.h:33
absl::time_internal::cctz::time_point
std::chrono::time_point< std::chrono::system_clock, D > time_point
Definition: abseil-cpp/absl/time/internal/cctz/include/cctz/time_zone.h:39
grpc::load_reporter::LoadReporter::IsRecordInWindow
bool IsRecordInWindow(const LoadBalancingFeedbackRecord &record, std::chrono::system_clock::time_point now)
Definition: load_reporter.h:212
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc::load_reporter::CensusViewProviderDefaultImpl::FetchViewData
ViewDataMap FetchViewData() override
Definition: load_reporter.cc:207
grpc::load_reporter::LoadDataStore
Definition: load_data_store.h:303
grpc::load_reporter::LoadReporter::LoadReporter
LoadReporter(uint32_t feedback_sample_window_seconds, std::unique_ptr< CensusViewProvider > census_view_provider, std::unique_ptr< CpuStatsProvider > cpu_stats_provider)
Definition: load_reporter.h:129
grpc::load_reporter::LoadReporter::feedback_sample_window_seconds_
const std::chrono::seconds feedback_sample_window_seconds_
Definition: load_reporter.h:225
grpc::load_reporter::CensusViewProvider::GetRelatedViewDataRowInt
static uint64_t GetRelatedViewDataRowInt(const ViewDataMap &view_data_map, const char *view_name, size_t view_name_len, const std::vector< std::string > &tag_values)
Definition: load_reporter.cc:182
repeated_ptr_field.h
grpc::load_reporter::LoadReporter::LoadBalancingFeedbackRecord::rpcs
uint64_t rpcs
Definition: load_reporter.h:184
grpc::load_reporter::CensusViewProvider::ViewDataMap
std::unordered_map< std::string, ::opencensus::stats::ViewData > ViewDataMap
Definition: load_reporter.h:56
grpc::load_reporter::CpuStatsProvider::CpuStatsSample
std::pair< uint64_t, uint64_t > CpuStatsSample
Definition: load_reporter.h:110
grpc::load_reporter::CpuStatsProvider
Definition: load_reporter.h:107
grpc::load_reporter::CpuStatsProvider::~CpuStatsProvider
virtual ~CpuStatsProvider()=default
grpc::load_reporter::CensusViewProvider::tag_key_status_
::opencensus::tags::TagKey tag_key_status_
Definition: load_reporter.h:91
grpc::load_reporter::CpuStatsProviderDefaultImpl::GetCpuStats
CpuStatsSample GetCpuStats() override
Definition: load_reporter.cc:46
grpc::load_reporter::CensusViewProvider::tag_key_user_id_
::opencensus::tags::TagKey tag_key_user_id_
Definition: load_reporter.h:90
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
grpc::load_reporter::LoadReporter::LoadBalancingFeedbackRecord::errors
uint64_t errors
Definition: load_reporter.h:185
grpc::load_reporter::CensusViewProvider::ViewDescriptorMap
std::unordered_map< std::string, ::opencensus::stats::ViewDescriptor > ViewDescriptorMap
Definition: load_reporter.h:59
grpc::load_reporter::CensusViewProvider::view_descriptor_map
const ViewDescriptorMap & view_descriptor_map() const
Definition: load_reporter.h:81
grpc::load_reporter::LoadReporter::ProcessViewDataOtherCallMetrics
void ProcessViewDataOtherCallMetrics(const CensusViewProvider::ViewDataMap &view_data_map)
Definition: load_reporter.cc:480
errors
const char * errors
Definition: bloaty/third_party/protobuf/src/google/protobuf/io/tokenizer_unittest.cc:841
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
grpc::load_reporter::CensusViewProvider::GetRelatedViewDataRowDouble
static double GetRelatedViewDataRowDouble(const ViewDataMap &view_data_map, const char *view_name, size_t view_name_len, const std::vector< std::string > &tag_values)
Definition: load_reporter.cc:170
grpc::load_reporter::LoadReporter::AttachOrphanLoadId
void AttachOrphanLoadId(grpc::lb::v1::Load *load, const PerBalancerStore &per_balancer_store)
Definition: load_reporter.cc:362
grpc::load_reporter::LoadReporter::feedback_mu_
grpc_core::Mutex feedback_mu_
Definition: load_reporter.h:226
grpc::load_reporter::LoadReporter::LoadBalancingFeedbackRecord
Definition: load_reporter.h:182
grpc::load_reporter::LoadReporter::load_data_store_
LoadDataStore load_data_store_
Definition: load_reporter.h:231
grpc::load_reporter::LoadReporter::next_lb_id_
std::atomic< int64_t > next_lb_id_
Definition: load_reporter.h:224
grpc::load_reporter::CensusViewProvider
Definition: load_reporter.h:52
grpc::load_reporter::LoadReporter::census_view_provider
CensusViewProvider * census_view_provider()
Definition: load_reporter.h:176
grpc::load_reporter::LoadReporter::GenerateLoads
::google::protobuf::RepeatedPtrField< grpc::lb::v1::Load > GenerateLoads(const std::string &hostname, const std::string &lb_id)
Definition: load_reporter.cc:301
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
grpc::load_reporter::LoadReporter::LoadBalancingFeedbackRecord::cpu_limit
uint64_t cpu_limit
Definition: load_reporter.h:187
grpc::load_reporter::LoadReporter::store_mu_
grpc_core::Mutex store_mu_
Definition: load_reporter.h:230
grpc::load_reporter::LoadReporter::GenerateLoadBalancingFeedback
grpc::lb::v1::LoadBalancingFeedback GenerateLoadBalancingFeedback()
Definition: load_reporter.cc:251
grpc::load_reporter::LoadReporter::ProcessViewDataCallEnd
void ProcessViewDataCallEnd(const CensusViewProvider::ViewDataMap &view_data_map)
Definition: load_reporter.cc:427
gen_build_yaml.load
def load(*args)
Definition: test/core/end2end/gen_build_yaml.py:25
grpc::load_reporter::CensusViewProviderDefaultImpl::view_map_
std::unordered_map< std::string, ::opencensus::stats::View > view_map_
Definition: load_reporter.h:103
config.h
grpc::load_reporter::LoadReporter::ProcessViewDataCallStart
void ProcessViewDataCallStart(const CensusViewProvider::ViewDataMap &view_data_map)
Definition: load_reporter.cc:407
stdint.h
grpc::load_reporter::LoadReporter::LoadBalancingFeedbackRecord::cpu_usage
uint64_t cpu_usage
Definition: load_reporter.h:186
grpc_core::Mutex
Definition: src/core/lib/gprpp/sync.h:61
grpc::load_reporter::LoadReporter::LoadBalancingFeedbackRecord::end_time
std::chrono::system_clock::time_point end_time
Definition: load_reporter.h:183
grpc::load_reporter::CensusViewProvider::tag_key_token_
::opencensus::tags::TagKey tag_key_token_
Definition: load_reporter.h:88
grpc::load_reporter::LoadReporter::LoadBalancingFeedbackRecord::LoadBalancingFeedbackRecord
LoadBalancingFeedbackRecord(const std::chrono::system_clock::time_point &end_time, uint64_t rpcs, uint64_t errors, uint64_t cpu_usage, uint64_t cpu_limit)
Definition: load_reporter.h:189
grpc::load_reporter::CpuStatsProvider::GetCpuStats
virtual CpuStatsSample GetCpuStats()=0
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc::load_reporter::CensusViewProviderDefaultImpl::CensusViewProviderDefaultImpl
CensusViewProviderDefaultImpl()
Definition: load_reporter.cc:195
grpc::load_reporter::LoadReporter::cpu_stats_provider_
std::unique_ptr< CpuStatsProvider > cpu_stats_provider_
Definition: load_reporter.h:233
grpc::load_reporter::LoadReporter
Definition: load_reporter.h:126
grpc::load_reporter::CensusViewProvider::FetchViewData
virtual ViewDataMap FetchViewData()=0
grpc::load_reporter::CensusViewProvider::~CensusViewProvider
virtual ~CensusViewProvider()=default
grpc::load_reporter::LoadReporter::ReportStreamCreated
void ReportStreamCreated(const std::string &hostname, const std::string &lb_id, const std::string &load_key)
Definition: load_reporter.cc:389
load_data_store.h
grpc::load_reporter::CensusViewProvider::CensusViewProvider
CensusViewProvider()
Definition: load_reporter.cc:50
grpc::load_reporter::CensusViewProvider::view_descriptor_map_
ViewDescriptorMap view_descriptor_map_
Definition: load_reporter.h:86
grpc::load_reporter::LoadReporter::census_view_provider_
std::unique_ptr< CensusViewProvider > census_view_provider_
Definition: load_reporter.h:232
grpc::load_reporter::LoadReporter::ReportStreamClosed
void ReportStreamClosed(const std::string &hostname, const std::string &lb_id)
Definition: load_reporter.cc:399
sync.h
grpc::load_reporter::CensusViewProvider::tag_key_host_
::opencensus::tags::TagKey tag_key_host_
Definition: load_reporter.h:89
grpc::load_reporter::CpuStatsProviderDefaultImpl
Definition: load_reporter.h:120
grpc::load_reporter::LoadReporter::GenerateLbId
std::string GenerateLbId()
Definition: load_reporter.cc:227
grpc::load_reporter::LoadReporter::cpu_stats_provider
CpuStatsProvider * cpu_stats_provider()
Definition: load_reporter.h:179
port_platform.h
grpc::load_reporter::LoadReporter::AppendNewFeedbackRecord
void AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors)
Definition: load_reporter.cc:376


grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:16