33 #include <google/protobuf/duration.pb.h>
35 #include "opencensus/stats/internal/set_aggregation_window.h"
36 #include "opencensus/tags/tag_key.h"
44 namespace load_reporter {
59 ::opencensus::stats::ViewDescriptor()
62 .set_aggregation(::opencensus::stats::Aggregation::Sum())
67 "Delta count of calls started broken down by <token, host, "
69 ::opencensus::stats::SetAggregationWindow(
70 ::opencensus::stats::AggregationWindow::Delta(), &vd_start_count);
78 ::opencensus::stats::ViewDescriptor()
81 .set_aggregation(::opencensus::stats::Aggregation::Sum())
87 "Delta count of calls ended broken down by <token, host, "
89 ::opencensus::stats::SetAggregationWindow(
90 ::opencensus::stats::AggregationWindow::Delta(), &vd_end_count);
92 auto vd_end_bytes_sent =
93 ::opencensus::stats::ViewDescriptor()
96 .set_aggregation(::opencensus::stats::Aggregation::Sum())
102 "Delta sum of bytes sent broken down by <token, host, user_id, "
104 ::opencensus::stats::SetAggregationWindow(
105 ::opencensus::stats::AggregationWindow::Delta(), &vd_end_bytes_sent);
107 auto vd_end_bytes_received =
108 ::opencensus::stats::ViewDescriptor()
111 .set_aggregation(::opencensus::stats::Aggregation::Sum())
117 "Delta sum of bytes received broken down by <token, host, "
118 "user_id, status>.");
119 ::opencensus::stats::SetAggregationWindow(
120 ::opencensus::stats::AggregationWindow::Delta(), &vd_end_bytes_received);
122 auto vd_end_latency_ms =
123 ::opencensus::stats::ViewDescriptor()
126 .set_aggregation(::opencensus::stats::Aggregation::Sum())
132 "Delta sum of latency in ms broken down by <token, host, "
133 "user_id, status>.");
134 ::opencensus::stats::SetAggregationWindow(
135 ::opencensus::stats::AggregationWindow::Delta(), &vd_end_latency_ms);
138 auto vd_metric_call_count =
139 ::opencensus::stats::ViewDescriptor()
148 "Delta count of calls broken down by <token, host, user_id, "
150 ::opencensus::stats::SetAggregationWindow(
151 ::opencensus::stats::AggregationWindow::Delta(), &vd_metric_call_count);
153 auto vd_metric_value =
154 ::opencensus::stats::ViewDescriptor()
157 .set_aggregation(::opencensus::stats::Aggregation::Sum())
163 "Delta sum of call metric value broken down "
164 "by <token, host, user_id, metric_name>.");
165 ::opencensus::stats::SetAggregationWindow(
166 ::opencensus::stats::AggregationWindow::Delta(), &vd_metric_value);
171 const ViewDataMap& view_data_map,
const char* view_name,
172 size_t view_name_len,
const std::vector<std::string>& tag_values) {
173 auto it_vd = view_data_map.find(
std::string(view_name, view_name_len));
176 ::opencensus::stats::ViewData::Type::kDouble);
177 auto it_row = it_vd->second.double_data().find(tag_values);
178 GPR_ASSERT(it_row != it_vd->second.double_data().end());
179 return it_row->second;
183 const ViewDataMap& view_data_map,
const char* view_name,
184 size_t view_name_len,
const std::vector<std::string>& tag_values) {
185 auto it_vd = view_data_map.find(
std::string(view_name, view_name_len));
188 ::opencensus::stats::ViewData::Type::kInt64);
189 auto it_row = it_vd->second.int_data().find(tag_values);
190 GPR_ASSERT(it_row != it_vd->second.int_data().end());
192 return it_row->second;
198 const ::opencensus::stats::ViewDescriptor& vd = p.second;
201 view_map_.emplace(std::piecewise_construct,
202 std::forward_as_tuple(view_name),
203 std::forward_as_tuple(vd));
213 if (view.IsValid()) {
214 view_data_map.emplace(view_name, view.GetData());
220 "[CVP %p] Can't fetch view data because view is invalid (view: %s).",
221 this, view_name.c_str());
224 return view_data_map;
239 snprintf(
buf,
sizeof(
buf),
"%08" PRIx64, lb_id);
250 ::grpc::lb::v1::LoadBalancingFeedback
261 return grpc::lb::v1::LoadBalancingFeedback::default_instance();
266 while (std::distance(oldest, newest) > 0 &&
267 (newest->cpu_limit == 0 || oldest->cpu_limit == 0)) {
270 if (newest->cpu_limit == 0) --newest;
271 if (oldest->cpu_limit == 0) ++oldest;
273 if (std::distance(oldest, newest) < 1 ||
274 oldest->end_time == newest->end_time ||
275 newest->cpu_limit == oldest->cpu_limit) {
276 return grpc::lb::v1::LoadBalancingFeedback::default_instance();
280 for (
auto p = newest; p != oldest; --p) {
286 double cpu_usage = newest->cpu_usage - oldest->cpu_usage;
287 double cpu_limit = newest->cpu_limit - oldest->cpu_limit;
288 std::chrono::duration<double> duration_seconds =
289 newest->end_time - oldest->end_time;
291 grpc::lb::v1::LoadBalancingFeedback feedback;
292 feedback.set_server_utilization(
static_cast<float>(cpu_usage / cpu_limit));
293 feedback.set_calls_per_second(
294 static_cast<float>(rpcs / duration_seconds.count()));
295 feedback.set_errors_per_second(
296 static_cast<float>(
errors / duration_seconds.count()));
309 GPR_ASSERT(!per_balancer_store->IsSuspended());
310 if (!per_balancer_store->load_record_map().empty()) {
311 for (
const auto& p : per_balancer_store->load_record_map()) {
312 const auto&
key = p.first;
313 const auto&
value = p.second;
315 load->set_load_balance_tag(
key.lb_tag());
316 load->set_user_id(
key.user_id());
317 load->set_client_ip_address(
key.GetClientIpBytes());
319 load->set_num_calls_finished_without_error(
321 load->set_num_calls_finished_with_error(
324 load->set_total_bytes_received(
326 load->mutable_total_latency()->set_seconds(
328 load->mutable_total_latency()->set_nanos(
329 (
static_cast<int32_t>(
value.latency_ms()) % 1000) * 1000000);
330 for (
const auto& p :
value.call_metrics()) {
333 auto call_metric_data =
load->add_metric_data();
334 call_metric_data->set_metric_name(metric_name);
335 call_metric_data->set_num_calls_finished_with_metric(
337 call_metric_data->set_total_metric_value(
340 if (per_balancer_store->lb_id() != lb_id) {
346 per_balancer_store->ClearLoadRecordMap();
348 if (per_balancer_store->IsNumCallsInProgressChangedSinceLastReport()) {
350 load->set_num_calls_in_progress(
351 per_balancer_store->GetNumCallsInProgressForReport());
352 if (per_balancer_store->lb_id() != lb_id) {
365 load->set_load_key_unknown(
true);
369 load->mutable_orphaned_load_identifier()->set_load_key(
371 load->mutable_orphaned_load_identifier()->set_load_balancer_id(
372 per_balancer_store.
lb_id());
386 cpu_stats.first, cpu_stats.second);
395 "[LR %p] Report stream created (host: %s, LB ID: %s, load key: %s).",
396 this, hostname.c_str(), lb_id.c_str(), load_key.c_str());
403 gpr_log(
GPR_INFO,
"[LR %p] Report stream closed (host: %s, LB ID: %s).",
this,
404 hostname.c_str(), lb_id.c_str());
410 if (
it != view_data_map.end()) {
411 for (
const auto& p :
it->second.int_data()) {
412 const std::vector<std::string>& tag_values = p.first;
414 const std::string& client_ip_and_token = tag_values[0];
432 if (
it != view_data_map.end()) {
433 for (
const auto& p :
it->second.int_data()) {
434 const std::vector<std::string>& tag_values = p.first;
436 const std::string& client_ip_and_token = tag_values[0];
443 if (client_ip_and_token.empty()) {
445 "Skipping processing Opencensus record with empty "
446 "client_ip_and_token tag.");
462 total_end_count += end_count;
464 ok_count = end_count;
466 error_count = end_count;
467 total_error_count += end_count;
483 if (
it != view_data_map.end()) {
484 for (
const auto& p :
it->second.int_data()) {
485 const std::vector<std::string>& tag_values = p.first;
486 const int64_t num_calls = p.second;
487 const std::string& client_ip_and_token = tag_values[0];
492 const double total_metric_value =
497 metric_name,
static_cast<uint64_t>(num_calls), total_metric_value);
508 "[LR %p] Starts fetching Census view data and sampling LB feedback "