orca_service.cc
Go to the documentation of this file.
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <stddef.h>
18 
19 #include <map>
20 #include <string>
21 #include <utility>
22 
23 #include "absl/base/thread_annotations.h"
24 #include "absl/time/time.h"
25 #include "absl/types/optional.h"
27 #include "upb/upb.h"
28 #include "upb/upb.hpp"
31 
33 #include <grpc/support/log.h>
37 #include <grpcpp/impl/rpc_method.h>
39 #include <grpcpp/server_context.h>
41 #include <grpcpp/support/config.h>
43 #include <grpcpp/support/slice.h>
44 #include <grpcpp/support/status.h>
45 
52 
53 namespace grpc {
54 namespace experimental {
55 
56 using ::grpc_event_engine::experimental::EventEngine;
58 
59 //
60 // OrcaService::Reactor
61 //
62 
63 class OrcaService::Reactor : public ServerWriteReactor<ByteBuffer>,
64  public grpc_core::RefCounted<Reactor> {
65  public:
66  explicit Reactor(OrcaService* service, const ByteBuffer* request_buffer)
67  : RefCounted("OrcaService::Reactor"), service_(service) {
68  // Get slice from request.
69  Slice slice;
70  GPR_ASSERT(request_buffer->DumpToSingleSlice(&slice).ok());
71  // Parse request proto.
75  reinterpret_cast<const char*>(slice.begin()), slice.size(),
76  arena.ptr());
77  if (request == nullptr) {
78  Finish(Status(StatusCode::INTERNAL, "could not parse request proto"));
79  return;
80  }
81  const auto* duration_proto =
83  if (duration_proto != nullptr) {
85  google_protobuf_Duration_seconds(duration_proto),
86  google_protobuf_Duration_nanos(duration_proto));
87  }
88  auto min_interval = grpc_core::Duration::Milliseconds(
90  if (report_interval_ < min_interval) report_interval_ = min_interval;
91  // Send initial response.
92  SendResponse();
93  }
94 
95  void OnWriteDone(bool ok) override {
96  if (!ok) {
97  Finish(Status(StatusCode::UNKNOWN, "write failed"));
98  return;
99  }
100  response_.Clear();
101  if (!MaybeScheduleTimer()) {
102  Finish(Status(StatusCode::UNKNOWN, "call cancelled by client"));
103  }
104  }
105 
106  void OnCancel() override {
107  if (MaybeCancelTimer()) {
108  Finish(Status(StatusCode::UNKNOWN, "call cancelled by client"));
109  }
110  }
111 
112  void OnDone() override {
113  // Free the initial ref from instantiation.
114  Unref(DEBUG_LOCATION, "OnDone");
115  }
116 
117  private:
118  void SendResponse() {
119  Slice response_slice = service_->GetOrCreateSerializedResponse();
120  ByteBuffer response_buffer(&response_slice, 1);
121  response_.Swap(&response_buffer);
123  }
124 
126  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
129  if (cancelled_) return false;
132  [self = Ref(DEBUG_LOCATION, "Orca Service")] { self->OnTimer(); });
133  return true;
134  }
135 
138  cancelled_ = true;
139  if (timer_handle_.has_value() &&
140  GetDefaultEventEngine()->Cancel(*timer_handle_)) {
142  return true;
143  }
144  return false;
145  }
146 
147  void OnTimer() {
148  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
152  SendResponse();
153  }
154 
156 
160  bool cancelled_ ABSL_GUARDED_BY(&timer_mu_) = false;
161 
164 };
165 
166 //
167 // OrcaService
168 //
169 
171  : min_report_duration_(options.min_report_duration) {
173  "/xds.service.orca.v3.OpenRcaService/StreamCoreMetrics",
174  internal::RpcMethod::SERVER_STREAMING, /*handler=*/nullptr));
177  [this](CallbackServerContext* /*ctx*/, const ByteBuffer* request) {
178  return new Reactor(this, request);
179  }));
180 }
181 
182 void OrcaService::SetCpuUtilization(double cpu_utilization) {
184  cpu_utilization_ = cpu_utilization;
185  response_slice_.reset();
186 }
187 
190  cpu_utilization_ = -1;
191  response_slice_.reset();
192 }
193 
194 void OrcaService::SetMemoryUtilization(double memory_utilization) {
196  memory_utilization_ = memory_utilization;
197  response_slice_.reset();
198 }
199 
202  memory_utilization_ = -1;
203  response_slice_.reset();
204 }
205 
208  named_utilization_[std::move(name)] = utilization;
209  response_slice_.reset();
210 }
211 
214  named_utilization_.erase(name);
215  response_slice_.reset();
216 }
217 
219  std::map<std::string, double> named_utilization) {
221  named_utilization_ = std::move(named_utilization);
222  response_slice_.reset();
223 }
224 
227  if (!response_slice_.has_value()) {
231  if (cpu_utilization_ != -1) {
233  cpu_utilization_);
234  }
235  if (memory_utilization_ != -1) {
237  memory_utilization_);
238  }
239  for (const auto& p : named_utilization_) {
241  response,
242  upb_StringView_FromDataAndSize(p.first.data(), p.first.size()),
243  p.second, arena.ptr());
244  }
245  size_t buf_length;
247  &buf_length);
248  response_slice_.emplace(buf, buf_length);
249  }
250  return Slice(*response_slice_);
251 }
252 
253 } // namespace experimental
254 } // namespace grpc
grpc::experimental::OrcaService::Reactor::OnDone
void OnDone() override
Definition: orca_service.cc:112
grpc::experimental::OrcaService::Options
Definition: orca_service.h:39
grpc::experimental::OrcaService
Definition: orca_service.h:37
orca_load_report.upb.h
log.h
grpc::experimental::OrcaService::DeleteCpuUtilization
void DeleteCpuUtilization()
Definition: orca_service.cc:188
grpc::internal::Mutex
Definition: include/grpcpp/impl/codegen/sync.h:59
grpc
Definition: grpcpp/alarm.h:33
grpc::experimental::OrcaService::min_report_duration_
const absl::Duration min_report_duration_
Definition: orca_service.h:71
grpc::experimental::OrcaService::Reactor::SendResponse
void SendResponse()
Definition: orca_service.cc:118
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: include/grpcpp/impl/codegen/status.h:126
grpc::experimental::OrcaService::Reactor::MaybeCancelTimer
bool MaybeCancelTimer()
Definition: orca_service.cc:136
grpc::experimental::OrcaService::DeleteNamedUtilization
void DeleteNamedUtilization(const std::string &name)
Definition: orca_service.cc:212
event_engine.h
grpc::ByteBuffer::Swap
void Swap(ByteBuffer *other)
Swap the state of *this and *other.
Definition: include/grpcpp/impl/codegen/byte_buffer.h:157
options
double_dict options[]
Definition: capstone_test.c:55
benchmark.request
request
Definition: benchmark.py:77
buf
voidpf void * buf
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
slice.h
grpc::experimental::OrcaService::Reactor::MaybeScheduleTimer
bool MaybeScheduleTimer()
Definition: orca_service.cc:125
rpc_service_method.h
grpc_core::ApplicationCallbackExecCtx
Definition: exec_ctx.h:283
setup.name
name
Definition: setup.py:542
rpc_method.h
grpc::experimental::OrcaService::SetAllNamedUtilization
void SetAllNamedUtilization(std::map< std::string, double > named_utilization)
Definition: orca_service.cc:218
grpc::experimental::OrcaService::SetNamedUtilization
void SetNamedUtilization(std::string name, double utilization)
Definition: orca_service.cc:206
grpc::experimental::OrcaService::Reactor::OnCancel
void OnCancel() override
Definition: orca_service.cc:106
grpc_event_engine::experimental::EventEngine::RunAfter
virtual TaskHandle RunAfter(Duration when, Closure *closure)=0
grpc::experimental::OrcaService::Reactor::response_
ByteBuffer response_
Definition: orca_service.cc:163
grpc::experimental::OrcaService::Reactor::timer_mu_
grpc::internal::Mutex timer_mu_
Definition: orca_service.cc:157
grpc::internal::MutexLock
Definition: include/grpcpp/impl/codegen/sync.h:86
arena
grpc_core::ScopedArenaPtr arena
Definition: binder_transport_test.cc:237
grpc::experimental::OrcaService::DeleteMemoryUtilization
void DeleteMemoryUtilization()
Definition: orca_service.cc:200
grpc::ServerWriteReactor< ByteBuffer >::StartWrite
void StartWrite(const ByteBuffer *resp)
Definition: impl/codegen/server_callback.h:599
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
absl::Milliseconds
constexpr Duration Milliseconds(T n)
Definition: third_party/abseil-cpp/absl/time/time.h:415
grpc_core::RefCounted< Reactor >::Unref
void Unref()
Definition: ref_counted.h:302
grpc::internal::RpcServiceMethod
Server side rpc method class.
Definition: grpcpp/impl/codegen/rpc_service_method.h:86
sync.h
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
absl::optional::has_value
constexpr bool has_value() const noexcept
Definition: abseil-cpp/absl/types/optional.h:461
xds_data_orca_v3_OrcaLoadReport
struct xds_data_orca_v3_OrcaLoadReport xds_data_orca_v3_OrcaLoadReport
Definition: orca_load_report.upb.h:26
grpc::experimental::OrcaService::SetMemoryUtilization
void SetMemoryUtilization(double memory_utilization)
Definition: orca_service.cc:194
grpc.StatusCode.UNKNOWN
tuple UNKNOWN
Definition: src/python/grpcio/grpc/__init__.py:262
xds_data_orca_v3_OrcaLoadReport_set_cpu_utilization
UPB_INLINE void xds_data_orca_v3_OrcaLoadReport_set_cpu_utilization(xds_data_orca_v3_OrcaLoadReport *msg, double value)
Definition: orca_load_report.upb.h:115
xds_service_orca_v3_OrcaLoadReportRequest_parse
UPB_INLINE xds_service_orca_v3_OrcaLoadReportRequest * xds_service_orca_v3_OrcaLoadReportRequest_parse(const char *buf, size_t size, upb_Arena *arena)
Definition: orca.upb.h:36
grpc::experimental::OrcaService::SetCpuUtilization
void SetCpuUtilization(double cpu_utilization)
Definition: orca_service.cc:182
slice
grpc_slice slice
Definition: src/core/lib/surface/server.cc:467
upb.h
timer_handle_
absl::optional< EventEngine::TaskHandle > timer_handle_
Definition: weighted_target.cc:199
grpc_core::Duration::FromSecondsAndNanoseconds
static Duration FromSecondsAndNanoseconds(int64_t seconds, int32_t nanos)
Definition: src/core/lib/gprpp/time.h:274
google_protobuf_Duration_nanos
UPB_INLINE int32_t google_protobuf_Duration_nanos(const google_protobuf_Duration *msg)
Definition: duration.upb.h:69
absl::optional< EventEngine::TaskHandle >
grpc::experimental::OrcaService::GetOrCreateSerializedResponse
Slice GetOrCreateSerializedResponse()
Definition: orca_service.cc:225
time.h
grpc::ByteBuffer
A sequence of bytes.
Definition: include/grpcpp/impl/codegen/byte_buffer.h:61
grpc::ByteBuffer::Clear
void Clear()
Remove all data.
Definition: include/grpcpp/impl/codegen/byte_buffer.h:129
google_protobuf_Duration_seconds
UPB_INLINE int64_t google_protobuf_Duration_seconds(const google_protobuf_Duration *msg)
Definition: duration.upb.h:63
grpc::experimental::OrcaService::mu_
grpc::internal::Mutex mu_
Definition: orca_service.h:73
grpc_core::RefCounted
Definition: ref_counted.h:280
grpc::experimental::OrcaService::Reactor::OnTimer
void OnTimer()
Definition: orca_service.cc:147
grpc::ByteBuffer::DumpToSingleSlice
Status DumpToSingleSlice(Slice *slice) const
Dump (read) the buffer contents into slics.
Definition: byte_buffer_cc.cc:51
grpc_core::ExecCtx
Definition: exec_ctx.h:97
config.h
grpc::CallbackServerContext
Definition: grpcpp/impl/codegen/server_context.h:606
xds_data_orca_v3_OrcaLoadReport_serialize
UPB_INLINE char * xds_data_orca_v3_OrcaLoadReport_serialize(const xds_data_orca_v3_OrcaLoadReport *msg, upb_Arena *arena, size_t *len)
Definition: orca_load_report.upb.h:59
grpc::experimental::OrcaService::OrcaService
OrcaService(Options options)
Definition: orca_service.cc:170
server_callback.h
grpc_core::Duration::Milliseconds
static constexpr Duration Milliseconds(int64_t millis)
Definition: src/core/lib/gprpp/time.h:155
xds_service_orca_v3_OrcaLoadReportRequest
struct xds_service_orca_v3_OrcaLoadReportRequest xds_service_orca_v3_OrcaLoadReportRequest
Definition: orca.upb.h:24
upb::Arena
Definition: upb.hpp:68
debug_location.h
ref_counted.h
grpc::Service::MarkMethodCallback
void MarkMethodCallback(int index, internal::MethodHandler *handler)
Definition: grpcpp/impl/codegen/service_type.h:198
upb.hpp
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
grpc::experimental::OrcaService::Reactor::report_interval_
grpc_core::Duration report_interval_
Definition: orca_service.cc:162
absl::optional::reset
ABSL_ATTRIBUTE_REINITIALIZES void reset() noexcept
Definition: abseil-cpp/absl/types/optional.h:342
orca_service.h
grpc::internal::CallbackServerStreamingHandler
Definition: include/grpcpp/impl/codegen/byte_buffer.h:43
server_context.h
asyncio_get_stats.response
response
Definition: asyncio_get_stats.py:28
upb_StringView_FromDataAndSize
UPB_INLINE upb_StringView upb_StringView_FromDataAndSize(const char *data, size_t size)
Definition: upb/upb/upb.h:77
grpc::protobuf::util::Status
GRPC_CUSTOM_UTIL_STATUS Status
Definition: include/grpcpp/impl/codegen/config_protobuf.h:93
server_callback_handlers.h
xds_service_orca_v3_OrcaLoadReportRequest_report_interval
UPB_INLINE const struct google_protobuf_Duration * xds_service_orca_v3_OrcaLoadReportRequest_report_interval(const xds_service_orca_v3_OrcaLoadReportRequest *msg)
Definition: orca.upb.h:68
ok
bool ok
Definition: async_end2end_test.cc:197
grpc_event_engine::experimental::GetDefaultEventEngine
EventEngine * GetDefaultEventEngine()
Definition: event_engine.cc:47
exec_ctx.h
grpc::ServerWriteReactor< ByteBuffer >::Finish
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_)
Definition: impl/codegen/server_callback.h:637
ref_counted_ptr.h
grpc::experimental::OrcaService::Reactor
Definition: orca_service.cc:63
grpc::experimental::OrcaService::Reactor::Reactor
Reactor(OrcaService *service, const ByteBuffer *request_buffer)
Definition: orca_service.cc:66
grpc::ServerWriteReactor< ByteBuffer >::ABSL_GUARDED_BY
PreBindBacklog backlog_ ABSL_GUARDED_BY(writer_mu_)
grpc::experimental::OrcaService::Reactor::service_
OrcaService * service_
Definition: orca_service.cc:155
grpc::Service::AddMethod
void AddMethod(internal::RpcServiceMethod *method)
Definition: grpcpp/impl/codegen/service_type.h:147
duration.upb.h
xds_data_orca_v3_OrcaLoadReport_new
UPB_INLINE xds_data_orca_v3_OrcaLoadReport * xds_data_orca_v3_OrcaLoadReport_new(upb_Arena *arena)
Definition: orca_load_report.upb.h:37
xds_data_orca_v3_OrcaLoadReport_set_mem_utilization
UPB_INLINE void xds_data_orca_v3_OrcaLoadReport_set_mem_utilization(xds_data_orca_v3_OrcaLoadReport *msg, double value)
Definition: orca_load_report.upb.h:118
grpc::Slice
Definition: include/grpcpp/impl/codegen/slice.h:36
service
__attribute__((deprecated("Please use GRPCProtoMethod."))) @interface ProtoMethod NSString * service
Definition: ProtoMethod.h:25
grpc.StatusCode.INTERNAL
tuple INTERNAL
Definition: src/python/grpcio/grpc/__init__.py:277
grpc::experimental::OrcaService::Reactor::OnWriteDone
void OnWriteDone(bool ok) override
Definition: orca_service.cc:95
event_engine_factory.h
grpc_core::RefCounted< Reactor >::RefCounted
RefCounted(const RefCounted &)=delete
xds_data_orca_v3_OrcaLoadReport_utilization_set
UPB_INLINE bool xds_data_orca_v3_OrcaLoadReport_utilization_set(xds_data_orca_v3_OrcaLoadReport *msg, upb_StringView key, double val, upb_Arena *a)
Definition: orca_load_report.upb.h:135
grpc_core::Duration
Definition: src/core/lib/gprpp/time.h:122
grpc::internal::RpcMethod::SERVER_STREAMING
@ SERVER_STREAMING
Definition: grpcpp/impl/codegen/rpc_method.h:36
orca.upb.h
grpc::ServerWriteReactor
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: impl/codegen/server_callback.h:186
grpc_core::RefCounted< Reactor >::Ref
RefCountedPtr< Reactor > Ref() GRPC_MUST_USE_RESULT
Definition: ref_counted.h:287
status.h
byte_buffer.h


grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:36