26 #include <gtest/gtest.h>
38 using ::grpc::load_reporter::CallMetricValue;
40 using ::grpc::load_reporter::LoadDataStore;
41 using ::grpc::load_reporter::LoadRecordKey;
42 using ::grpc::load_reporter::LoadRecordValue;
43 using ::grpc::load_reporter::PerBalancerStore;
// Test helper: looks up a per-balancer store in `load_data_store` and scans
// `*per_balancer_stores` for that same store.
// NOTE(review): this listing elides lines -- the remaining parameters
// (presumably `hostname`, `lb_id` and `load_key`, which the body uses), the
// return statements and the closing braces are not visible; confirm against
// the full file.
53 bool PerBalancerStoresContains(
54 const LoadDataStore& load_data_store,
55 const std::set<PerBalancerStore*>* per_balancer_stores,
// Reference store that the scan below compares against.
58 auto original_per_balancer_store =
59 load_data_store.FindPerBalancerStore(hostname, lb_id);
// The lookup is expected to succeed and to carry the requested LB ID and load
// key.
// NOTE(review): EXPECT_NE is a non-fatal GoogleTest check, so a null result
// would still be dereferenced by the two EXPECT_EQ lines that follow.
60 EXPECT_NE(original_per_balancer_store,
nullptr);
61 EXPECT_EQ(original_per_balancer_store->lb_id(), lb_id);
62 EXPECT_EQ(original_per_balancer_store->load_key(), load_key);
// Candidates are matched by pointer equality, not by comparing store contents.
63 for (
auto per_balancer_store : *per_balancer_stores) {
64 if (per_balancer_store == original_per_balancer_store) {
// Second name for the LoadDataStoreTest fixture; the TEST_F cases in this file
// that are registered under PerBalancerStoreTest run on that same fixture.
95 using PerBalancerStoreTest = LoadDataStoreTest;
// AssignToSelf: the set returned by GetAssignedStores(kHostname1, kLbId1) is
// expected to satisfy PerBalancerStoresContains.
// NOTE(review): the setup between the visible lines and the remaining
// EXPECT_TRUE arguments are elided in this listing -- confirm which store is
// being looked up.
97 TEST_F(LoadDataStoreTest, AssignToSelf) {
98 LoadDataStore load_data_store;
100 auto assigned_stores = load_data_store.GetAssignedStores(
kHostname1,
kLbId1);
101 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_stores,
// ReassignOrphanStores: inspects three assigned-store sets in turn and checks
// which stores each one contains via PerBalancerStoresContains.
// NOTE(review): the stream-created/closed setup, the initialisers of the
// `assigned_to_lb_id_*` variables and the trailing macro arguments are elided
// in this listing; the store being checked on each line cannot be read off
// here.
105 TEST_F(LoadDataStoreTest, ReassignOrphanStores) {
106 LoadDataStore load_data_store;
// First set: two stores are expected to be present.
113 auto assigned_to_lb_id_1 =
116 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_1,
118 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_1,
// Second set: three stores are expected to be present.
122 auto assigned_to_lb_id_3 =
126 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
128 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
130 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
// Third set: three stores are expected to be absent and exactly one present.
134 auto assigned_to_lb_id_4 =
138 EXPECT_FALSE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
140 EXPECT_FALSE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
142 EXPECT_FALSE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
144 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
// OrphanAssignmentIsSticky: closes one randomly chosen LB ID out of 1000,
// records which active LB ID its store is found under, closes other LB IDs,
// and finally closes that holder too, expecting the orphaned store to be found
// under exactly one of the LB IDs still active.
// NOTE(review): several lines (stream creation, declarations of
// `assigned_lb_id` / `lb_id_to_close`, macro arguments, closing braces) are
// elided in this listing.
148 TEST_F(LoadDataStoreTest, OrphanAssignmentIsSticky) {
149 LoadDataStore load_data_store;
150 std::set<std::string> active_lb_ids;
151 size_t num_lb_ids = 1000;
// Track every generated LB ID as active.
152 for (
size_t i = 0;
i < num_lb_ids; ++
i) {
154 active_lb_ids.insert(FormatLbId(i));
// Pick the LB ID to orphan with std::rand(); no seeding is visible in this
// block.
156 std::string orphaned_lb_id = FormatLbId(std::rand() % num_lb_ids);
157 load_data_store.ReportStreamClosed(
kHostname1, orphaned_lb_id);
158 active_lb_ids.erase(orphaned_lb_id);
// Find the active LB ID under which the helper reports a match and remember
// it.
161 for (
const auto& lb_id : active_lb_ids) {
162 if (PerBalancerStoresContains(
166 assigned_lb_id = lb_id;
// Ten rounds: each round closes an active LB ID that is not the remembered
// holder.
173 for (
size_t _ = 0;
_ < 10; ++
_) {
175 for (
const auto& lb_id : active_lb_ids) {
176 if (lb_id != assigned_lb_id) {
177 lb_id_to_close = lb_id;
182 load_data_store.ReportStreamClosed(
kHostname1, lb_id_to_close);
183 active_lb_ids.erase(lb_id_to_close);
// Re-query the holder's assigned stores (the enclosing check is elided).
186 load_data_store.GetAssignedStores(
kHostname1, assigned_lb_id),
// Now close the holder itself.
190 load_data_store.ReportStreamClosed(
kHostname1, assigned_lb_id);
191 active_lb_ids.erase(assigned_lb_id);
// Count the remaining active LB IDs for which the helper reports a match.
192 size_t orphaned_lb_id_occurences = 0;
193 for (
const auto& lb_id : active_lb_ids) {
194 if (PerBalancerStoresContains(
198 orphaned_lb_id_occurences++;
// Exactly one match is expected.
201 EXPECT_EQ(orphaned_lb_id_occurences, 1U);
// HostTemporarilyLoseAllStreams: merges rows into two stores at two points in
// the test and checks the record-map sizes and the assigned-store sets.
// NOTE(review): the stream-created/closed calls that separate the phases, the
// initialisers of several variables and the trailing macro arguments are
// elided in this listing.
204 TEST_F(LoadDataStoreTest, HostTemporarilyLoseAllStreams) {
205 LoadDataStore load_data_store;
208 auto store_lb_id_1 = load_data_store.FindPerBalancerStore(
kHostname1,
kLbId1);
209 auto store_invalid_lb_id_1 =
// First phase: a merged row is expected to leave both record maps empty
// (presumably because the host has no open stream at this point -- confirm
// against the elided setup).
219 store_lb_id_1->MergeRow(
kKey1, LoadRecordValue());
220 store_invalid_lb_id_1->MergeRow(
kKey1, LoadRecordValue());
221 EXPECT_EQ(store_lb_id_1->load_record_map().size(), 0U);
222 EXPECT_EQ(store_invalid_lb_id_1->load_record_map().size(), 0U);
// This assigned set is expected to hold exactly two stores.
224 auto assigned_to_lb_id_2 =
226 EXPECT_EQ(assigned_to_lb_id_2->size(), 2U);
227 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_2,
229 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_2,
// Second phase: the same merges are now expected to be kept, one entry per
// store.
236 store_lb_id_1->MergeRow(
kKey1, LoadRecordValue());
237 store_invalid_lb_id_1->MergeRow(
kKey1, LoadRecordValue());
238 EXPECT_EQ(store_lb_id_1->load_record_map().size(), 1U);
239 EXPECT_EQ(store_invalid_lb_id_1->load_record_map().size(), 1U);
// This assigned set is expected to hold exactly three stores.
241 auto assigned_to_lb_id_3 =
243 EXPECT_EQ(assigned_to_lb_id_3->size(), 3U);
244 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
246 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
248 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
// OneStorePerLbId: checks that the store pointers looked up here are non-null
// and pairwise distinct.
// NOTE(review): the setup calls and the initialisers of the
// `store_invalid_lb_id_*` variables are elided in this listing.
252 TEST_F(LoadDataStoreTest, OneStorePerLbId) {
253 LoadDataStore load_data_store;
261 auto store_lb_id_1 = load_data_store.FindPerBalancerStore(
kHostname1,
kLbId1);
262 auto store_invalid_lb_id_1 =
// The first pair must exist and be two different stores.
267 EXPECT_NE(store_invalid_lb_id_1,
nullptr);
268 EXPECT_NE(store_lb_id_1, store_invalid_lb_id_1);
// Same checks for the stores looked up under kHostname2.
273 auto store_lb_id_3 = load_data_store.FindPerBalancerStore(
kHostname2,
kLbId3);
274 auto store_invalid_lb_id_2 =
277 EXPECT_NE(store_invalid_lb_id_2,
nullptr);
278 EXPECT_NE(store_lb_id_3, store_invalid_lb_id_2);
// Stores from the second group must also differ from the first group's
// `store_invalid_lb_id_1`.
280 EXPECT_NE(store_lb_id_3, store_invalid_lb_id_1);
281 EXPECT_NE(store_invalid_lb_id_2, store_invalid_lb_id_1);
// ExactlyOnceAssignment: after closing the first `num_close` of `num_create`
// LB IDs, walks the stores assigned to every remaining LB ID and expects each
// store's lb_id() to show up exactly once.
// NOTE(review): the body of the first loop and the closing braces are elided
// in this listing.
285 TEST_F(LoadDataStoreTest, ExactlyOnceAssignment) {
286 LoadDataStore load_data_store;
287 size_t num_create = 100;
288 size_t num_close = 50;
// Loop over all `num_create` indices (body not visible; presumably reports a
// created stream for each FormatLbId(i) -- confirm).
289 for (
size_t i = 0;
i < num_create; ++
i) {
// Close the streams of the first `num_close` LB IDs.
292 for (
size_t i = 0;
i < num_close; ++
i) {
293 load_data_store.ReportStreamClosed(
kHostname1, FormatLbId(i));
// Collect the lb_id() of every store assigned to the LB IDs that were not
// closed; insert(...).second is false on a repeat, so a duplicate fails the
// test.
295 std::set<std::string> reported_lb_ids;
296 for (
size_t i = num_close;
i < num_create; ++
i) {
297 for (
auto assigned_store :
298 *load_data_store.GetAssignedStores(
kHostname1, FormatLbId(i))) {
299 EXPECT_TRUE(reported_lb_ids.insert(assigned_store->lb_id()).second);
// One more ID than was created is expected (presumably the store kept for the
// invalid LB ID -- confirm).
303 EXPECT_EQ(reported_lb_ids.size(), (num_create + 1));
// UnknownBalancerIdTracking: merges several LoadRecordValue rows through
// LoadDataStore::MergeRow and checks what the store for (kHostname1, kLbId1)
// records and reports as calls in progress.
// NOTE(review): every MergeRow argument list and the right-hand sides of
// several EXPECT_EQ checks are elided in this listing, so which key each row
// is merged under cannot be read off here.
307 TEST_F(LoadDataStoreTest, UnknownBalancerIdTracking) {
308 LoadDataStore load_data_store;
// v1..v3 are built from a single number each (presumably the start count,
// given the start_count() comparisons below -- confirm against
// LoadRecordValue).
311 LoadRecordValue v1(192);
314 LoadRecordValue v2(23);
316 load_data_store.MergeRow(
319 LoadRecordValue v3(952);
320 load_data_store.MergeRow(
// The kLbId1 store is expected to hold one record, under kKey1, and to report
// v1's start count as calls in progress.
324 auto store_lb_id_1 = load_data_store.FindPerBalancerStore(
kHostname1,
kLbId1);
325 EXPECT_EQ(store_lb_id_1->load_record_map().size(), 1U);
326 EXPECT_EQ(store_lb_id_1->load_record_map().find(
kKey1)->second.start_count(),
328 EXPECT_EQ(store_lb_id_1->GetNumCallsInProgressForReport(), v1.start_count());
// v4 carries v1.start_count() as its second argument (presumably the ok count,
// given the ok_count() check below); afterwards zero calls in progress are
// expected while the store still holds a single record.
333 LoadRecordValue v4(0, v1.start_count());
335 EXPECT_EQ(store_lb_id_1->load_record_map().size(), 1U);
336 EXPECT_EQ(store_lb_id_1->load_record_map().find(
kKey1)->second.start_count(),
338 EXPECT_EQ(store_lb_id_1->load_record_map().find(
kKey1)->second.ok_count(),
340 EXPECT_EQ(store_lb_id_1->GetNumCallsInProgressForReport(), 0U);
// v5 mirrors all of v2's start count and v6 half of v3's; the checks that
// follow these merges are not visible in this listing.
343 LoadRecordValue v5(0, v2.start_count());
344 load_data_store.MergeRow(
348 LoadRecordValue v6(0, v3.start_count() / 2);
349 load_data_store.MergeRow(
// Suspend: alternates Suspend()/Resume() on a PerBalancerStore and checks
// which merged rows end up in load_record_map() and how the in-progress call
// count is reported.
// NOTE(review): the construction of `per_balancer_store` and some checks
// between the visible lines are elided in this listing.
354 TEST_F(PerBalancerStoreTest, Suspend) {
// After Suspend() the record map is expected to be empty.
358 per_balancer_store.Suspend();
360 EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
// A row merged after Suspend() is expected not to appear in the map.
362 LoadRecordValue v1(139, 19);
363 per_balancer_store.MergeRow(
kKey1, v1);
364 EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
// Resume() alone is expected to leave the map empty.
366 per_balancer_store.Resume();
368 EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
// A row merged after Resume() is expected to be kept.
370 LoadRecordValue v2(23, 0, 51);
371 per_balancer_store.MergeRow(
kKey1, v2);
372 EXPECT_EQ(1U, per_balancer_store.load_record_map().size());
// A second Suspend() is expected to leave the map empty again, dropping the
// v2 record.
374 per_balancer_store.Suspend();
376 EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
378 LoadRecordValue v3(62, 11);
379 per_balancer_store.MergeRow(
kKey1, v3);
380 EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
382 per_balancer_store.Resume();
384 EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
386 LoadRecordValue v4(225, 98);
387 per_balancer_store.MergeRow(
kKey1, v4);
388 EXPECT_EQ(1U, per_balancer_store.load_record_map().size());
// The expected in-progress count sums contributions from all four rows,
// including v1 and v3, which were merged after a Suspend() call.
390 EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
391 v1.start_count() - v1.ok_count() + v2.start_count() -
392 v2.error_count() + v3.start_count() - v3.ok_count() +
393 v4.start_count() - v4.ok_count());
// DataAggregation: merges v1 and v3 under kKey1 and v2 under kKey2, then
// checks the in-progress call accounting and that each key's stored record
// matches the merged inputs field by field.
// NOTE(review): the declarations of `per_balancer_store` and
// `num_calls_in_progress`, and the opening lines of several EXPECT_EQ calls,
// are elided in this listing.
396 TEST_F(PerBalancerStoreTest, DataAggregation) {
// Input rows. v1 records only kMetric1; v2 and v3 record kMetric1 and
// kMetric2.
399 LoadRecordValue v1(992, 34, 13, 234, 164, 173467);
400 v1.InsertCallMetric(
kMetric1, CallMetricValue(3, 2773.2));
401 LoadRecordValue v2(4842, 213, 9, 393, 974, 1345);
402 v2.InsertCallMetric(
kMetric1, CallMetricValue(7, 25.234));
403 v2.InsertCallMetric(
kMetric2, CallMetricValue(2, 387.08));
// v3's third argument is its first minus its second (presumably start, ok and
// error counts, which would make start - ok - error zero -- confirm against
// LoadRecordValue).
405 LoadRecordValue v3(293, 55, 293 - 55, 28764, 5284, 5772);
406 v3.InsertCallMetric(
kMetric1, CallMetricValue(61, 3465.0));
407 v3.InsertCallMetric(
kMetric2, CallMetricValue(13, 672.0));
// Before any merge: no change is flagged.
410 EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
411 EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
412 num_calls_in_progress);
// Merging v1 flags a change; the expected count grows by start - ok - error,
// and the flag is expected to be clear again after
// GetNumCallsInProgressForReport() has been called.
414 per_balancer_store.MergeRow(
kKey1, v1);
415 EXPECT_TRUE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
416 EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
417 num_calls_in_progress +=
418 (v1.start_count() - v1.ok_count() - v1.error_count()));
419 EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
// Same sequence for v2, merged under the second key.
421 per_balancer_store.MergeRow(
kKey2, v2);
422 EXPECT_TRUE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
423 EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
424 num_calls_in_progress +=
425 (v2.start_count() - v2.ok_count() - v2.error_count()));
426 EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
// Merging v3 is expected to flag no change and leave the count as it was.
428 per_balancer_store.MergeRow(
kKey1, v3);
429 EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
430 EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
431 num_calls_in_progress);
// kKey1 received v1 and v3: every counter is expected to be their sum.
433 LoadRecordValue value_for_key1 =
434 per_balancer_store.load_record_map().find(
kKey1)->second;
435 EXPECT_EQ(value_for_key1.start_count(), v1.start_count() + v3.start_count());
436 EXPECT_EQ(value_for_key1.ok_count(), v1.ok_count() + v3.ok_count());
437 EXPECT_EQ(value_for_key1.error_count(), v1.error_count() + v3.error_count());
438 EXPECT_EQ(value_for_key1.bytes_sent(), v1.bytes_sent() + v3.bytes_sent());
439 EXPECT_EQ(value_for_key1.bytes_recv(), v1.bytes_recv() + v3.bytes_recv());
440 EXPECT_EQ(value_for_key1.latency_ms(), v1.latency_ms() + v3.latency_ms());
// Call metrics for kKey1: kMetric1 totals are compared with v1 + v3, while the
// kMetric2 totals are compared with v3 alone.
441 EXPECT_EQ(value_for_key1.call_metrics().size(), 2U);
443 v1.call_metrics().find(
kMetric1)->second.num_calls() +
444 v3.call_metrics().find(
kMetric1)->second.num_calls());
446 value_for_key1.call_metrics().find(
kMetric1)->second.total_metric_value(),
447 v1.call_metrics().find(
kMetric1)->second.total_metric_value() +
448 v3.call_metrics().find(
kMetric1)->second.total_metric_value());
450 v3.call_metrics().find(
kMetric2)->second.num_calls());
452 value_for_key1.call_metrics().find(
kMetric2)->second.total_metric_value(),
453 v3.call_metrics().find(
kMetric2)->second.total_metric_value());
// kKey2 received only v2: the stored record is expected to equal v2 field by
// field.
455 LoadRecordValue value_for_key2 =
456 per_balancer_store.load_record_map().find(
kKey2)->second;
457 EXPECT_EQ(value_for_key2.start_count(), v2.start_count());
458 EXPECT_EQ(value_for_key2.ok_count(), v2.ok_count());
459 EXPECT_EQ(value_for_key2.error_count(), v2.error_count());
460 EXPECT_EQ(value_for_key2.bytes_sent(), v2.bytes_sent());
461 EXPECT_EQ(value_for_key2.bytes_recv(), v2.bytes_recv());
462 EXPECT_EQ(value_for_key2.latency_ms(), v2.latency_ms());
463 EXPECT_EQ(value_for_key2.call_metrics().size(), 2U);
465 v2.call_metrics().find(
kMetric1)->second.num_calls());
467 value_for_key2.call_metrics().find(
kMetric1)->second.total_metric_value(),
468 v2.call_metrics().find(
kMetric1)->second.total_metric_value());
470 v2.call_metrics().find(
kMetric2)->second.num_calls());
472 value_for_key2.call_metrics().find(
kMetric2)->second.total_metric_value(),
473 v2.call_metrics().find(
kMetric2)->second.total_metric_value());
480 int main(
int argc,
char** argv) {