load_reporter_async_service_impl.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
21 
23 
24 #include <stdint.h>
25 
26 #include <atomic>
27 #include <functional>
28 #include <memory>
29 #include <string>
30 #include <utility>
31 
32 #include <grpc/support/log.h>
33 #include <grpcpp/alarm.h>
34 #include <grpcpp/grpcpp.h>
36 #include <grpcpp/support/config.h>
37 
39 #include "src/core/lib/gprpp/thd.h"
41 #include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
42 #include "src/proto/grpc/lb/v1/load_reporter.pb.h"
43 
44 namespace grpc {
45 namespace load_reporter {
46 
47 // Async load reporting service. It's mainly responsible for controlling the
48 // procedure of incoming requests. The real business logic is handed off to the
49 // LoadReporter. There should be at most one instance of this service on a
50 // server to avoid spreading the load data into multiple places.
52  : public grpc::lb::v1::LoadReporter::AsyncService {
53  public:
55  std::unique_ptr<ServerCompletionQueue> cq);
57 
58  // Starts the working thread.
59  void StartThread();
60 
61  // Neither copyable nor movable.
64  delete;
65 
66  private:
67  class ReportLoadHandler;
68 
69  // A tag that can be called with a bool argument. It's tailored for
70  // ReportLoadHandler's use. Before being used, it should be constructed with a
71  // method of ReportLoadHandler and a shared pointer to the handler. The
72  // shared pointer will be moved to the invoked function and the function can
73  // only be invoked once. That makes ref counting of the handler easier,
74  // because the shared pointer is not bound to the function and can be
75  // released once the invoked function returns (if it is no longer used).
76  class CallableTag {
77  public:
78  using HandlerFunction =
79  std::function<void(std::shared_ptr<ReportLoadHandler>, bool)>;
80 
82 
84  std::shared_ptr<ReportLoadHandler> handler)
86  GPR_ASSERT(handler_function_ != nullptr);
87  GPR_ASSERT(handler_ != nullptr);
88  }
89 
90  // Runs the tag. This should be called only once. The handler is no longer
91  // owned by this tag after this method is invoked.
92  void Run(bool ok);
93 
94  // Releases and returns the shared pointer to the handler.
95  std::shared_ptr<ReportLoadHandler> ReleaseHandler() {
96  return std::move(handler_);
97  }
98 
99  private:
101  std::shared_ptr<ReportLoadHandler> handler_;
102  };
103 
104  // Each handler takes care of one load reporting stream. It contains
105  // per-stream data and it will access the members of the parent class (i.e.,
106  // LoadReporterAsyncServiceImpl) for service-wide data (e.g., the load data).
108  public:
109  // Instantiates a ReportLoadHandler and requests the next load reporting
110  // call. The handler object will manage its own lifetime, so the caller
111  // needs to take no further action regarding that object.
114  LoadReporter* load_reporter);
115 
116  // This ctor is public because we want to use std::make_shared<> in
117  // CreateAndStart(). This ctor shouldn't be used elsewhere.
120  LoadReporter* load_reporter);
121 
122  private:
123  // After the handler has a call request delivered, it starts reading the
124  // initial request. Also, a new handler is spawned so that we can keep
125  // servicing future calls.
126  void OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self, bool ok);
127 
128  // The first Read() is expected to succeed, after which the handler starts
129  // sending load reports back to the balancer. The second Read() is
130  // expected to fail, which happens when the balancer half-closes the
131  // stream to signal that it's no longer interested in the load reports. For
132  // the latter case, the handler will then close the stream.
133  void OnReadDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
134 
135  // The report-sending operations run sequentially: send report -> send
136  // done, schedule the next send -> wait for the alarm to fire -> alarm
137  // fires, send report -> ...
138  void SendReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
139  void ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
140 
141  // Called when Finish() is done.
142  void OnFinishDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
143 
144  // Called when AsyncNotifyWhenDone() notifies us.
145  void OnDoneNotified(std::shared_ptr<ReportLoadHandler> self, bool ok);
146 
147  void Shutdown(std::shared_ptr<ReportLoadHandler> self, const char* reason);
148 
149  // The key fields of the stream.
154 
155  // The data for RPC communication with the load reportee.
157  grpc::lb::v1::LoadReportRequest request_;
158 
159  // The members passed down from LoadReporterAsyncServiceImpl.
163  ServerAsyncReaderWriter<grpc::lb::v1::LoadReportResponse,
164  grpc::lb::v1::LoadReportRequest>
166 
167  // The status of the RPC progress.
168  enum CallStatus {
174  } call_status_;
175  bool shutdown_{false};
176  bool done_notified_{false};
177  bool is_cancelled_{false};
182  std::unique_ptr<Alarm> next_report_alarm_;
183  };
184 
185  // Handles the incoming requests and drives the completion queue in a loop.
186  static void Work(void* arg);
187 
188  // Schedules the next data fetching from Census and LB feedback sampling.
190 
191  // Fetches data from Census and samples LB feedback.
192  void FetchAndSample(bool ok);
193 
194  std::unique_ptr<ServerCompletionQueue> cq_;
195  // Synchronizes the operations related to the shutdown state of cq_, so that
196  // we don't enqueue new tags into cq_ after it is already shut down.
198  std::atomic_bool shutdown_{false};
199  std::unique_ptr<grpc_core::Thread> thread_;
200  std::unique_ptr<LoadReporter> load_reporter_;
201  std::unique_ptr<Alarm> next_fetch_and_sample_alarm_;
202 };
203 
204 } // namespace load_reporter
205 } // namespace grpc
206 
207 #endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
grpc::load_reporter::LoadReporterAsyncServiceImpl::cq_shutdown_mu_
grpc_core::Mutex cq_shutdown_mu_
Definition: load_reporter_async_service_impl.h:197
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::next_report_alarm_
std::unique_ptr< Alarm > next_report_alarm_
Definition: load_reporter_async_service_impl.h:182
grpc::ServerCompletionQueue
Definition: include/grpcpp/impl/codegen/completion_queue.h:436
grpc::ServerContext
Definition: grpcpp/impl/codegen/server_context.h:566
log.h
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::request_
grpc::lb::v1::LoadReportRequest request_
Definition: load_reporter_async_service_impl.h:157
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::OnDoneNotified
void OnDoneNotified(std::shared_ptr< ReportLoadHandler > self, bool ok)
Definition: load_reporter_async_service_impl.cc:329
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::on_done_notified_
CallableTag on_done_notified_
Definition: load_reporter_async_service_impl.h:178
grpc
Definition: grpcpp/alarm.h:33
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::DELIVERED
@ DELIVERED
Definition: load_reporter_async_service_impl.h:170
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::ReportLoadHandler
ReportLoadHandler(ServerCompletionQueue *cq, LoadReporterAsyncServiceImpl *service, LoadReporter *load_reporter)
Definition: load_reporter_async_service_impl.cc:152
grpc::load_reporter::LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample
void ScheduleNextFetchAndSample()
Definition: load_reporter_async_service_impl.cc:73
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::call_status_
enum grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::CallStatus call_status_
grpc::load_reporter::LoadReporterAsyncServiceImpl::CallableTag::HandlerFunction
std::function< void(std::shared_ptr< ReportLoadHandler >, bool)> HandlerFunction
Definition: load_reporter_async_service_impl.h:79
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::load_reporter_
LoadReporter * load_reporter_
Definition: load_reporter_async_service_impl.h:162
grpc::ServerAsyncReaderWriter
Definition: grpcpp/impl/codegen/async_stream.h:1010
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::load_report_interval_ms_
uint64_t load_report_interval_ms_
Definition: load_reporter_async_service_impl.h:153
grpc::load_reporter::LoadReporterAsyncServiceImpl::CallableTag::Run
void Run(bool ok)
Definition: load_reporter_async_service_impl.cc:39
alarm.h
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::CallStatus
CallStatus
Definition: load_reporter_async_service_impl.h:168
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::INITIAL_REQUEST_RECEIVED
@ INITIAL_REQUEST_RECEIVED
Definition: load_reporter_async_service_impl.h:171
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::stream_
ServerAsyncReaderWriter< grpc::lb::v1::LoadReportResponse, grpc::lb::v1::LoadReportRequest > stream_
Definition: load_reporter_async_service_impl.h:165
grpc::load_reporter::LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl
~LoadReporterAsyncServiceImpl() override
Definition: load_reporter_async_service_impl.cc:60
load_reporter.h
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::done_notified_
bool done_notified_
Definition: load_reporter_async_service_impl.h:176
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::on_finish_done_
CallableTag on_finish_done_
Definition: load_reporter_async_service_impl.h:179
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::FINISH_CALLED
@ FINISH_CALLED
Definition: load_reporter_async_service_impl.h:173
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::lb_id_
std::string lb_id_
Definition: load_reporter_async_service_impl.h:150
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::next_inbound_
CallableTag next_inbound_
Definition: load_reporter_async_service_impl.h:180
grpc::load_reporter::LoadReporterAsyncServiceImpl::CallableTag::handler_function_
HandlerFunction handler_function_
Definition: load_reporter_async_service_impl.h:100
grpc::load_reporter::LoadReporterAsyncServiceImpl::Work
static void Work(void *arg)
Definition: load_reporter_async_service_impl.cc:100
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown
void Shutdown(std::shared_ptr< ReportLoadHandler > self, const char *reason)
Definition: load_reporter_async_service_impl.cc:343
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone
void OnReadDone(std::shared_ptr< ReportLoadHandler > self, bool ok)
Definition: load_reporter_async_service_impl.cc:201
grpc::load_reporter::LoadReporterAsyncServiceImpl
Definition: load_reporter_async_service_impl.h:51
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::WAITING_FOR_DELIVERY
@ WAITING_FOR_DELIVERY
Definition: load_reporter_async_service_impl.h:169
grpcpp.h
arg
Definition: cmdline.cc:40
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport
void SendReport(std::shared_ptr< ReportLoadHandler > self, bool ok)
Definition: load_reporter_async_service_impl.cc:291
grpc::load_reporter::LoadReporterAsyncServiceImpl::CallableTag::ReleaseHandler
std::shared_ptr< ReportLoadHandler > ReleaseHandler()
Definition: load_reporter_async_service_impl.h:95
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::cq_
ServerCompletionQueue * cq_
Definition: load_reporter_async_service_impl.h:160
config.h
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::is_cancelled_
bool is_cancelled_
Definition: load_reporter_async_service_impl.h:177
stdint.h
grpc::load_reporter::LoadReporterAsyncServiceImpl::CallableTag::CallableTag
CallableTag()
Definition: load_reporter_async_service_impl.h:81
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::load_key_
std::string load_key_
Definition: load_reporter_async_service_impl.h:152
grpc::load_reporter::LoadReporterAsyncServiceImpl::next_fetch_and_sample_alarm_
std::unique_ptr< Alarm > next_fetch_and_sample_alarm_
Definition: load_reporter_async_service_impl.h:201
grpc::load_reporter::LoadReporterAsyncServiceImpl::CallableTag::CallableTag
CallableTag(HandlerFunction func, std::shared_ptr< ReportLoadHandler > handler)
Definition: load_reporter_async_service_impl.h:83
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::INITIAL_RESPONSE_SENT
@ INITIAL_RESPONSE_SENT
Definition: load_reporter_async_service_impl.h:172
grpc::load_reporter::LoadReporterAsyncServiceImpl::FetchAndSample
void FetchAndSample(bool ok)
Definition: load_reporter_async_service_impl.cc:90
grpc_core::Mutex
Definition: src/core/lib/gprpp/sync.h:61
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::load_balanced_hostname_
std::string load_balanced_hostname_
Definition: load_reporter_async_service_impl.h:151
func
const EVP_CIPHER *(* func)(void)
Definition: cipher_extra.c:73
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler
Definition: load_reporter_async_service_impl.h:107
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::OnFinishDone
void OnFinishDone(std::shared_ptr< ReportLoadHandler > self, bool ok)
Definition: load_reporter_async_service_impl.cc:374
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport
void ScheduleNextReport(std::shared_ptr< ReportLoadHandler > self, bool ok)
Definition: load_reporter_async_service_impl.cc:261
grpc::load_reporter::LoadReporterAsyncServiceImpl::StartThread
void StartThread()
Definition: load_reporter_async_service_impl.cc:127
client.handler
handler
Definition: examples/python/multiprocessing/client.py:87
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart
static void CreateAndStart(ServerCompletionQueue *cq, LoadReporterAsyncServiceImpl *service, LoadReporter *load_reporter)
Definition: load_reporter_async_service_impl.cc:129
grpc::load_reporter::LoadReporterAsyncServiceImpl::CallableTag::handler_
std::shared_ptr< ReportLoadHandler > handler_
Definition: load_reporter_async_service_impl.h:101
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc::load_reporter::LoadReporterAsyncServiceImpl::LoadReporterAsyncServiceImpl
LoadReporterAsyncServiceImpl(std::unique_ptr< ServerCompletionQueue > cq)
Definition: load_reporter_async_service_impl.cc:45
thd.h
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::shutdown_
bool shutdown_
Definition: load_reporter_async_service_impl.h:175
ok
bool ok
Definition: async_end2end_test.cc:197
grpc::load_reporter::LoadReporterAsyncServiceImpl::operator=
LoadReporterAsyncServiceImpl & operator=(const LoadReporterAsyncServiceImpl &)=delete
grpc::load_reporter::LoadReporter
Definition: load_reporter.h:126
grpc::load_reporter::LoadReporterAsyncServiceImpl::thread_
std::unique_ptr< grpc_core::Thread > thread_
Definition: load_reporter_async_service_impl.h:199
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::next_outbound_
CallableTag next_outbound_
Definition: load_reporter_async_service_impl.h:181
grpc::load_reporter::LoadReporterAsyncServiceImpl::load_reporter_
std::unique_ptr< LoadReporter > load_reporter_
Definition: load_reporter_async_service_impl.h:200
service
__attribute__((deprecated("Please use GRPCProtoMethod."))) @interface ProtoMethod NSString * service
Definition: ProtoMethod.h:25
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::service_
LoadReporterAsyncServiceImpl * service_
Definition: load_reporter_async_service_impl.h:161
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
grpc::load_reporter::LoadReporterAsyncServiceImpl::CallableTag
Definition: load_reporter_async_service_impl.h:76
async_stream.h
grpc::load_reporter::LoadReporterAsyncServiceImpl::cq_
std::unique_ptr< ServerCompletionQueue > cq_
Definition: load_reporter_async_service_impl.h:194
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered
void OnRequestDelivered(std::shared_ptr< ReportLoadHandler > self, bool ok)
Definition: load_reporter_async_service_impl.cc:161
grpc::load_reporter::LoadReporterAsyncServiceImpl::shutdown_
std::atomic_bool shutdown_
Definition: load_reporter_async_service_impl.h:198
cq
static grpc_completion_queue * cq
Definition: test/core/fling/client.cc:37
grpc::load_reporter::LoadReporterAsyncServiceImpl::ReportLoadHandler::ctx_
ServerContext ctx_
Definition: load_reporter_async_service_impl.h:156
sync.h
port_platform.h


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:29