load_data_store.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H
20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H
21 
23 
24 #include <stddef.h>
25 #include <stdint.h>
26 
27 #include <cstdint>
28 #include <memory>
29 #include <set>
30 #include <string>
31 #include <unordered_map>
32 #include <utility>
33 
34 #include <grpcpp/support/config.h>
35 
36 namespace grpc {
37 namespace load_reporter {
38 
39 // The load data storage is organized in a hierarchy. The LoadDataStore is the
40 // top-level data store. In LoadDataStore, for each host we keep a
41 // PerHostStore, in which for each balancer we keep a PerBalancerStore. Each
42 // PerBalancerStore maintains a map of load records, mapping from LoadRecordKey
43 // to LoadRecordValue. The LoadRecordValue contains a map of customized call
44 // metrics, mapping from a call metric name to the CallMetricValue.
45 
46 // The value of a customized call metric.
48  public:
50  double total_metric_value = 0)
52 
53  void MergeFrom(CallMetricValue other) {
54  num_calls_ += other.num_calls_;
56  }
57 
58  // Getters.
59  uint64_t num_calls() const { return num_calls_; }
60  double total_metric_value() const { return total_metric_value_; }
61 
62  private:
63  // The number of calls that finished with this metric.
65  // The sum of metric values across all the calls that finished with this
66  // metric.
67  double total_metric_value_ = 0;
68 };
69 
70 // The key of a load record.
72  public:
75  : lb_id_(std::move(lb_id)),
79 
80  // Parses the input client_ip_and_token to set client IP, LB ID, and LB tag.
81  LoadRecordKey(const std::string& client_ip_and_token, std::string user_id);
82 
84  return "[lb_id_=" + lb_id_ + ", lb_tag_=" + lb_tag_ +
85  ", user_id_=" + user_id_ + ", client_ip_hex_=" + client_ip_hex_ +
86  "]";
87  }
88 
89  bool operator==(const LoadRecordKey& other) const {
90  return lb_id_ == other.lb_id_ && lb_tag_ == other.lb_tag_ &&
91  user_id_ == other.user_id_ && client_ip_hex_ == other.client_ip_hex_;
92  }
93 
94  // Gets the client IP bytes in network order (i.e., big-endian).
96 
97  // Getters.
98  const std::string& lb_id() const { return lb_id_; }
99  const std::string& lb_tag() const { return lb_tag_; }
100  const std::string& user_id() const { return user_id_; }
101  const std::string& client_ip_hex() const { return client_ip_hex_; }
102 
103  struct Hasher {
104  void hash_combine(size_t* seed, const std::string& k) const {
105  *seed ^= std::hash<std::string>()(k) + 0x9e3779b9 + (*seed << 6) +
106  (*seed >> 2);
107  }
108 
109  size_t operator()(const LoadRecordKey& k) const {
110  size_t h = 0;
111  hash_combine(&h, k.lb_id_);
112  hash_combine(&h, k.lb_tag_);
113  hash_combine(&h, k.user_id_);
114  hash_combine(&h, k.client_ip_hex_);
115  return h;
116  }
117  };
118 
119  private:
124 };
125 
126 // The value of a load record.
128  public:
138 
139  LoadRecordValue(std::string metric_name, uint64_t num_calls,
140  double total_metric_value);
141 
142  void MergeFrom(const LoadRecordValue& other) {
143  start_count_ += other.start_count_;
144  ok_count_ += other.ok_count_;
145  error_count_ += other.error_count_;
146  bytes_sent_ += other.bytes_sent_;
147  bytes_recv_ += other.bytes_recv_;
148  latency_ms_ += other.latency_ms_;
149  for (const auto& p : other.call_metrics_) {
150  const std::string& key = p.first;
151  const CallMetricValue& value = p.second;
152  call_metrics_[key].MergeFrom(value);
153  }
154  }
155 
157  return static_cast<int64_t>(start_count_ - ok_count_ - error_count_);
158  }
159 
161  return "[start_count_=" + std::to_string(start_count_) +
162  ", ok_count_=" + std::to_string(ok_count_) +
163  ", error_count_=" + std::to_string(error_count_) +
164  ", bytes_sent_=" + std::to_string(bytes_sent_) +
165  ", bytes_recv_=" + std::to_string(bytes_recv_) +
166  ", latency_ms_=" + std::to_string(latency_ms_) + ", " +
167  std::to_string(call_metrics_.size()) + " other call metric(s)]";
168  }
169 
170  bool InsertCallMetric(const std::string& metric_name,
171  const CallMetricValue& metric_value) {
172  return call_metrics_.insert({metric_name, metric_value}).second;
173  }
174 
175  // Getters.
176  uint64_t start_count() const { return start_count_; }
177  uint64_t ok_count() const { return ok_count_; }
178  uint64_t error_count() const { return error_count_; }
179  uint64_t bytes_sent() const { return bytes_sent_; }
180  uint64_t bytes_recv() const { return bytes_recv_; }
181  uint64_t latency_ms() const { return latency_ms_; }
182  const std::unordered_map<std::string, CallMetricValue>& call_metrics() const {
183  return call_metrics_;
184  }
185 
186  private:
193  std::unordered_map<std::string, CallMetricValue> call_metrics_;
194 };
195 
196 // Stores the data associated with a particular LB ID.
198  public:
199  using LoadRecordMap =
200  std::unordered_map<LoadRecordKey, LoadRecordValue, LoadRecordKey::Hasher>;
201 
204 
205  // Merge a load record with the given key and value if the store is not
206  // suspended.
207  void MergeRow(const LoadRecordKey& key, const LoadRecordValue& value);
208 
209  // Suspend this store, so that no detailed load data will be recorded.
210  void Suspend();
211  // Resume this store from suspension.
212  void Resume();
213  // Is this store suspended or not?
214  bool IsSuspended() const { return suspended_; }
215 
218  }
219 
221 
223  return "[PerBalancerStore lb_id_=" + lb_id_ + " load_key_=" + load_key_ +
224  "]";
225  }
226 
228 
229  // Getters.
230  const std::string& lb_id() const { return lb_id_; }
231  const std::string& load_key() const { return load_key_; }
232  const LoadRecordMap& load_record_map() const { return load_record_map_; }
233 
234  private:
236  // TODO(juanlishen): Use bytestring protobuf type?
241  bool suspended_ = false;
242 };
243 
244 // Stores the data associated with a particular host.
246  public:
247  // When a report stream is created, a PerBalancerStore is created for the
248  // LB ID (guaranteed unique) associated with that stream. If it is the only
249  // active store, adopt all the orphaned stores. If it is the first created
250  // store, adopt the store of kInvalidLbId.
251  void ReportStreamCreated(const std::string& lb_id,
252  const std::string& load_key);
253 
254  // When a report stream is closed, the PerBalancerStores assigned to the
255  // associated LB ID need to be re-assigned to other active balancers,
256  // ideally with the same load key. If there is no active balancer, we have
257  // to suspend those stores and drop the incoming load data until they are
258  // resumed.
259  void ReportStreamClosed(const std::string& lb_id);
260 
261  // Returns null if not found. Caller doesn't own the returned store.
263 
264  // Returns null if lb_id is not found. The returned pointer points to the
265  // underlying data structure, which is not owned by the caller.
266  const std::set<PerBalancerStore*>* GetAssignedStores(
267  const std::string& lb_id) const;
268 
269  private:
270  // Creates a PerBalancerStore for the given LB ID, assigns the store to
271  // itself, and records the LB ID to the load key.
272  void SetUpForNewLbId(const std::string& lb_id, const std::string& load_key);
273 
274  void AssignOrphanedStore(PerBalancerStore* orphaned_store,
275  const std::string& new_receiver);
276 
277  std::unordered_map<std::string, std::set<std::string>>
279 
280  // Key: LB ID. The key set includes all the LB IDs that have been
281  // allocated for reporting streams so far.
282  // Value: the unique pointer to the PerBalancerStore of the LB ID.
283  std::unordered_map<std::string, std::unique_ptr<PerBalancerStore>>
285 
286  // Key: LB ID. The key set includes the LB IDs of the balancers that are
287  // currently receiving report.
288  // Value: the set of raw pointers to the PerBalancerStores assigned to the LB
289  // ID. Note that the sets in assigned_stores_ form a partition of the value set
290  // of per_balancer_stores_.
291  std::unordered_map<std::string, std::set<PerBalancerStore*>> assigned_stores_;
292 };
293 
294 // Thread-unsafe two-level bookkeeper of all the load data.
295 // Note: We never remove any store objects from this class, as per the
296 // current spec. That's because premature removal of the store objects
297 // may lead to loss of critical information, e.g., mapping from lb_id to
298 // load_key, and the number of in-progress calls. Such loss will cause
299 // information inconsistency when the balancer is re-connected. Keeping
300 // all the stores should be fine for PerHostStore, since we assume there
301 // should only be a few hostnames. But it's a potential problem for
302 // PerBalancerStore.
304  public:
305  // Returns null if not found. Caller doesn't own the returned store.
307  const std::string& lb_id) const;
308 
309  // Returns null if hostname or lb_id is not found. The returned pointer points
310  // to the underlying data structure, which is not owned by the caller.
311  const std::set<PerBalancerStore*>* GetAssignedStores(const string& hostname,
312  const string& lb_id);
313 
314  // If a PerBalancerStore can be found by the hostname and LB ID in
315  // LoadRecordKey, the load data will be merged to that store. Otherwise,
316  // only track the number of the in-progress calls for this unknown LB ID.
317  void MergeRow(const std::string& hostname, const LoadRecordKey& key,
318  const LoadRecordValue& value);
319 
320  // Is the given lb_id a tracked unknown LB ID (i.e., the LB ID was associated
321  // with some received load data but unknown to this load data store)?
322  bool IsTrackedUnknownBalancerId(const std::string& lb_id) const {
323  return unknown_balancer_id_trackers_.find(lb_id) !=
325  }
326 
327  // Wrapper around PerHostStore::ReportStreamCreated.
328  void ReportStreamCreated(const std::string& hostname,
329  const std::string& lb_id,
330  const std::string& load_key);
331 
332  // Wrapper around PerHostStore::ReportStreamClosed.
333  void ReportStreamClosed(const std::string& hostname,
334  const std::string& lb_id);
335 
336  private:
337  // Buffered data that was fetched from Census but hasn't been sent to
338  // balancer. We need to keep this data ourselves because Census will
339  // delete the data once it's returned.
340  std::unordered_map<std::string, PerHostStore> per_host_stores_;
341 
342  // Tracks the number of in-progress calls for each unknown LB ID.
343  std::unordered_map<std::string, uint64_t> unknown_balancer_id_trackers_;
344 };
345 
346 } // namespace load_reporter
347 } // namespace grpc
348 
349 #endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H
grpc::load_reporter::PerBalancerStore::MergeRow
void MergeRow(const LoadRecordKey &key, const LoadRecordValue &value)
Definition: load_data_store.cc:147
grpc::load_reporter::PerBalancerStore
Definition: load_data_store.h:197
grpc::load_reporter::LoadRecordValue::bytes_recv_
uint64_t bytes_recv_
Definition: load_data_store.h:191
grpc::load_reporter::LoadRecordValue::bytes_recv
uint64_t bytes_recv() const
Definition: load_data_store.h:180
grpc::load_reporter::PerBalancerStore::ToString
std::string ToString()
Definition: load_data_store.h:222
grpc::load_reporter::LoadDataStore::FindPerBalancerStore
PerBalancerStore * FindPerBalancerStore(const std::string &hostname, const std::string &lb_id) const
Definition: load_data_store.cc:282
grpc::load_reporter::LoadDataStore::ReportStreamCreated
void ReportStreamCreated(const std::string &hostname, const std::string &lb_id, const std::string &load_key)
Definition: load_data_store.cc:330
grpc::load_reporter::LoadRecordValue::error_count
uint64_t error_count() const
Definition: load_data_store.h:178
grpc::load_reporter::PerHostStore::FindPerBalancerStore
PerBalancerStore * FindPerBalancerStore(const std::string &lb_id) const
Definition: load_data_store.cc:243
grpc::load_reporter::PerHostStore
Definition: load_data_store.h:245
grpc::load_reporter::PerBalancerStore::last_reported_num_calls_in_progress_
uint64_t last_reported_num_calls_in_progress_
Definition: load_data_store.h:240
grpc::load_reporter::PerHostStore::per_balancer_stores_
std::unordered_map< std::string, std::unique_ptr< PerBalancerStore > > per_balancer_stores_
Definition: load_data_store.h:284
grpc
Definition: grpcpp/alarm.h:33
grpc::load_reporter::LoadRecordValue::MergeFrom
void MergeFrom(const LoadRecordValue &other)
Definition: load_data_store.h:142
grpc::load_reporter::LoadRecordKey::lb_tag
const std::string & lb_tag() const
Definition: load_data_store.h:99
grpc::load_reporter::LoadRecordValue::start_count
uint64_t start_count() const
Definition: load_data_store.h:176
seed
static const uint8_t seed[20]
Definition: dsa_test.cc:79
grpc::load_reporter::CallMetricValue::num_calls
uint64_t num_calls() const
Definition: load_data_store.h:59
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc::load_reporter::CallMetricValue::total_metric_value
double total_metric_value() const
Definition: load_data_store.h:60
grpc::load_reporter::PerBalancerStore::lb_id
const std::string & lb_id() const
Definition: load_data_store.h:230
grpc::load_reporter::LoadDataStore
Definition: load_data_store.h:303
grpc::load_reporter::PerBalancerStore::load_key_
std::string load_key_
Definition: load_data_store.h:237
grpc::load_reporter::LoadRecordKey::LoadRecordKey
LoadRecordKey(std::string lb_id, std::string lb_tag, std::string user_id, std::string client_ip_hex)
Definition: load_data_store.h:73
grpc::load_reporter::PerHostStore::AssignOrphanedStore
void AssignOrphanedStore(PerBalancerStore *orphaned_store, const std::string &new_receiver)
Definition: load_data_store.cc:257
grpc::load_reporter::PerBalancerStore::Resume
void Resume()
Definition: load_data_store.cc:175
grpc::load_reporter::LoadRecordValue::latency_ms
uint64_t latency_ms() const
Definition: load_data_store.h:181
second
StrT second
Definition: cxa_demangle.cpp:4885
grpc::load_reporter::PerBalancerStore::LoadRecordMap
std::unordered_map< LoadRecordKey, LoadRecordValue, LoadRecordKey::Hasher > LoadRecordMap
Definition: load_data_store.h:200
grpc::load_reporter::LoadRecordValue
Definition: load_data_store.h:127
setup.k
k
Definition: third_party/bloaty/third_party/capstone/bindings/python/setup.py:42
grpc::load_reporter::LoadRecordValue::error_count_
uint64_t error_count_
Definition: load_data_store.h:189
grpc::load_reporter::PerBalancerStore::load_key
const std::string & load_key() const
Definition: load_data_store.h:231
grpc::load_reporter::LoadRecordKey::user_id
const std::string & user_id() const
Definition: load_data_store.h:100
grpc::load_reporter::LoadRecordValue::GetNumCallsInProgressDelta
int64_t GetNumCallsInProgressDelta() const
Definition: load_data_store.h:156
grpc::load_reporter::PerBalancerStore::load_record_map_
LoadRecordMap load_record_map_
Definition: load_data_store.h:238
grpc::load_reporter::PerHostStore::ReportStreamClosed
void ReportStreamClosed(const std::string &lb_id)
Definition: load_data_store.cc:212
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
int64_t
signed __int64 int64_t
Definition: stdint-msvc2008.h:89
grpc::load_reporter::LoadRecordKey::lb_id
const std::string & lb_id() const
Definition: load_data_store.h:98
grpc::load_reporter::PerBalancerStore::Suspend
void Suspend()
Definition: load_data_store.cc:169
grpc::load_reporter::LoadDataStore::unknown_balancer_id_trackers_
std::unordered_map< std::string, uint64_t > unknown_balancer_id_trackers_
Definition: load_data_store.h:343
grpc::load_reporter::LoadRecordValue::latency_ms_
uint64_t latency_ms_
Definition: load_data_store.h:192
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
grpc::load_reporter::LoadRecordKey::lb_id_
std::string lb_id_
Definition: load_data_store.h:120
grpc::load_reporter::LoadDataStore::GetAssignedStores
const std::set< PerBalancerStore * > * GetAssignedStores(const string &hostname, const string &lb_id)
Definition: load_data_store.cc:323
grpc::load_reporter::LoadRecordKey::lb_tag_
std::string lb_tag_
Definition: load_data_store.h:121
grpc::load_reporter::LoadRecordValue::LoadRecordValue
LoadRecordValue(uint64_t start_count=0, uint64_t ok_count=0, uint64_t error_count=0, uint64_t bytes_sent=0, uint64_t bytes_recv=0, uint64_t latency_ms=0)
Definition: load_data_store.h:129
grpc::load_reporter::LoadRecordKey::client_ip_hex_
std::string client_ip_hex_
Definition: load_data_store.h:123
grpc::load_reporter::LoadRecordValue::bytes_sent_
uint64_t bytes_sent_
Definition: load_data_store.h:190
grpc::load_reporter::CallMetricValue::MergeFrom
void MergeFrom(CallMetricValue other)
Definition: load_data_store.h:53
grpc::load_reporter::CallMetricValue::total_metric_value_
double total_metric_value_
Definition: load_data_store.h:67
grpc::load_reporter::LoadRecordKey::Hasher::operator()
size_t operator()(const LoadRecordKey &k) const
Definition: load_data_store.h:109
config.h
grpc::load_reporter::LoadRecordValue::call_metrics_
std::unordered_map< std::string, CallMetricValue > call_metrics_
Definition: load_data_store.h:193
stdint.h
grpc::load_reporter::LoadDataStore::per_host_stores_
std::unordered_map< std::string, PerHostStore > per_host_stores_
Definition: load_data_store.h:340
grpc::load_reporter::PerHostStore::ReportStreamCreated
void ReportStreamCreated(const std::string &lb_id, const std::string &load_key)
Definition: load_data_store.cc:186
grpc::load_reporter::PerBalancerStore::ClearLoadRecordMap
void ClearLoadRecordMap()
Definition: load_data_store.h:227
grpc::load_reporter::LoadRecordKey::Hasher::hash_combine
void hash_combine(size_t *seed, const std::string &k) const
Definition: load_data_store.h:104
grpc::load_reporter::LoadRecordKey::operator==
bool operator==(const LoadRecordKey &other) const
Definition: load_data_store.h:89
value
const char * value
Definition: hpack_parser_table.cc:165
grpc::load_reporter::LoadDataStore::ReportStreamClosed
void ReportStreamClosed(const std::string &hostname, const std::string &lb_id)
Definition: load_data_store.cc:336
grpc::load_reporter::LoadDataStore::IsTrackedUnknownBalancerId
bool IsTrackedUnknownBalancerId(const std::string &lb_id) const
Definition: load_data_store.h:322
grpc::load_reporter::LoadRecordKey::client_ip_hex
const std::string & client_ip_hex() const
Definition: load_data_store.h:101
grpc::load_reporter::PerBalancerStore::lb_id_
std::string lb_id_
Definition: load_data_store.h:235
grpc::load_reporter::PerBalancerStore::load_record_map
const LoadRecordMap & load_record_map() const
Definition: load_data_store.h:232
grpc::load_reporter::LoadRecordValue::ok_count_
uint64_t ok_count_
Definition: load_data_store.h:188
key
const char * key
Definition: hpack_parser_table.cc:164
grpc::load_reporter::LoadRecordValue::bytes_sent
uint64_t bytes_sent() const
Definition: load_data_store.h:179
grpc::load_reporter::LoadRecordKey
Definition: load_data_store.h:71
grpc::load_reporter::CallMetricValue::CallMetricValue
CallMetricValue(uint64_t num_calls=0, double total_metric_value=0)
Definition: load_data_store.h:49
grpc::load_reporter::PerBalancerStore::num_calls_in_progress_
uint64_t num_calls_in_progress_
Definition: load_data_store.h:239
grpc::load_reporter::LoadRecordValue::InsertCallMetric
bool InsertCallMetric(const std::string &metric_name, const CallMetricValue &metric_value)
Definition: load_data_store.h:170
grpc::load_reporter::LoadRecordKey::GetClientIpBytes
std::string GetClientIpBytes() const
Definition: load_data_store.cc:107
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc::load_reporter::PerBalancerStore::GetNumCallsInProgressForReport
uint64_t GetNumCallsInProgressForReport()
Definition: load_data_store.cc:180
grpc::load_reporter::LoadRecordKey::Hasher
Definition: load_data_store.h:103
grpc::load_reporter::LoadRecordValue::ToString
std::string ToString() const
Definition: load_data_store.h:160
grpc::load_reporter::PerBalancerStore::PerBalancerStore
PerBalancerStore(std::string lb_id, std::string load_key)
Definition: load_data_store.h:202
grpc::load_reporter::LoadRecordKey::ToString
std::string ToString() const
Definition: load_data_store.h:83
grpc::load_reporter::PerBalancerStore::suspended_
bool suspended_
Definition: load_data_store.h:241
grpc::load_reporter::CallMetricValue
Definition: load_data_store.h:47
grpc::load_reporter::PerBalancerStore::IsSuspended
bool IsSuspended() const
Definition: load_data_store.h:214
grpc::load_reporter::PerHostStore::SetUpForNewLbId
void SetUpForNewLbId(const std::string &lb_id, const std::string &load_key)
Definition: load_data_store.cc:269
grpc::load_reporter::PerHostStore::assigned_stores_
std::unordered_map< std::string, std::set< PerBalancerStore * > > assigned_stores_
Definition: load_data_store.h:291
grpc::load_reporter::PerHostStore::GetAssignedStores
const std::set< PerBalancerStore * > * GetAssignedStores(const std::string &lb_id) const
Definition: load_data_store.cc:250
to_string
static bool to_string(zval *from)
Definition: protobuf/php/ext/google/protobuf/convert.c:333
grpc::load_reporter::LoadRecordValue::start_count_
uint64_t start_count_
Definition: load_data_store.h:187
grpc::load_reporter::CallMetricValue::num_calls_
uint64_t num_calls_
Definition: load_data_store.h:64
grpc::load_reporter::PerBalancerStore::IsNumCallsInProgressChangedSinceLastReport
bool IsNumCallsInProgressChangedSinceLastReport() const
Definition: load_data_store.h:216
grpc::load_reporter::LoadRecordKey::user_id_
std::string user_id_
Definition: load_data_store.h:122
grpc::load_reporter::PerHostStore::load_key_to_receiving_lb_ids_
std::unordered_map< std::string, std::set< std::string > > load_key_to_receiving_lb_ids_
Definition: load_data_store.h:278
grpc::load_reporter::LoadRecordValue::ok_count
uint64_t ok_count() const
Definition: load_data_store.h:177
grpc::load_reporter::LoadRecordValue::call_metrics
const std::unordered_map< std::string, CallMetricValue > & call_metrics() const
Definition: load_data_store.h:182
port_platform.h
grpc::load_reporter::LoadDataStore::MergeRow
void MergeRow(const std::string &hostname, const LoadRecordKey &key, const LoadRecordValue &value)
Definition: load_data_store.cc:293


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:29