load_data_store.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 #include <stdint.h>
24 #include <stdio.h>
25 
26 #include <cstdlib>
27 #include <iterator>
28 #include <set>
29 #include <unordered_map>
30 
31 #include <grpc/support/log.h>
32 
35 
36 namespace grpc {
37 namespace load_reporter {
38 
39 // Some helper functions.
40 namespace {
41 
// Given a map from type K to a set of value type V, finds the set associated
// with the given key and erases the value from the set. If the set becomes
// empty, also erases the key-set pair.
//
// Returns true only if the key was present and the value was actually removed
// from its set; returns false when either the key or the value is absent.
template <typename K, typename V>
bool UnorderedMapOfSetEraseKeyValue(std::unordered_map<K, std::set<V>>& map,
                                    const K& key, const V& value) {
  const auto it = map.find(key);
  if (it == map.end()) return false;
  // set::erase(value) reports how many elements it removed (0 or 1); turn
  // that into an explicit bool instead of relying on an implicit conversion.
  const bool erased = it->second.erase(value) > 0;
  // Never leave an empty set behind: drop the key along with its last value.
  if (it->second.empty()) {
    map.erase(it);
  }
  return erased;
}
59 
// Given a map from type K to a set of value type V, removes the given key and
// the associated set, and returns the set. Returns an empty set if the key is
// not found, in which case the map is left untouched.
template <typename K, typename V>
std::set<V> UnorderedMapOfSetExtract(std::unordered_map<K, std::set<V>>& map,
                                     const K& key) {
  const auto it = map.find(key);
  if (it == map.end()) return {};
  // Move the set out before erasing the entry so its elements are not copied.
  std::set<V> extracted = std::move(it->second);
  map.erase(it);
  return extracted;
}
74 
// Picks one element of a non-empty container using std::rand() and returns a
// pointer to it. The container must not be empty (asserted).
template <typename C>
const typename C::value_type* RandomElement(const C& container) {
  GPR_ASSERT(!container.empty());
  const auto offset = std::rand() % container.size();
  return &*std::next(container.begin(), offset);
}
83 
84 } // namespace
85 
// Builds a key from the combined client-IP-and-token string and a user ID.
// As parsed below, client_ip_and_token is laid out as: a 2-character decimal
// length of the hex IP (0, kIpv4AddressLength or kIpv6AddressLength), then
// that many characters of hex IP, then optionally kLbIdLength characters of
// LB ID followed by the LB tag. Malformed length prefixes trip GPR_ASSERT.
86 LoadRecordKey::LoadRecordKey(const std::string& client_ip_and_token,
87  std::string user_id)
88  : user_id_(std::move(user_id)) {
89  GPR_ASSERT(client_ip_and_token.size() >= 2);
90  int ip_hex_size;
// The first two characters hold the hex IP length as a decimal number.
91  GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d",
92  &ip_hex_size) == 1);
93  GPR_ASSERT(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength ||
94  ip_hex_size == kIpv6AddressLength);
95  size_t cur_pos = 2;
96  client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size);
97  cur_pos += ip_hex_size;
// NOTE(review): if the input is shorter than cur_pos, this unsigned
// subtraction wraps around and the else-branch substr() is then called with a
// position past the end of the string - confirm callers always pass the full
// IP announced by the length prefix.
98  if (client_ip_and_token.size() - cur_pos < kLbIdLength) {
// Remainder is too short to hold an LB ID, so only the tag is cleared here.
// NOTE(review): listing line 99 is absent at this point; presumably it
// assigns lb_id_ - confirm against the original file.
100  lb_tag_ = "";
101  } else {
// Fixed-width LB ID first; whatever follows it is the LB tag.
102  lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength);
103  lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength);
104  }
105 }
106 
108  if (client_ip_hex_.empty()) {
109  return "";
110  } else if (client_ip_hex_.size() == kIpv4AddressLength) {
111  uint32_t ip_bytes;
112  if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) {
114  "Can't parse client IP (%s) from a hex string to an integer.",
115  client_ip_hex_.c_str());
116  return "";
117  }
118  ip_bytes = grpc_htonl(ip_bytes);
119  return std::string(reinterpret_cast<const char*>(&ip_bytes),
120  sizeof(ip_bytes));
121  } else if (client_ip_hex_.size() == kIpv6AddressLength) {
122  uint32_t ip_bytes[4];
123  for (size_t i = 0; i < 4; ++i) {
124  if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x",
125  ip_bytes + i) != 1) {
126  gpr_log(
127  GPR_ERROR,
128  "Can't parse client IP part (%s) from a hex string to an integer.",
129  client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str());
130  return "";
131  }
132  ip_bytes[i] = grpc_htonl(ip_bytes[i]);
133  }
134  return std::string(reinterpret_cast<const char*>(ip_bytes),
135  sizeof(ip_bytes));
136  } else {
137  GPR_UNREACHABLE_CODE(return "");
138  }
139 }
140 
// Tail of a definition whose first signature line is absent from this listing.
// NOTE(review): call_metrics_ is indexed as a LoadRecordValue member, so this
// is presumably a LoadRecordValue constructor taking metric_name, num_calls
// and total_metric_value - confirm against the original file.
// The body records one call metric under metric_name, built from num_calls
// and total_metric_value.
142  double total_metric_value) {
143  call_metrics_.emplace(std::move(metric_name),
144  CallMetricValue(num_calls, total_metric_value));
145 }
146 
// PerBalancerStore::MergeRow (name per the cross-reference index): merges
// `value` into the record stored under `key` unless the store is suspended,
// and always applies the in-progress call delta to num_calls_in_progress_.
// NOTE(review): this listing is missing the first signature line and the
// opening lines of three calls below (listing lines 152, 156 and 163).
148  const LoadRecordValue& value) {
149  // During suspension, the load data received will be dropped.
150  if (!suspended_) {
// operator[] creates the record for `key` on first use, then merges into it.
151  load_record_map_[key].MergeFrom(value);
// Arguments of a call whose opening line is missing - presumably a log call.
153  "[PerBalancerStore %p] Load data merged (Key: %s, Value: %s).",
154  this, key.ToString().c_str(), value.ToString().c_str());
155  } else {
// Arguments of a call whose opening line is missing - presumably a log call.
157  "[PerBalancerStore %p] Load data dropped (Key: %s, Value: %s).",
158  this, key.ToString().c_str(), value.ToString().c_str());
159  }
160  // We always keep track of num_calls_in_progress_, so that when this
161  // store is resumed, we still have a correct value of
162  // num_calls_in_progress_.
// Remainder of a check whose opening line is missing; presumably it asserts
// that applying the delta cannot take the counter below zero - confirm.
164  value.GetNumCallsInProgressDelta() >=
165  0);
166  num_calls_in_progress_ += value.GetNumCallsInProgressDelta();
167 }
168 
// PerBalancerStore::Suspend (name per the cross-reference index; the signature
// line is absent from this listing): marks the store as suspended and
// discards every load record accumulated so far.
170  suspended_ = true;
171  load_record_map_.clear();
172  gpr_log(GPR_DEBUG, "[PerBalancerStore %p] Suspended.", this);
173 }
174 
// PerBalancerStore::Resume (name per the cross-reference index; the signature
// line is absent from this listing): clears the suspended flag. Nothing is
// restored here; records cleared by Suspend stay cleared.
176  suspended_ = false;
177  gpr_log(GPR_DEBUG, "[PerBalancerStore %p] Resumed.", this);
178 }
179 
// PerBalancerStore::GetNumCallsInProgressForReport (name per the
// cross-reference index): returns the current num_calls_in_progress_.
// NOTE(review): the signature and the two statements before this return
// (listing lines 180-182) are absent from this listing; the index lists a
// last_reported_num_calls_in_progress_ member, which is presumably updated
// there - confirm against the original file.
183  return num_calls_in_progress_;
184 }
185 
// PerHostStore::ReportStreamCreated (name per the cross-reference index; the
// first signature line is absent from this listing): registers a new
// reporting stream for lb_id with the given load key. lb_id must not be
// kInvalidLbId (asserted).
187  const std::string& load_key) {
188  GPR_ASSERT(lb_id != kInvalidLbId);
189  SetUpForNewLbId(lb_id, load_key);
190  // Prior to this one, there was no load balancer receiving report, so we may
191  // have unassigned orphaned stores to assign to this new balancer.
192  // TODO(juanlishen): If the load key of this new stream is the same with
193  // some previously adopted orphan store, we may want to take the orphan to
194  // this stream. Need to discuss with LB team.
// Exactly one entry in assigned_stores_ means lb_id is the only receiver, so
// every other per-balancer store is resumed and handed to it.
195  if (assigned_stores_.size() == 1) {
196  for (const auto& p : per_balancer_stores_) {
197  const std::string& other_lb_id = p.first;
198  const std::unique_ptr<PerBalancerStore>& orphaned_store = p.second;
199  if (other_lb_id != lb_id) {
200  orphaned_store->Resume();
201  AssignOrphanedStore(orphaned_store.get(), lb_id);
202  }
203  }
204  }
205  // The first connected balancer will adopt the kInvalidLbId.
// NOTE(review): the body of this branch (listing lines 207-208) is absent
// from this listing - confirm against the original file.
206  if (per_balancer_stores_.size() == 1) {
209  }
210 }
211 
// PerHostStore::ReportStreamClosed (name per the cross-reference index; the
// signature line is absent from this listing): handles the closing of the
// stream for lb_id. The balancer must be known (asserted). Its ID is removed
// from the receivers of its load key, and every store that was assigned to
// it is either re-assigned to another receiving balancer or suspended.
213  auto it_store_for_gone_lb = per_balancer_stores_.find(lb_id);
214  GPR_ASSERT(it_store_for_gone_lb != per_balancer_stores_.end());
215  // Remove this closed stream from our records.
216  GPR_ASSERT(UnorderedMapOfSetEraseKeyValue(
217  load_key_to_receiving_lb_ids_, it_store_for_gone_lb->second->load_key(),
218  lb_id));
// Taking the set out also removes lb_id from assigned_stores_, so it can no
// longer be picked as a new receiver below.
219  std::set<PerBalancerStore*> orphaned_stores =
220  UnorderedMapOfSetExtract(assigned_stores_, lb_id);
221  // The stores that were assigned to this balancer are orphaned now. They
222  // should be re-assigned to other balancers which are still receiving reports.
223  for (PerBalancerStore* orphaned_store : orphaned_stores) {
224  const std::string* new_receiver = nullptr;
225  auto it = load_key_to_receiving_lb_ids_.find(orphaned_store->load_key());
226  if (it != load_key_to_receiving_lb_ids_.end()) {
227  // First, try to pick from the active balancers with the same load key.
228  new_receiver = RandomElement(it->second);
229  } else if (!assigned_stores_.empty()) {
230  // If failed, pick from all the remaining active balancers.
231  new_receiver = &(RandomElement(assigned_stores_)->first);
232  }
233  if (new_receiver != nullptr) {
234  AssignOrphanedStore(orphaned_store, *new_receiver);
235  } else {
236  // Load data for an LB ID that can't be assigned to any stream should
237  // be dropped.
238  orphaned_store->Suspend();
239  }
240  }
241 }
242 
// PerHostStore::FindPerBalancerStore (name per the cross-reference index; the
// first signature line is absent from this listing): returns a non-owning
// pointer to the store registered under lb_id, or nullptr if there is none.
// Note that the key is looked up twice: once for the test, once for the value.
244  const std::string& lb_id) const {
245  return per_balancer_stores_.find(lb_id) != per_balancer_stores_.end()
246  ? per_balancer_stores_.find(lb_id)->second.get()
247  : nullptr;
248 }
249 
250 const std::set<PerBalancerStore*>* PerHostStore::GetAssignedStores(
251  const std::string& lb_id) const {
252  auto it = assigned_stores_.find(lb_id);
253  if (it == assigned_stores_.end()) return nullptr;
254  return &(it->second);
255 }
256 
// PerHostStore::AssignOrphanedStore (name per the cross-reference index; the
// first signature line is absent from this listing): adds orphaned_store to
// the set of stores assigned to new_receiver. new_receiver must already have
// an entry in assigned_stores_ (asserted).
258  const std::string& new_receiver) {
259  auto it = assigned_stores_.find(new_receiver);
260  GPR_ASSERT(it != assigned_stores_.end());
261  it->second.insert(orphaned_store);
// Arguments of a call whose opening line (listing line 262) is missing -
// presumably a log call.
263  "[PerHostStore %p] Re-assigned orphaned store (%p) with original LB"
264  " ID of %s to new receiver %s",
265  this, orphaned_store, orphaned_store->lb_id().c_str(),
266  new_receiver.c_str());
267 }
268 
// PerHostStore::SetUpForNewLbId (name per the cross-reference index; the first
// signature line is absent from this listing): creates the bookkeeping for a
// new lb_id. It records lb_id as a receiver for load_key, creates the
// PerBalancerStore, assigns that store to lb_id itself, and transfers
// ownership of it to per_balancer_stores_.
270  const std::string& load_key) {
271  // The top-level caller (i.e., LoadReportService) should guarantee the
272  // lb_id is unique for each reporting stream.
// NOTE(review): listing line 273 is absent here; presumably a matching
// uniqueness check on per_balancer_stores_ - confirm against the original.
274  GPR_ASSERT(assigned_stores_.find(lb_id) == assigned_stores_.end());
275  load_key_to_receiving_lb_ids_[load_key].insert(lb_id);
276  std::unique_ptr<PerBalancerStore> per_balancer_store(
277  new PerBalancerStore(lb_id, load_key));
// Take the raw pointer before the unique_ptr is moved from on the next line.
278  assigned_stores_[lb_id] = {per_balancer_store.get()};
279  per_balancer_stores_[lb_id] = std::move(per_balancer_store);
280 }
281 
// LoadDataStore::FindPerBalancerStore (name per the cross-reference index; the
// first signature line is absent from this listing): looks up the per-host
// store for hostname and delegates to its FindPerBalancerStore(lb_id).
// Returns nullptr when the hostname is unknown.
283  const string& hostname, const string& lb_id) const {
284  auto it = per_host_stores_.find(hostname);
285  if (it != per_host_stores_.end()) {
286  const PerHostStore& per_host_store = it->second;
287  return per_host_store.FindPerBalancerStore(lb_id);
288  } else {
289  return nullptr;
290  }
291 }
292 
// LoadDataStore::MergeRow (name per the cross-reference index; the first
// signature line is absent from this listing): merges `value` into the store
// found for (hostname, key.lb_id()). If no such store exists, only the
// in-progress call count of the unknown LB ID is tracked.
294  const LoadRecordKey& key,
295  const LoadRecordValue& value) {
296  PerBalancerStore* per_balancer_store =
297  FindPerBalancerStore(hostname, key.lb_id());
298  if (per_balancer_store != nullptr) {
299  per_balancer_store->MergeRow(key, value);
300  return;
301  }
302  // Unknown LB ID. Track it until its number of in-progress calls drops to
303  // zero.
304  int64_t in_progress_delta = value.GetNumCallsInProgressDelta();
// A zero delta changes nothing, so no tracker is created or touched for it.
305  if (in_progress_delta != 0) {
306  auto it_tracker = unknown_balancer_id_trackers_.find(key.lb_id());
307  if (it_tracker == unknown_balancer_id_trackers_.end()) {
308  gpr_log(
309  GPR_DEBUG,
310  "[LoadDataStore %p] Start tracking unknown balancer (lb_id_: %s).",
311  this, key.lb_id().c_str());
// Argument of a call whose opening line (listing line 312) is missing -
// presumably an insertion into unknown_balancer_id_trackers_.
// NOTE(review): a negative first delta would become a very large unsigned
// count through this cast - confirm that the first delta is never negative.
313  {key.lb_id(), static_cast<uint64_t>(in_progress_delta)});
// Otherwise apply the delta; the tracker is dropped once it reaches zero.
314  } else if ((it_tracker->second += in_progress_delta) == 0) {
315  unknown_balancer_id_trackers_.erase(it_tracker);
// Arguments of a call whose opening line (listing line 316) is missing -
// presumably a log call.
317  "[LoadDataStore %p] Stop tracking unknown balancer (lb_id_: %s).",
318  this, key.lb_id().c_str());
319  }
320  }
321 }
322 
323 const std::set<PerBalancerStore*>* LoadDataStore::GetAssignedStores(
324  const std::string& hostname, const std::string& lb_id) {
325  auto it = per_host_stores_.find(hostname);
326  if (it == per_host_stores_.end()) return nullptr;
327  return it->second.GetAssignedStores(lb_id);
328 }
329 
// LoadDataStore::ReportStreamCreated (name per the cross-reference index; the
// first signature line is absent from this listing): forwards the event to
// the per-host store for hostname. operator[] creates that per-host store if
// it does not exist yet.
331  const std::string& lb_id,
332  const std::string& load_key) {
333  per_host_stores_[hostname].ReportStreamCreated(lb_id, load_key);
334 }
335 
// LoadDataStore::ReportStreamClosed (name per the cross-reference index; the
// first signature line is absent from this listing): forwards the event to
// the per-host store for hostname, which must already exist (asserted).
337  const std::string& lb_id) {
338  auto it_per_host_store = per_host_stores_.find(hostname);
339  GPR_ASSERT(it_per_host_store != per_host_stores_.end());
340  it_per_host_store->second.ReportStreamClosed(lb_id);
341 }
342 
343 } // namespace load_reporter
344 } // namespace grpc
grpc::load_reporter::PerBalancerStore::MergeRow
void MergeRow(const LoadRecordKey &key, const LoadRecordValue &value)
Definition: load_data_store.cc:147
grpc::load_reporter::PerBalancerStore
Definition: load_data_store.h:197
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
regen-readme.it
it
Definition: regen-readme.py:15
log.h
grpc::load_reporter::LoadDataStore::FindPerBalancerStore
PerBalancerStore * FindPerBalancerStore(const std::string &hostname, const std::string &lb_id) const
Definition: load_data_store.cc:282
grpc::load_reporter::LoadDataStore::ReportStreamCreated
void ReportStreamCreated(const std::string &hostname, const std::string &lb_id, const std::string &load_key)
Definition: load_data_store.cc:330
grpc::load_reporter::PerHostStore::FindPerBalancerStore
PerBalancerStore * FindPerBalancerStore(const std::string &lb_id) const
Definition: load_data_store.cc:243
grpc::load_reporter::PerHostStore
Definition: load_data_store.h:245
grpc::load_reporter::PerBalancerStore::last_reported_num_calls_in_progress_
uint64_t last_reported_num_calls_in_progress_
Definition: load_data_store.h:240
grpc::load_reporter::PerHostStore::per_balancer_stores_
std::unordered_map< std::string, std::unique_ptr< PerBalancerStore > > per_balancer_stores_
Definition: load_data_store.h:284
grpc
Definition: grpcpp/alarm.h:33
grpc::load_reporter::kIpv6AddressLength
constexpr size_t kIpv6AddressLength
Definition: src/cpp/server/load_reporter/constants.h:42
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc::load_reporter::PerBalancerStore::lb_id
const std::string & lb_id() const
Definition: load_data_store.h:230
grpc::load_reporter::LoadRecordKey::LoadRecordKey
LoadRecordKey(std::string lb_id, std::string lb_tag, std::string user_id, std::string client_ip_hex)
Definition: load_data_store.h:73
grpc::load_reporter::PerHostStore::AssignOrphanedStore
void AssignOrphanedStore(PerBalancerStore *orphaned_store, const std::string &new_receiver)
Definition: load_data_store.cc:257
grpc::load_reporter::PerBalancerStore::Resume
void Resume()
Definition: load_data_store.cc:175
grpc::load_reporter::LoadRecordValue
Definition: load_data_store.h:127
map
zval * map
Definition: php/ext/google/protobuf/encode_decode.c:480
grpc::load_reporter::kIpv4AddressLength
constexpr size_t kIpv4AddressLength
Definition: src/cpp/server/load_reporter/constants.h:41
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
socket_utils.h
grpc::load_reporter::PerBalancerStore::load_record_map_
LoadRecordMap load_record_map_
Definition: load_data_store.h:238
grpc::load_reporter::PerHostStore::ReportStreamClosed
void ReportStreamClosed(const std::string &lb_id)
Definition: load_data_store.cc:212
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
int64_t
signed __int64 int64_t
Definition: stdint-msvc2008.h:89
grpc::load_reporter::PerBalancerStore::Suspend
void Suspend()
Definition: load_data_store.cc:169
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc::load_reporter::LoadDataStore::unknown_balancer_id_trackers_
std::unordered_map< std::string, uint64_t > unknown_balancer_id_trackers_
Definition: load_data_store.h:343
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
grpc::load_reporter::LoadRecordKey::lb_id_
std::string lb_id_
Definition: load_data_store.h:120
grpc::load_reporter::LoadDataStore::GetAssignedStores
const std::set< PerBalancerStore * > * GetAssignedStores(const string &hostname, const string &lb_id)
Definition: load_data_store.cc:323
grpc::load_reporter::LoadRecordKey::lb_tag_
std::string lb_tag_
Definition: load_data_store.h:121
grpc::load_reporter::LoadRecordValue::LoadRecordValue
LoadRecordValue(uint64_t start_count=0, uint64_t ok_count=0, uint64_t error_count=0, uint64_t bytes_sent=0, uint64_t bytes_recv=0, uint64_t latency_ms=0)
Definition: load_data_store.h:129
grpc_htonl
uint32_t grpc_htonl(uint32_t hostlong)
absl::compare_internal::value_type
int8_t value_type
Definition: abseil-cpp/absl/types/compare.h:45
grpc::load_reporter::LoadRecordKey::client_ip_hex_
std::string client_ip_hex_
Definition: load_data_store.h:123
advance
static void advance(upb_pbdecoder *d, size_t len)
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/upb.c:6553
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
GPR_UNREACHABLE_CODE
#define GPR_UNREACHABLE_CODE(STATEMENT)
Definition: impl/codegen/port_platform.h:652
grpc::load_reporter::LoadRecordValue::call_metrics_
std::unordered_map< std::string, CallMetricValue > call_metrics_
Definition: load_data_store.h:193
stdint.h
grpc::load_reporter::LoadDataStore::per_host_stores_
std::unordered_map< std::string, PerHostStore > per_host_stores_
Definition: load_data_store.h:340
grpc::load_reporter::PerHostStore::ReportStreamCreated
void ReportStreamCreated(const std::string &lb_id, const std::string &load_key)
Definition: load_data_store.cc:186
value
const char * value
Definition: hpack_parser_table.cc:165
grpc::load_reporter::LoadDataStore::ReportStreamClosed
void ReportStreamClosed(const std::string &hostname, const std::string &lb_id)
Definition: load_data_store.cc:336
constants.h
key
const char * key
Definition: hpack_parser_table.cc:164
grpc::load_reporter::LoadRecordKey
Definition: load_data_store.h:71
grpc::load_reporter::PerBalancerStore::num_calls_in_progress_
uint64_t num_calls_in_progress_
Definition: load_data_store.h:239
grpc::load_reporter::LoadRecordKey::GetClientIpBytes
std::string GetClientIpBytes() const
Definition: load_data_store.cc:107
port_platform.h
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc::load_reporter::PerBalancerStore::GetNumCallsInProgressForReport
uint64_t GetNumCallsInProgressForReport()
Definition: load_data_store.cc:180
grpc::load_reporter::kLbIdLength
constexpr size_t kLbIdLength
Definition: src/cpp/server/load_reporter/constants.h:40
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
grpc::load_reporter::PerBalancerStore::suspended_
bool suspended_
Definition: load_data_store.h:241
GPR_DEBUG
#define GPR_DEBUG
Definition: include/grpc/impl/codegen/log.h:55
grpc::load_reporter::CallMetricValue
Definition: load_data_store.h:47
grpc::load_reporter::PerHostStore::SetUpForNewLbId
void SetUpForNewLbId(const std::string &lb_id, const std::string &load_key)
Definition: load_data_store.cc:269
load_data_store.h
grpc::load_reporter::PerHostStore::assigned_stores_
std::unordered_map< std::string, std::set< PerBalancerStore * > > assigned_stores_
Definition: load_data_store.h:291
grpc::load_reporter::PerHostStore::GetAssignedStores
const std::set< PerBalancerStore * > * GetAssignedStores(const std::string &lb_id) const
Definition: load_data_store.cc:250
grpc::load_reporter::kInvalidLbId
constexpr char kInvalidLbId[]
Definition: src/cpp/server/load_reporter/constants.h:44
container
static struct async_container * container
Definition: benchmark-million-async.c:33
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
grpc::load_reporter::PerHostStore::load_key_to_receiving_lb_ids_
std::unordered_map< std::string, std::set< std::string > > load_key_to_receiving_lb_ids_
Definition: load_data_store.h:278
grpc::load_reporter::LoadDataStore::MergeRow
void MergeRow(const std::string &hostname, const LoadRecordKey &key, const LoadRecordValue &value)
Definition: load_data_store.cc:293


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:29