25 #include <google/protobuf/duration.pb.h>
28 #include "absl/memory/memory.h"
37 namespace load_reporter {
46 std::unique_ptr<ServerCompletionQueue>
cq)
49 absl::make_unique<grpc_core::Thread>(
"server_load_reporting",
Work,
this);
50 std::unique_ptr<CpuStatsProvider> cpu_stats_provider =
nullptr;
51 #if defined(GPR_LINUX) || defined(GPR_WINDOWS) || defined(GPR_APPLE)
52 cpu_stats_provider = absl::make_unique<CpuStatsProviderDefaultImpl>();
74 auto next_fetch_and_sample_time =
103 service->FetchAndSample(
true );
109 service->load_reporter_.get());
132 std::shared_ptr<ReportLoadHandler>
handler =
133 std::make_shared<ReportLoadHandler>(
cq,
service, load_reporter);
137 if (
service->shutdown_)
return;
138 p->on_done_notified_ =
140 std::placeholders::_1, std::placeholders::_2),
144 std::placeholders::_1, std::placeholders::_2),
146 p->ctx_.AsyncNotifyWhenDone(&p->on_done_notified_);
147 service->RequestReportLoad(&p->ctx_, &p->stream_,
cq,
cq,
159 call_status_(WAITING_FOR_DELIVERY) {}
162 std::shared_ptr<ReportLoadHandler>
self,
bool ok) {
164 call_status_ = DELIVERED;
170 GPR_ASSERT(on_done_notified_.ReleaseHandler() !=
nullptr);
189 std::placeholders::_1, std::placeholders::_2),
196 "[LRS %p] Call request delivered (lb_id_: %s, handler: %p). "
197 "Start reading the initial request...",
202 std::shared_ptr<ReportLoadHandler>
self,
bool ok) {
204 if (!
ok && call_status_ < INITIAL_REQUEST_RECEIVED) {
207 "[LRS %p] Failed reading the initial request from the stream "
208 "(lb_id_: %s, handler: %p, done_notified: %d, is_cancelled: %d).",
209 service_, lb_id_.c_str(),
this,
static_cast<int>(done_notified_),
210 static_cast<int>(is_cancelled_));
216 if (call_status_ < INITIAL_REQUEST_RECEIVED) {
217 if (!
request_.has_initial_request()) {
220 call_status_ = INITIAL_REQUEST_RECEIVED;
221 const auto& initial_request =
request_.initial_request();
222 load_balanced_hostname_ = initial_request.load_balanced_hostname();
223 load_key_ = initial_request.load_key();
224 load_reporter_->ReportStreamCreated(load_balanced_hostname_, lb_id_,
226 const auto& load_report_interval = initial_request.load_report_interval();
227 load_report_interval_ms_ =
228 static_cast<unsigned long>(load_report_interval.seconds() * 1000 +
229 load_report_interval.nanos() / 1000);
231 "[LRS %p] Initial request received. Start load reporting (load "
232 "balanced host: %s, interval: %" PRIu64
233 " ms, lb_id_: %s, handler: %p)...",
234 service_, load_balanced_hostname_.c_str(),
235 load_report_interval_ms_, lb_id_.c_str(),
this);
236 SendReport(
self,
true );
247 std::placeholders::_1, std::placeholders::_2),
255 "[LRS %p] Another request received (lb_id_: %s, handler: %p).",
262 std::shared_ptr<ReportLoadHandler>
self,
bool ok) {
279 std::placeholders::_1, std::placeholders::_2),
283 next_report_alarm_ = absl::make_unique<Alarm>();
284 next_report_alarm_->Set(
cq_, next_report_time, &next_outbound_);
287 "[LRS %p] Next load report scheduled (lb_id_: %s, handler: %p).",
292 std::shared_ptr<ReportLoadHandler>
self,
bool ok) {
297 grpc::lb::v1::LoadReportResponse
response;
298 auto loads =
load_reporter_->GenerateLoads(load_balanced_hostname_, lb_id_);
299 response.mutable_load()->Swap(&loads);
301 response.mutable_load_balancing_feedback()->Swap(&feedback);
302 if (call_status_ < INITIAL_RESPONSE_SENT) {
303 auto initial_response =
response.mutable_initial_response();
304 initial_response->set_load_balancer_id(lb_id_);
305 initial_response->set_implementation_id(
306 grpc::lb::v1::InitialLoadReportResponse::CPP);
307 initial_response->set_server_version(
kVersion);
308 call_status_ = INITIAL_RESPONSE_SENT;
319 std::placeholders::_1, std::placeholders::_2),
323 "[LRS %p] Sending load report (lb_id_: %s, handler: %p, loads "
330 std::shared_ptr<ReportLoadHandler>
self,
bool ok) {
332 done_notified_ =
true;
333 if (
ctx_.IsCancelled()) {
334 is_cancelled_ =
true;
337 "[LRS %p] Load reporting call is notified done (handler: %p, "
338 "is_cancelled: %d).",
339 service_,
this,
static_cast<int>(is_cancelled_));
344 std::shared_ptr<ReportLoadHandler>
self,
const char* reason) {
347 "[LRS %p] Shutting down the handler (lb_id_: %s, handler: %p, "
349 service_, lb_id_.c_str(),
this, reason);
351 if (call_status_ >= INITIAL_REQUEST_RECEIVED) {
352 load_reporter_->ReportStreamClosed(load_balanced_hostname_, lb_id_);
353 next_report_alarm_->Cancel();
358 if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) {
363 std::placeholders::_1, std::placeholders::_2),
369 call_status_ = FINISH_CALLED;
376 std::shared_ptr<ReportLoadHandler> ,
bool ok) {
379 "[LRS %p] Load reporting finished (lb_id_: %s, handler: %p).",