load_reporter_async_service_impl.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 #include <inttypes.h>
24 
25 #include <google/protobuf/duration.pb.h>
27 
28 #include "absl/memory/memory.h"
29 
31 #include <grpc/support/time.h>
32 #include <grpcpp/support/status.h>
33 
35 
36 namespace grpc {
37 namespace load_reporter {
38 
// CallableTag::Run(bool ok): body of the completion-queue tag callback.
// The signature (original line 39) and line 42 are absent from this listing.
// Both the stored handler function and the handler it refers to must be set
// before the tag is run; GPR_ASSERT fails otherwise.
40  GPR_ASSERT(handler_function_ != nullptr);
41  GPR_ASSERT(handler_ != nullptr);
43 }
44 
// LoadReporterAsyncServiceImpl constructor. The signature (line 45) and
// line 55 are absent from this listing.
// Takes ownership of the server completion queue. Constructs the
// "server_load_reporting" thread object, which will run Work() with this
// service as its argument; it is presumably started later by StartThread()
// (confirm). Finally builds the LoadReporter.
46  std::unique_ptr<ServerCompletionQueue> cq)
47  : cq_(std::move(cq)) {
48  thread_ =
49  absl::make_unique<grpc_core::Thread>("server_load_reporting", Work, this);
// A CPU stats provider is only created on Linux, Windows and Apple builds.
// Elsewhere it stays nullptr and is handed to LoadReporter as such.
50  std::unique_ptr<CpuStatsProvider> cpu_stats_provider = nullptr;
51 #if defined(GPR_LINUX) || defined(GPR_WINDOWS) || defined(GPR_APPLE)
52  cpu_stats_provider = absl::make_unique<CpuStatsProviderDefaultImpl>();
53 #endif
// NOTE(review): the first LoadReporter constructor argument (line 55) is not
// visible here. It is presumably the feedback sample window
// (kFeedbackSampleWindowSeconds) — confirm.
54  load_reporter_ = absl::make_unique<LoadReporter>(
56  std::unique_ptr<CensusViewProvider>(new CensusViewProviderDefaultImpl()),
57  std::move(cpu_stats_provider));
58 }
59 
// ~LoadReporterAsyncServiceImpl(): marks the service as shutting down, shuts
// down the completion queue, deals with a pending fetch-and-sample alarm, and
// joins the worker thread. Lines 60, 64 and 68 are absent from this listing.
61  // We will reach here after the server starts shutting down.
62  shutdown_ = true;
// The queue is shut down inside its own scope. Line 64 (missing here)
// presumably acquires cq_shutdown_mu_, so that no handler queues a new tag
// while the queue is being shut down — confirm.
63  {
65  cq_->Shutdown();
66  }
// Line 68 (missing here) presumably cancels the pending fetch-and-sample
// alarm — confirm.
67  if (next_fetch_and_sample_alarm_ != nullptr) {
69  }
// Wait for the worker thread: its loop exits once cq_->Next() reports that
// the shut-down queue has been drained.
70  thread_->Join();
71 }
72 
// ScheduleNextFetchAndSample(): arms a fresh Alarm on cq_ whose tag is the
// service itself. Work() recognises that tag and calls FetchAndSample().
// Nothing is scheduled once shutdown_ is set.
// Lines 73, 75-76 and 79 are absent from this listing. The deadline is
// presumably "now + kFetchAndSampleIntervalSeconds", and line 79 presumably
// locks cq_shutdown_mu_ — confirm.
74  auto next_fetch_and_sample_time =
77  GPR_TIMESPAN));
78  {
80  if (shutdown_) return;
81  // TODO(juanlishen): Improve the Alarm implementation to reuse a single
82  // instance for multiple events.
83  next_fetch_and_sample_alarm_ = absl::make_unique<Alarm>();
84  next_fetch_and_sample_alarm_->Set(cq_.get(), next_fetch_and_sample_time,
85  this);
86  }
87  gpr_log(GPR_DEBUG, "[LRS %p] Next fetch-and-sample scheduled.", this);
88 }
89 
// FetchAndSample(ok): handles the fetch-and-sample alarm tag. It is also
// called once directly with ok == true from Work().
// When ok is false it logs and returns without doing any work. That
// presumably means the alarm was cancelled or the queue is shutting down —
// confirm.
// Otherwise it asks the LoadReporter to fetch and sample. Line 97 (absent from
// this listing) presumably reschedules via ScheduleNextFetchAndSample() —
// confirm.
91  if (!ok) {
92  gpr_log(GPR_INFO, "[LRS %p] Fetch-and-sample is stopped.", this);
93  return;
94  }
95  gpr_log(GPR_DEBUG, "[LRS %p] Starting a fetch-and-sample...", this);
96  load_reporter_->FetchAndSample();
98 }
99 
// Work(arg): entry point of the "server_load_reporting" thread. arg is the
// LoadReporterAsyncServiceImpl passed when the thread object was constructed.
// It runs one fetch-and-sample immediately, performs some setup (lines
// 106-109, partly absent), and then polls cq_ until Next() returns false.
// That is asserted to happen only after shutdown_ has been set.
// Lines 100-101 and 106-108 are absent from this listing.
102  static_cast<LoadReporterAsyncServiceImpl*>(arg);
103  service->FetchAndSample(true /* ok */);
104  // TODO(juanlishen): This is a workaround to wait for the cq to be ready. Need
105  // to figure out why cq is not ready after service starts.
// NOTE(review): line 109 is the tail of a call taking
// service->load_reporter_.get(). It is presumably
// ReportLoadHandler::CreateAndStart(...), preceded by the gpr_sleep_until()
// workaround on lines 106-108 — confirm.
109  service->load_reporter_.get());
110  void* tag;
111  bool ok;
112  while (true) {
113  if (!service->cq_->Next(&tag, &ok)) {
114  // The completion queue is shutting down.
115  GPR_ASSERT(service->shutdown_);
116  break;
117  }
// Tag dispatch: the service pointer itself is the tag of the fetch-and-sample
// alarm. Every other tag is a CallableTag member of some ReportLoadHandler.
118  if (tag == service) {
119  service->FetchAndSample(ok);
120  } else {
121  auto* next_step = static_cast<CallableTag*>(tag);
122  next_step->Run(ok);
123  }
124  }
125 }
126 
128 
// ReportLoadHandler::CreateAndStart(cq, service, load_reporter): creates a new
// handler. Unless the service is shutting down, it asks the service to deliver
// the next incoming ReportLoad call to that handler.
// The local shared_ptr is handed to the handler's own CallableTags: copied into
// on_done_notified_ and moved into next_inbound_. Those tags presumably keep
// the handler alive until they drop their references (CallableTag has a
// shared_ptr handler_ member) — confirm. If the service is already shutting
// down, the early return destroys the handler with the local shared_ptr.
// Lines 129-130, 139 and 143 are absent from this listing.
131  LoadReporter* load_reporter) {
132  std::shared_ptr<ReportLoadHandler> handler =
133  std::make_shared<ReportLoadHandler>(cq, service, load_reporter);
134  ReportLoadHandler* p = handler.get();
135  {
// The shutdown check and the call request happen under cq_shutdown_mu_.
// Together they are atomic with respect to other holders of that mutex.
136  grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
137  if (service->shutdown_) return;
138  p->on_done_notified_ =
140  std::placeholders::_1, std::placeholders::_2),
141  handler);
142  p->next_inbound_ =
144  std::placeholders::_1, std::placeholders::_2),
145  std::move(handler));
146  p->ctx_.AsyncNotifyWhenDone(&p->on_done_notified_);
147  service->RequestReportLoad(&p->ctx_, &p->stream_, cq, cq,
148  &p->next_inbound_);
149  }
150 }
151 
// ReportLoadHandler constructor. The signature (lines 152-153) is absent from
// this listing.
// Stores the completion-queue, service and load-reporter pointers as given,
// binds stream_ to this handler's ctx_, and starts in the
// WAITING_FOR_DELIVERY state.
154  LoadReporter* load_reporter)
155  : cq_(cq),
156  service_(service),
157  load_reporter_(load_reporter),
158  stream_(&ctx_),
159  call_status_(WAITING_FOR_DELIVERY) {}
160 
// OnRequestDelivered(self, ok): completion of the RequestReportLoad() issued
// by CreateAndStart().
// On success it:
//   - marks the call DELIVERED,
//   - spawns a fresh handler for the next client,
//   - issues the first Read() for the initial request,
//   - assigns this stream's lb_id_.
// On failure, or when shutting down, it hands off to Shutdown().
// Lines 161, 188 and 195 are absent from this listing.
162  std::shared_ptr<ReportLoadHandler> self, bool ok) {
163  if (ok) {
164  call_status_ = DELIVERED;
165  } else {
166  // AsyncNotifyWhenDone() needs to be called before the call starts, but the
167  // tag will not pop out if the call never starts (
168  // https://github.com/grpc/grpc/issues/10136). So we need to manually
169  // release the ownership of the handler in this case.
170  GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
171  }
172  if (!ok || shutdown_) {
173  // The value of ok being false means that the server is shutting down.
174  Shutdown(std::move(self), "OnRequestDelivered");
175  return;
176  }
177  // Spawn a new handler instance to serve the next new client. Every handler
178  // instance will deallocate itself when it's done.
179  CreateAndStart(cq_, service_, load_reporter_);
180  {
181  grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
182  if (service_->shutdown_) {
// Drop cq_shutdown_mu_ first: Shutdown() acquires it again itself.
183  lock.Release();
184  Shutdown(std::move(self), "OnRequestDelivered");
185  return;
186  }
187  next_inbound_ =
189  std::placeholders::_1, std::placeholders::_2),
190  std::move(self));
191  stream_.Read(&request_, &next_inbound_);
192  }
// NOTE(review): lb_id_ is assigned after stream_.Read() has been issued. This
// is safe only if the read completion cannot run before this function
// returns, i.e. if Work() stays the single poller of cq_ — confirm.
193  // LB ID is unique for each load reporting stream.
194  lb_id_ = load_reporter_->GenerateLbId();
196  "[LRS %p] Call request delivered (lb_id_: %s, handler: %p). "
197  "Start reading the initial request...",
198  service_, lb_id_.c_str(), this);
199 }
200 
// OnReadDone(self, ok): completion of a Read() on the load-report stream.
// The first successful read must carry initial_request. Handling it:
//   - records the load-balanced hostname, load key and report interval,
//   - reports the stream as created,
//   - sends the first report,
//   - issues another Read(), which is expected to fail because only one
//     request is allowed.
// Each of the following ends in Shutdown():
//   - a failed read,
//   - handler shutdown,
//   - a first request without initial_request,
//   - any second request.
// Lines 201, 206, 230, 246 and 254 are absent from this listing.
202  std::shared_ptr<ReportLoadHandler> self, bool ok) {
203  if (!ok || shutdown_) {
204  if (!ok && call_status_ < INITIAL_REQUEST_RECEIVED) {
205  // The client may have half-closed the stream or the stream is broken.
207  "[LRS %p] Failed reading the initial request from the stream "
208  "(lb_id_: %s, handler: %p, done_notified: %d, is_cancelled: %d).",
209  service_, lb_id_.c_str(), this, static_cast<int>(done_notified_),
210  static_cast<int>(is_cancelled_));
211  }
212  Shutdown(std::move(self), "OnReadDone");
213  return;
214  }
215  // We only receive one request, which is the initial request.
216  if (call_status_ < INITIAL_REQUEST_RECEIVED) {
217  if (!request_.has_initial_request()) {
218  Shutdown(std::move(self), "OnReadDone+initial_request_not_found");
219  } else {
220  call_status_ = INITIAL_REQUEST_RECEIVED;
221  const auto& initial_request = request_.initial_request();
222  load_balanced_hostname_ = initial_request.load_balanced_hostname();
223  load_key_ = initial_request.load_key();
224  load_reporter_->ReportStreamCreated(load_balanced_hostname_, lb_id_,
225  load_key_);
226  const auto& load_report_interval = initial_request.load_report_interval();
// NOTE(review): BUG — google.protobuf.Duration::nanos() is in nanoseconds,
// so nanos() / 1000 yields microseconds, not milliseconds. The millisecond
// term should be nanos() / 1000000. Example: 0s + 500000000ns currently
// becomes 500000 ms (500 s) instead of 500 ms.
// Also, a negative duration wraps around in the unsigned cast, and a zero
// interval is not rejected.
227  load_report_interval_ms_ =
228  static_cast<unsigned long>(load_report_interval.seconds() * 1000 +
229  load_report_interval.nanos() / 1000);
231  "[LRS %p] Initial request received. Start load reporting (load "
232  "balanced host: %s, interval: %" PRIu64
233  " ms, lb_id_: %s, handler: %p)...",
234  service_, load_balanced_hostname_.c_str(),
235  load_report_interval_ms_, lb_id_.c_str(), this);
236  SendReport(self, true /* ok */);
237  // Expect this read to fail.
238  {
239  grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
240  if (service_->shutdown_) {
// Drop cq_shutdown_mu_ first: Shutdown() acquires it again itself.
241  lock.Release();
242  Shutdown(std::move(self), "OnReadDone");
243  return;
244  }
245  next_inbound_ =
247  std::placeholders::_1, std::placeholders::_2),
248  std::move(self));
249  stream_.Read(&request_, &next_inbound_);
250  }
251  }
252  } else {
253  // Another request received! This violates the spec.
255  "[LRS %p] Another request received (lb_id_: %s, handler: %p).",
256  service_, lb_id_.c_str(), this);
257  Shutdown(std::move(self), "OnReadDone+second_request");
258  }
259 }
260 
// ScheduleNextReport(self, ok): presumably the completion tag of the previous
// report Write() (SendReport's line 318 is absent here) — confirm.
// Unless the handler or the service is shutting down, it arms a fresh Alarm.
// The alarm fires load_report_interval_ms_ after the base time on line 268
// (presumably now) and uses next_outbound_ as its tag.
// This is the only place that assigns next_report_alarm_.
// Lines 261, 268, 278 and 286 are absent from this listing. Line 278
// presumably binds next_outbound_ back to SendReport — confirm.
262  std::shared_ptr<ReportLoadHandler> self, bool ok) {
263  if (!ok || shutdown_) {
264  Shutdown(std::move(self), "ScheduleNextReport");
265  return;
266  }
267  auto next_report_time = gpr_time_add(
269  gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN));
270  {
271  grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
272  if (service_->shutdown_) {
// Drop cq_shutdown_mu_ first: Shutdown() acquires it again itself.
273  lock.Release();
274  Shutdown(std::move(self), "ScheduleNextReport");
275  return;
276  }
277  next_outbound_ =
279  std::placeholders::_1, std::placeholders::_2),
280  std::move(self));
281  // TODO(juanlishen): Improve the Alarm implementation to reuse a single
282  // instance for multiple events.
283  next_report_alarm_ = absl::make_unique<Alarm>();
284  next_report_alarm_->Set(cq_, next_report_time, &next_outbound_);
285  }
287  "[LRS %p] Next load report scheduled (lb_id_: %s, handler: %p).",
288  service_, lb_id_.c_str(), this);
289 }
290 
// SendReport(self, ok): builds a LoadReportResponse from two parts:
//   - the loads generated for this stream's hostname and lb_id_,
//   - the load-balancing feedback.
// The first response also carries initial_response: the lb id,
// implementation id CPP and server version kVersion.
// Unless the handler or the service is shutting down, it writes the response
// with next_outbound_ as the completion tag.
// Lines 291, 318 and 322 are absent from this listing.
292  std::shared_ptr<ReportLoadHandler> self, bool ok) {
293  if (!ok || shutdown_) {
294  Shutdown(std::move(self), "SendReport");
295  return;
296  }
297  grpc::lb::v1::LoadReportResponse response;
298  auto loads = load_reporter_->GenerateLoads(load_balanced_hostname_, lb_id_);
299  response.mutable_load()->Swap(&loads);
300  auto feedback = load_reporter_->GenerateLoadBalancingFeedback();
301  response.mutable_load_balancing_feedback()->Swap(&feedback);
// NOTE(review): call_status_ becomes INITIAL_RESPONSE_SENT here, before the
// service-shutdown check below. If the service is already shutting down on
// the first report, Shutdown() then runs with next_report_alarm_ possibly
// never assigned. Confirm Shutdown() tolerates a null alarm.
302  if (call_status_ < INITIAL_RESPONSE_SENT) {
303  auto initial_response = response.mutable_initial_response();
304  initial_response->set_load_balancer_id(lb_id_);
305  initial_response->set_implementation_id(
306  grpc::lb::v1::InitialLoadReportResponse::CPP);
307  initial_response->set_server_version(kVersion);
308  call_status_ = INITIAL_RESPONSE_SENT;
309  }
310  {
311  grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
312  if (service_->shutdown_) {
// Drop cq_shutdown_mu_ first: Shutdown() acquires it again itself.
313  lock.Release();
314  Shutdown(std::move(self), "SendReport");
315  return;
316  }
317  next_outbound_ =
319  std::placeholders::_1, std::placeholders::_2),
320  std::move(self));
321  stream_.Write(response, &next_outbound_);
323  "[LRS %p] Sending load report (lb_id_: %s, handler: %p, loads "
324  "count: %d)...",
325  service_, lb_id_.c_str(), this, response.load().size());
326  }
327 }
328 
// OnDoneNotified(self, ok): the tag registered with AsyncNotifyWhenDone() in
// CreateAndStart(). ok is asserted to be true.
// It records done_notified_ and whether the call was cancelled, logs, and
// shuts the handler down.
// Lines 329 and 336 are absent from this listing.
330  std::shared_ptr<ReportLoadHandler> self, bool ok) {
331  GPR_ASSERT(ok);
332  done_notified_ = true;
333  if (ctx_.IsCancelled()) {
334  is_cancelled_ = true;
335  }
337  "[LRS %p] Load reporting call is notified done (handler: %p, "
338  "is_cancelled: %d).",
339  service_, this, static_cast<int>(is_cancelled_));
340  Shutdown(std::move(self), "OnDoneNotified");
341 }
342 
// Shutdown(self, reason): tears this handler down.
// Only the first call (guarded by shutdown_) does the following:
//   - logs the reason,
//   - sets shutdown_,
//   - if the initial request had been received, reports the stream closed
//     and cancels the report alarm.
// Every call then tries to Finish() the RPC with CANCELLED, if the call was
// delivered but not yet finished and the service itself is not shutting
// down. In that case on_finish_done_ takes over `self`. Otherwise `self` is
// released when this function returns.
// Lines 343, 346 and 362 are absent from this listing.
344  std::shared_ptr<ReportLoadHandler> self, const char* reason) {
345  if (!shutdown_) {
347  "[LRS %p] Shutting down the handler (lb_id_: %s, handler: %p, "
348  "reason: %s).",
349  service_, lb_id_.c_str(), this, reason);
350  shutdown_ = true;
// NOTE(review): BUG — possible null dereference. The only assignment of
// next_report_alarm_ in this file is in ScheduleNextReport(). Shutdown can
// happen after INITIAL_REQUEST_RECEIVED but before any report has been
// scheduled, for example:
//   - the service shuts down during the first SendReport(),
//   - the follow-up Read() fails before the first Write() completes.
// In those cases next_report_alarm_ may still be null and ->Cancel() would
// crash. Guard with a null check, after confirming how the header
// initializes next_report_alarm_.
351  if (call_status_ >= INITIAL_REQUEST_RECEIVED) {
352  load_reporter_->ReportStreamClosed(load_balanced_hostname_, lb_id_);
353  next_report_alarm_->Cancel();
354  }
355  }
356  // OnRequestDelivered() may be called after OnDoneNotified(), so we need to
357  // try to Finish() every time we are in Shutdown().
358  if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) {
359  grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
360  if (!service_->shutdown_) {
361  on_finish_done_ =
363  std::placeholders::_1, std::placeholders::_2),
364  std::move(self));
365  // TODO(juanlishen): Maybe add a message proto for the client to
366  // explicitly cancel the stream so that we can return OK status in such
367  // cases.
368  stream_.Finish(Status::CANCELLED, &on_finish_done_);
369  call_status_ = FINISH_CALLED;
370  }
371  }
372 }
373 
// OnFinishDone(self, ok): completion of stream_.Finish() issued by Shutdown().
// It only logs, and only when ok is true. The shared_ptr parameter is
// otherwise unused; taking it by value presumably keeps the handler alive for
// the duration of this callback (hence the NOLINT) — confirm.
// Lines 374 and 378 are absent from this listing.
375  // NOLINTNEXTLINE(performance-unnecessary-value-param)
376  std::shared_ptr<ReportLoadHandler> /*self*/, bool ok) {
377  if (ok) {
379  "[LRS %p] Load reporting finished (lb_id_: %s, handler: %p).",
380  service_, lb_id_.c_str(), this);
381  }
382 }
383 
384 } // namespace load_reporter
385 } // namespace grpc
grpc::load_reporter::CensusViewProviderDefaultImpl
Definition: load_reporter.h:96
grpc::load_reporter::LoadReporterAsyncServiceImpl::cq_shutdown_mu_
grpc_core::Mutex cq_shutdown_mu_
Definition: load_reporter_async_service_impl.h:197
GPR_TIMESPAN
@ GPR_TIMESPAN
Definition: gpr_types.h:45
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc::ServerCompletionQueue
Definition: include/grpcpp/impl/codegen/completion_queue.h:436
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::OnDoneNotified
void OnDoneNotified(std::shared_ptr< ReportLoadHandler > self, bool ok)
Definition: load_reporter_async_service_impl.cc:329
grpc::Status::CANCELLED
static const Status & CANCELLED
A CANCELLED pre-defined instance.
Definition: include/grpcpp/impl/codegen/status.h:115
grpc
Definition: grpcpp/alarm.h:33
stream_
std::unique_ptr< grpc::ClientReaderInterface< OrcaLoadReport > > stream_
Definition: orca_service_end2end_test.cc:89
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::ReportLoadHandler
ReportLoadHandler(ServerCompletionQueue *cq, LoadReporterAsyncServiceImpl *service, LoadReporter *load_reporter)
Definition: load_reporter_async_service_impl.cc:152
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
grpc::load_reporter::LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample
void ScheduleNextFetchAndSample()
Definition: load_reporter_async_service_impl.cc:73
grpc_core::ReleasableMutexLock::Release
void Release() ABSL_UNLOCK_FUNCTION()
Definition: src/core/lib/gprpp/sync.h:115
time.h
ctx_
ClientContext ctx_
Definition: client_interceptors_end2end_test.cc:289
grpc::load_reporter::LoadReporterAsyncServiceImpl::CallableTag::Run
void Run(bool ok)
Definition: load_reporter_async_service_impl.cc:39
grpc::load_reporter::kFeedbackSampleWindowSeconds
constexpr uint32_t kFeedbackSampleWindowSeconds
Definition: src/cpp/server/load_reporter/constants.h:37
repeated_ptr_field.h
grpc::load_reporter::LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl
~LoadReporterAsyncServiceImpl() override
Definition: load_reporter_async_service_impl.cc:60
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
tag
static void * tag(intptr_t t)
Definition: bad_client.cc:318
grpc::load_reporter::LoadReporterAsyncServiceImpl::CallableTag::handler_function_
HandlerFunction handler_function_
Definition: load_reporter_async_service_impl.h:100
grpc_core::ReleasableMutexLock
Definition: src/core/lib/gprpp/sync.h:102
grpc::load_reporter::kFetchAndSampleIntervalSeconds
constexpr uint32_t kFetchAndSampleIntervalSeconds
Definition: src/cpp/server/load_reporter/constants.h:38
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4, 5)
grpc::load_reporter::LoadReporterAsyncServiceImpl::Work
static void Work(void *arg)
Definition: load_reporter_async_service_impl.cc:100
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown
void Shutdown(std::shared_ptr< ReportLoadHandler > self, const char *reason)
Definition: load_reporter_async_service_impl.cc:343
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone
void OnReadDone(std::shared_ptr< ReportLoadHandler > self, bool ok)
Definition: load_reporter_async_service_impl.cc:201
grpc::load_reporter::LoadReporterAsyncServiceImpl
Definition: load_reporter_async_service_impl.h:51
gpr_sleep_until
GPRAPI void gpr_sleep_until(gpr_timespec until)
arg
Definition: cmdline.cc:40
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport
void SendReport(std::shared_ptr< ReportLoadHandler > self, bool ok)
Definition: load_reporter_async_service_impl.cc:291
GPR_CLOCK_MONOTONIC
@ GPR_CLOCK_MONOTONIC
Definition: gpr_types.h:36
benchmark::Shutdown
void Shutdown()
Definition: benchmark/src/benchmark.cc:607
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
grpc::load_reporter::kVersion
constexpr uint32_t kVersion
Definition: src/cpp/server/load_reporter/constants.h:32
gpr_now
GPRAPI gpr_timespec gpr_now(gpr_clock_type clock)
gpr_types.h
request_
EchoRequest request_
Definition: client_callback_end2end_test.cc:724
grpc::load_reporter::LoadReporterAsyncServiceImpl::next_fetch_and_sample_alarm_
std::unique_ptr< Alarm > next_fetch_and_sample_alarm_
Definition: load_reporter_async_service_impl.h:201
load_reporter_async_service_impl.h
grpc::load_reporter::LoadReporterAsyncServiceImpl::FetchAndSample
void FetchAndSample(bool ok)
Definition: load_reporter_async_service_impl.cc:90
constants.h
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler
Definition: load_reporter_async_service_impl.h:107
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::OnFinishDone
void OnFinishDone(std::shared_ptr< ReportLoadHandler > self, bool ok)
Definition: load_reporter_async_service_impl.cc:374
gpr_time_add
GPRAPI gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:135
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport
void ScheduleNextReport(std::shared_ptr< ReportLoadHandler > self, bool ok)
Definition: load_reporter_async_service_impl.cc:261
grpc::load_reporter::LoadReporterAsyncServiceImpl::StartThread
void StartThread()
Definition: load_reporter_async_service_impl.cc:127
client.handler
handler
Definition: examples/python/multiprocessing/client.py:87
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart
static void CreateAndStart(ServerCompletionQueue *cq, LoadReporterAsyncServiceImpl *service, LoadReporter *load_reporter)
Definition: load_reporter_async_service_impl.cc:129
grpc::load_reporter::LoadReporterAsyncServiceImpl::CallableTag::handler_
std::shared_ptr< ReportLoadHandler > handler_
Definition: load_reporter_async_service_impl.h:101
asyncio_get_stats.response
response
Definition: asyncio_get_stats.py:28
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc::load_reporter::LoadReporterAsyncServiceImpl::LoadReporterAsyncServiceImpl
LoadReporterAsyncServiceImpl(std::unique_ptr< ServerCompletionQueue > cq)
Definition: load_reporter_async_service_impl.cc:45
ok
bool ok
Definition: async_end2end_test.cc:197
arg
struct arg arg
grpc::load_reporter::LoadReporter
Definition: load_reporter.h:126
gpr_time_from_millis
GPRAPI gpr_timespec gpr_time_from_millis(int64_t ms, gpr_clock_type clock_type)
Definition: src/core/lib/gpr/time.cc:119
grpc::load_reporter::LoadReporterAsyncServiceImpl::thread_
std::unique_ptr< grpc_core::Thread > thread_
Definition: load_reporter_async_service_impl.h:199
GPR_DEBUG
#define GPR_DEBUG
Definition: include/grpc/impl/codegen/log.h:55
grpc::load_reporter::LoadReporterAsyncServiceImpl::load_reporter_
std::unique_ptr< LoadReporter > load_reporter_
Definition: load_reporter_async_service_impl.h:200
service
__attribute__((deprecated("Please use GRPCProtoMethod."))) @interface ProtoMethod NSString * service
Definition: ProtoMethod.h:25
grpc::load_reporter::LoadReporterAsyncServiceImpl::CallableTag
Definition: load_reporter_async_service_impl.h:76
grpc::load_reporter::LoadReporterAsyncServiceImpl::cq_
std::unique_ptr< ServerCompletionQueue > cq_
Definition: load_reporter_async_service_impl.h:194
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered
void OnRequestDelivered(std::shared_ptr< ReportLoadHandler > self, bool ok)
Definition: load_reporter_async_service_impl.cc:161
grpc::load_reporter::LoadReporterAsyncServiceImpl::shutdown_
std::atomic_bool shutdown_
Definition: load_reporter_async_service_impl.h:198
cq
static grpc_completion_queue * cq
Definition: test/core/fling/client.cc:37
service_
std::unique_ptr< grpc::testing::TestServiceImpl > service_
Definition: end2end_binder_transport_test.cc:71
status.h
gpr_time_from_seconds
GPRAPI gpr_timespec gpr_time_from_seconds(int64_t s, gpr_clock_type clock_type)
Definition: src/core/lib/gpr/time.cc:123
port_platform.h


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:29