29 #include <unordered_map>
37 namespace load_reporter {
46 template <
typename K,
typename V>
47 bool UnorderedMapOfSetEraseKeyValue(std::unordered_map<K, std::set<V>>&
map,
50 if (
it !=
map.end()) {
51 size_t erased =
it->second.erase(
value);
52 if (
it->second.empty()) {
63 template <
typename K,
typename V>
64 std::set<V> UnorderedMapOfSetExtract(std::unordered_map<K, std::set<V>>&
map,
67 if (
it !=
map.end()) {
88 : user_id_(
std::
move(user_id)) {
91 GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(),
"%d",
97 cur_pos += ip_hex_size;
98 if (client_ip_and_token.size() - cur_pos <
kLbIdLength) {
114 "Can't parse client IP (%s) from a hex string to an integer.",
119 return std::string(
reinterpret_cast<const char*
>(&ip_bytes),
123 for (
size_t i = 0;
i < 4; ++
i) {
125 ip_bytes +
i) != 1) {
128 "Can't parse client IP part (%s) from a hex string to an integer.",
134 return std::string(
reinterpret_cast<const char*
>(ip_bytes),
142 double total_metric_value) {
153 "[PerBalancerStore %p] Load data merged (Key: %s, Value: %s).",
154 this,
key.ToString().c_str(),
value.ToString().c_str());
157 "[PerBalancerStore %p] Load data dropped (Key: %s, Value: %s).",
158 this,
key.ToString().c_str(),
value.ToString().c_str());
164 value.GetNumCallsInProgressDelta() >=
198 const std::unique_ptr<PerBalancerStore>& orphaned_store = p.second;
199 if (other_lb_id != lb_id) {
200 orphaned_store->Resume();
219 std::set<PerBalancerStore*> orphaned_stores =
228 new_receiver = RandomElement(
it->second);
233 if (new_receiver !=
nullptr) {
238 orphaned_store->Suspend();
254 return &(
it->second);
261 it->second.insert(orphaned_store);
263 "[PerHostStore %p] Re-assigned orphaned store (%p) with original LB"
264 " ID of %s to new receiver %s",
265 this, orphaned_store, orphaned_store->
lb_id().c_str(),
266 new_receiver.c_str());
276 std::unique_ptr<PerBalancerStore> per_balancer_store(
283 const string& hostname,
const string& lb_id)
const {
298 if (per_balancer_store !=
nullptr) {
304 int64_t in_progress_delta =
value.GetNumCallsInProgressDelta();
305 if (in_progress_delta != 0) {
310 "[LoadDataStore %p] Start tracking unknown balancer (lb_id_: %s).",
311 this,
key.lb_id().c_str());
313 {
key.lb_id(),
static_cast<uint64_t>(in_progress_delta)});
314 }
else if ((it_tracker->second += in_progress_delta) == 0) {
317 "[LoadDataStore %p] Stop tracking unknown balancer (lb_id_: %s).",
318 this,
key.lb_id().c_str());
327 return it->second.GetAssignedStores(lb_id);
340 it_per_host_store->second.ReportStreamClosed(lb_id);