26 #include <gtest/gtest.h> 
   38 using ::grpc::load_reporter::CallMetricValue;
 
   40 using ::grpc::load_reporter::LoadDataStore;
 
   41 using ::grpc::load_reporter::LoadRecordKey;
 
   42 using ::grpc::load_reporter::LoadRecordValue;
 
   43 using ::grpc::load_reporter::PerBalancerStore;
 
   53   bool PerBalancerStoresContains(
 
   54       const LoadDataStore& load_data_store,
 
   55       const std::set<PerBalancerStore*>* per_balancer_stores,
 
   58     auto original_per_balancer_store =
 
   59         load_data_store.FindPerBalancerStore(hostname, lb_id);
 
   60     EXPECT_NE(original_per_balancer_store, 
nullptr);
 
   61     EXPECT_EQ(original_per_balancer_store->lb_id(), lb_id);
 
   62     EXPECT_EQ(original_per_balancer_store->load_key(), load_key);
 
   63     for (
auto per_balancer_store : *per_balancer_stores) {
 
   64       if (per_balancer_store == original_per_balancer_store) {
 
   95 using PerBalancerStoreTest = LoadDataStoreTest;
 
   97 TEST_F(LoadDataStoreTest, AssignToSelf) {
 
   98   LoadDataStore load_data_store;
 
  100   auto assigned_stores = load_data_store.GetAssignedStores(
kHostname1, 
kLbId1);
 
  101   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_stores,
 
  105 TEST_F(LoadDataStoreTest, ReassignOrphanStores) {
 
  106   LoadDataStore load_data_store;
 
  113   auto assigned_to_lb_id_1 =
 
  116   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_1,
 
  118   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_1,
 
  122   auto assigned_to_lb_id_3 =
 
  126   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
 
  128   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
 
  130   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
 
  134   auto assigned_to_lb_id_4 =
 
  138   EXPECT_FALSE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
 
  140   EXPECT_FALSE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
 
  142   EXPECT_FALSE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
 
  144   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
 
  148 TEST_F(LoadDataStoreTest, OrphanAssignmentIsSticky) {
 
  149   LoadDataStore load_data_store;
 
  150   std::set<std::string> active_lb_ids;
 
  151   size_t num_lb_ids = 1000;
 
  152   for (
size_t i = 0; 
i < num_lb_ids; ++
i) {
 
  154     active_lb_ids.insert(FormatLbId(i));
 
  156   std::string orphaned_lb_id = FormatLbId(std::rand() % num_lb_ids);
 
  157   load_data_store.ReportStreamClosed(
kHostname1, orphaned_lb_id);
 
  158   active_lb_ids.erase(orphaned_lb_id);
 
  161   for (
const auto& lb_id : active_lb_ids) {
 
  162     if (PerBalancerStoresContains(
 
  166       assigned_lb_id = lb_id;
 
  173   for (
size_t _ = 0; 
_ < 10; ++
_) {
 
  175     for (
const auto& lb_id : active_lb_ids) {
 
  176       if (lb_id != assigned_lb_id) {
 
  177         lb_id_to_close = lb_id;
 
  182     load_data_store.ReportStreamClosed(
kHostname1, lb_id_to_close);
 
  183     active_lb_ids.erase(lb_id_to_close);
 
  186         load_data_store.GetAssignedStores(
kHostname1, assigned_lb_id),
 
  190   load_data_store.ReportStreamClosed(
kHostname1, assigned_lb_id);
 
  191   active_lb_ids.erase(assigned_lb_id);
 
  192   size_t orphaned_lb_id_occurences = 0;
 
  193   for (
const auto& lb_id : active_lb_ids) {
 
  194     if (PerBalancerStoresContains(
 
  198       orphaned_lb_id_occurences++;
 
  201   EXPECT_EQ(orphaned_lb_id_occurences, 1U);
 
  204 TEST_F(LoadDataStoreTest, HostTemporarilyLoseAllStreams) {
 
  205   LoadDataStore load_data_store;
 
  208   auto store_lb_id_1 = load_data_store.FindPerBalancerStore(
kHostname1, 
kLbId1);
 
  209   auto store_invalid_lb_id_1 =
 
  219   store_lb_id_1->MergeRow(
kKey1, LoadRecordValue());
 
  220   store_invalid_lb_id_1->MergeRow(
kKey1, LoadRecordValue());
 
  221   EXPECT_EQ(store_lb_id_1->load_record_map().size(), 0U);
 
  222   EXPECT_EQ(store_invalid_lb_id_1->load_record_map().size(), 0U);
 
  224   auto assigned_to_lb_id_2 =
 
  226   EXPECT_EQ(assigned_to_lb_id_2->size(), 2U);
 
  227   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_2,
 
  229   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_2,
 
  236   store_lb_id_1->MergeRow(
kKey1, LoadRecordValue());
 
  237   store_invalid_lb_id_1->MergeRow(
kKey1, LoadRecordValue());
 
  238   EXPECT_EQ(store_lb_id_1->load_record_map().size(), 1U);
 
  239   EXPECT_EQ(store_invalid_lb_id_1->load_record_map().size(), 1U);
 
  241   auto assigned_to_lb_id_3 =
 
  243   EXPECT_EQ(assigned_to_lb_id_3->size(), 3U);
 
  244   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
 
  246   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
 
  248   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
 
  252 TEST_F(LoadDataStoreTest, OneStorePerLbId) {
 
  253   LoadDataStore load_data_store;
 
  261   auto store_lb_id_1 = load_data_store.FindPerBalancerStore(
kHostname1, 
kLbId1);
 
  262   auto store_invalid_lb_id_1 =
 
  267   EXPECT_NE(store_invalid_lb_id_1, 
nullptr);
 
  268   EXPECT_NE(store_lb_id_1, store_invalid_lb_id_1);
 
  273   auto store_lb_id_3 = load_data_store.FindPerBalancerStore(
kHostname2, 
kLbId3);
 
  274   auto store_invalid_lb_id_2 =
 
  277   EXPECT_NE(store_invalid_lb_id_2, 
nullptr);
 
  278   EXPECT_NE(store_lb_id_3, store_invalid_lb_id_2);
 
  280   EXPECT_NE(store_lb_id_3, store_invalid_lb_id_1);
 
  281   EXPECT_NE(store_invalid_lb_id_2, store_invalid_lb_id_1);
 
  285 TEST_F(LoadDataStoreTest, ExactlyOnceAssignment) {
 
  286   LoadDataStore load_data_store;
 
  287   size_t num_create = 100;
 
  288   size_t num_close = 50;
 
  289   for (
size_t i = 0; 
i < num_create; ++
i) {
 
  292   for (
size_t i = 0; 
i < num_close; ++
i) {
 
  293     load_data_store.ReportStreamClosed(
kHostname1, FormatLbId(i));
 
  295   std::set<std::string> reported_lb_ids;
 
  296   for (
size_t i = num_close; 
i < num_create; ++
i) {
 
  297     for (
auto assigned_store :
 
  298          *load_data_store.GetAssignedStores(
kHostname1, FormatLbId(i))) {
 
  299       EXPECT_TRUE(reported_lb_ids.insert(assigned_store->lb_id()).second);
 
  303   EXPECT_EQ(reported_lb_ids.size(), (num_create + 1));
 
  307 TEST_F(LoadDataStoreTest, UnknownBalancerIdTracking) {
 
  308   LoadDataStore load_data_store;
 
  311   LoadRecordValue v1(192);
 
  314   LoadRecordValue v2(23);
 
  316   load_data_store.MergeRow(
 
  319   LoadRecordValue v3(952);
 
  320   load_data_store.MergeRow(
 
  324   auto store_lb_id_1 = load_data_store.FindPerBalancerStore(
kHostname1, 
kLbId1);
 
  325   EXPECT_EQ(store_lb_id_1->load_record_map().size(), 1U);
 
  326   EXPECT_EQ(store_lb_id_1->load_record_map().find(
kKey1)->second.start_count(),
 
  328   EXPECT_EQ(store_lb_id_1->GetNumCallsInProgressForReport(), v1.start_count());
 
  333   LoadRecordValue v4(0, v1.start_count());
 
  335   EXPECT_EQ(store_lb_id_1->load_record_map().size(), 1U);
 
  336   EXPECT_EQ(store_lb_id_1->load_record_map().find(
kKey1)->second.start_count(),
 
  338   EXPECT_EQ(store_lb_id_1->load_record_map().find(
kKey1)->second.ok_count(),
 
  340   EXPECT_EQ(store_lb_id_1->GetNumCallsInProgressForReport(), 0U);
 
  343   LoadRecordValue v5(0, v2.start_count());
 
  344   load_data_store.MergeRow(
 
  348   LoadRecordValue v6(0, v3.start_count() / 2);
 
  349   load_data_store.MergeRow(
 
  354 TEST_F(PerBalancerStoreTest, Suspend) {
 
  358   per_balancer_store.Suspend();
 
  360   EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
 
  362   LoadRecordValue v1(139, 19);
 
  363   per_balancer_store.MergeRow(
kKey1, v1);
 
  364   EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
 
  366   per_balancer_store.Resume();
 
  368   EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
 
  370   LoadRecordValue v2(23, 0, 51);
 
  371   per_balancer_store.MergeRow(
kKey1, v2);
 
  372   EXPECT_EQ(1U, per_balancer_store.load_record_map().size());
 
  374   per_balancer_store.Suspend();
 
  376   EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
 
  378   LoadRecordValue v3(62, 11);
 
  379   per_balancer_store.MergeRow(
kKey1, v3);
 
  380   EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
 
  382   per_balancer_store.Resume();
 
  384   EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
 
  386   LoadRecordValue v4(225, 98);
 
  387   per_balancer_store.MergeRow(
kKey1, v4);
 
  388   EXPECT_EQ(1U, per_balancer_store.load_record_map().size());
 
  390   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
 
  391             v1.start_count() - v1.ok_count() + v2.start_count() -
 
  392                 v2.error_count() + v3.start_count() - v3.ok_count() +
 
  393                 v4.start_count() - v4.ok_count());
 
  396 TEST_F(PerBalancerStoreTest, DataAggregation) {
 
  399   LoadRecordValue v1(992, 34, 13, 234, 164, 173467);
 
  400   v1.InsertCallMetric(
kMetric1, CallMetricValue(3, 2773.2));
 
  401   LoadRecordValue v2(4842, 213, 9, 393, 974, 1345);
 
  402   v2.InsertCallMetric(
kMetric1, CallMetricValue(7, 25.234));
 
  403   v2.InsertCallMetric(
kMetric2, CallMetricValue(2, 387.08));
 
  405   LoadRecordValue v3(293, 55, 293 - 55, 28764, 5284, 5772);
 
  406   v3.InsertCallMetric(
kMetric1, CallMetricValue(61, 3465.0));
 
  407   v3.InsertCallMetric(
kMetric2, CallMetricValue(13, 672.0));
 
  410   EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
 
  411   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
 
  412             num_calls_in_progress);
 
  414   per_balancer_store.MergeRow(
kKey1, v1);
 
  415   EXPECT_TRUE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
 
  416   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
 
  417             num_calls_in_progress +=
 
  418             (v1.start_count() - v1.ok_count() - v1.error_count()));
 
  419   EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
 
  421   per_balancer_store.MergeRow(
kKey2, v2);
 
  422   EXPECT_TRUE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
 
  423   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
 
  424             num_calls_in_progress +=
 
  425             (v2.start_count() - v2.ok_count() - v2.error_count()));
 
  426   EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
 
  428   per_balancer_store.MergeRow(
kKey1, v3);
 
  429   EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
 
  430   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
 
  431             num_calls_in_progress);
 
  433   LoadRecordValue value_for_key1 =
 
  434       per_balancer_store.load_record_map().find(
kKey1)->second;
 
  435   EXPECT_EQ(value_for_key1.start_count(), v1.start_count() + v3.start_count());
 
  436   EXPECT_EQ(value_for_key1.ok_count(), v1.ok_count() + v3.ok_count());
 
  437   EXPECT_EQ(value_for_key1.error_count(), v1.error_count() + v3.error_count());
 
  438   EXPECT_EQ(value_for_key1.bytes_sent(), v1.bytes_sent() + v3.bytes_sent());
 
  439   EXPECT_EQ(value_for_key1.bytes_recv(), v1.bytes_recv() + v3.bytes_recv());
 
  440   EXPECT_EQ(value_for_key1.latency_ms(), v1.latency_ms() + v3.latency_ms());
 
  441   EXPECT_EQ(value_for_key1.call_metrics().size(), 2U);
 
  443             v1.call_metrics().find(
kMetric1)->second.num_calls() +
 
  444                 v3.call_metrics().find(
kMetric1)->second.num_calls());
 
  446       value_for_key1.call_metrics().find(
kMetric1)->second.total_metric_value(),
 
  447       v1.call_metrics().find(
kMetric1)->second.total_metric_value() +
 
  448           v3.call_metrics().find(
kMetric1)->second.total_metric_value());
 
  450             v3.call_metrics().find(
kMetric2)->second.num_calls());
 
  452       value_for_key1.call_metrics().find(
kMetric2)->second.total_metric_value(),
 
  453       v3.call_metrics().find(
kMetric2)->second.total_metric_value());
 
  455   LoadRecordValue value_for_key2 =
 
  456       per_balancer_store.load_record_map().find(
kKey2)->second;
 
  457   EXPECT_EQ(value_for_key2.start_count(), v2.start_count());
 
  458   EXPECT_EQ(value_for_key2.ok_count(), v2.ok_count());
 
  459   EXPECT_EQ(value_for_key2.error_count(), v2.error_count());
 
  460   EXPECT_EQ(value_for_key2.bytes_sent(), v2.bytes_sent());
 
  461   EXPECT_EQ(value_for_key2.bytes_recv(), v2.bytes_recv());
 
  462   EXPECT_EQ(value_for_key2.latency_ms(), v2.latency_ms());
 
  463   EXPECT_EQ(value_for_key2.call_metrics().size(), 2U);
 
  465             v2.call_metrics().find(
kMetric1)->second.num_calls());
 
  467       value_for_key2.call_metrics().find(
kMetric1)->second.total_metric_value(),
 
  468       v2.call_metrics().find(
kMetric1)->second.total_metric_value());
 
  470             v2.call_metrics().find(
kMetric2)->second.num_calls());
 
  472       value_for_key2.call_metrics().find(
kMetric2)->second.total_metric_value(),
 
  473       v2.call_metrics().find(
kMetric2)->second.total_metric_value());
 
  480 int main(
int argc, 
char** argv) {