oob_backend_metric.cc
Go to the documentation of this file.
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
18 
20 
21 #include <string.h>
22 
23 #include <algorithm>
24 #include <set>
25 #include <string>
26 #include <utility>
27 #include <vector>
28 
29 #include "absl/base/thread_annotations.h"
30 #include "absl/memory/memory.h"
31 #include "absl/status/status.h"
32 #include "absl/strings/string_view.h"
33 #include "absl/types/optional.h"
35 #include "upb/upb.hpp"
37 
40 #include <grpc/slice.h>
41 #include <grpc/status.h>
42 #include <grpc/support/alloc.h>
43 #include <grpc/support/log.h>
44 
65 
66 namespace grpc_core {
67 
68 namespace {
69 
70 TraceFlag grpc_orca_client_trace(false, "orca_client");
71 
72 class OrcaWatcher;
73 
74 // This producer is registered with a subchannel. It creates a
75 // streaming ORCA call and reports the resulting backend metrics to all
76 // registered watchers.
// Per-subchannel ORCA producer, shared by the OrcaWatchers attached to the
// same subchannel. OrcaWatcher::SetSubchannel() reuses the instance returned
// by Subchannel::GetDataProducer(Type()) and only constructs a new one when
// none is usable. The constructor registers with the subchannel
// (AddDataProducer); Orphan() unregisters (RemoveDataProducer).
77 class OrcaProducer : public Subchannel::DataProducerInterface {
78  public:
79  explicit OrcaProducer(RefCountedPtr<Subchannel> subchannel);
80 
// Drops the ORCA stream, cancels the connectivity watch, then detaches from
// the subchannel.
81  void Orphan() override;
82 
// Type key ("orca") under which this producer is registered.
83  static UniqueTypeName Type() {
84  static UniqueTypeName::Factory kFactory("orca");
85  return kFactory.Create();
86  }
87 
88  UniqueTypeName type() const override { return Type(); }
89 
90  // Adds and removes watchers.
// AddWatcher() restarts the stream when the new watcher asks for a shorter
// interval; RemoveWatcher() drops the stream once no watchers remain.
91  void AddWatcher(OrcaWatcher* watcher);
92  void RemoveWatcher(OrcaWatcher* watcher);
93 
94  private:
95  class ConnectivityWatcher;
96  class OrcaStreamEventHandler;
97 
98  // Returns the minimum requested reporting interval across all watchers.
99  Duration GetMinIntervalLocked() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
100 
101  // Starts a new stream if we have a connected subchannel.
102  // Called whenever the reporting interval changes or the subchannel
103  // transitions to state READY.
104  void MaybeStartStreamLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
105 
106  // Handles a connectivity state change on the subchannel.
107  void OnConnectivityStateChange(grpc_connectivity_state state);
108 
109  // Called to notify watchers of a new backend metric report.
110  void NotifyWatchers(const BackendMetricData& backend_metric_data);
111 
112  RefCountedPtr<Subchannel> subchannel_;
// NOTE(review): accessed under mu_ in OnConnectivityStateChange() and
// MaybeStartStreamLocked(), but not annotated ABSL_GUARDED_BY(mu_).
113  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
// Non-owning. The constructor moves the owning ref into
// Subchannel::WatchConnectivityState(); this pointer is only used to cancel
// that watch in Orphan().
114  ConnectivityWatcher* connectivity_watcher_;
// NOTE(review): source lines 115 and 117 are missing from this listing. They
// presumably declare mu_ (a Mutex, per the symbol index) and the
// report_interval_ used by the methods of this class.
// Non-owning set of registered watchers; each OrcaWatcher removes itself in
// its destructor.
116  std::set<OrcaWatcher*> watchers_ ABSL_GUARDED_BY(mu_);
// Active ORCA stream. Null when not connected or when no watchers remain.
118  OrphanablePtr<SubchannelStreamClient> stream_client_ ABSL_GUARDED_BY(mu_);
119 };
120 
121 // This watcher is returned to the LB policy and added to the
122 // client channel SubchannelWrapper.
// NOTE(review): source line 138 is missing from this listing. Per the symbol
// index it declares `const Duration report_interval_`.
123 class OrcaWatcher : public InternalSubchannelDataWatcherInterface {
124  public:
125  OrcaWatcher(Duration report_interval,
126  std::unique_ptr<OobBackendMetricWatcher> watcher)
127  : report_interval_(report_interval), watcher_(std::move(watcher)) {}
// Unregisters from the producer, if SetSubchannel() attached one.
128  ~OrcaWatcher() override;
129 
// Requested reporting interval. OrcaProducer uses the minimum across its
// watchers.
130  Duration report_interval() const { return report_interval_; }
// Non-owning access to the wrapped watcher that receives the reports.
131  OobBackendMetricWatcher* watcher() const { return watcher_.get(); }
132 
133  // When the client channel sees this wrapper, it will pass it the real
134  // subchannel to use.
135  void SetSubchannel(Subchannel* subchannel) override;
136 
137  private:
139  std::unique_ptr<OobBackendMetricWatcher> watcher_;
// Strong ref to the subchannel's shared producer. Null until SetSubchannel()
// runs.
140  RefCountedPtr<OrcaProducer> producer_;
141 };
142 
143 //
144 // OrcaProducer::ConnectivityWatcher
145 //
146 
// Subchannel connectivity watcher. On each notification it pops the pending
// state change and forwards the state to the OrcaProducer, to which it holds
// only a weak ref.
147 class OrcaProducer::ConnectivityWatcher
148  : public Subchannel::ConnectivityStateWatcherInterface {
149  public:
// NOTE(review): source lines 152, 155 and 169 are missing from this listing.
// Line 169 declares `grpc_pollset_set* interested_parties_` (per the symbol
// index). Lines 152/155 presumably create and destroy it via
// grpc_pollset_set_create()/grpc_pollset_set_destroy(); confirm against the
// full source.
150  explicit ConnectivityWatcher(WeakRefCountedPtr<OrcaProducer> producer)
151  : producer_(std::move(producer)),
153 
154  ~ConnectivityWatcher() override {
156  }
157 
158  void OnConnectivityStateChange() override {
159  auto change = PopConnectivityStateChange();
160  producer_->OnConnectivityStateChange(change.state);
161  }
162 
163  grpc_pollset_set* interested_parties() override {
164  return interested_parties_;
165  }
166 
167  private:
168  WeakRefCountedPtr<OrcaProducer> producer_;
170 };
171 
172 //
173 // OrcaProducer::OrcaStreamEventHandler
174 //
175 
// Call callbacks for one ORCA StreamCoreMetrics stream.
// OrcaProducer::MaybeStartStreamLocked() creates a new handler for every
// stream, carrying the reporting interval in effect at that moment.
176 class OrcaProducer::OrcaStreamEventHandler
177  : public SubchannelStreamClient::CallEventHandler {
178  public:
179  OrcaStreamEventHandler(WeakRefCountedPtr<OrcaProducer> producer,
180  Duration report_interval)
181  : producer_(std::move(producer)), report_interval_(report_interval) {}
182 
// Method path of the ORCA streaming RPC.
// NOTE(review): source line 184 is missing from this listing; it presumably
// opens a Slice::FromStaticString( call (listed in the symbol index).
183  Slice GetPathLocked() override {
185  "/xds.service.orca.v3.OpenRcaService/StreamCoreMetrics");
186  }
187 
188  void OnCallStartLocked(SubchannelStreamClient* /*client*/) override {}
189 
190  void OnRetryTimerStartLocked(SubchannelStreamClient* /*client*/) override {}
191 
// Builds the request carrying report_interval_ (seconds and nanos taken from
// its gpr_timespec form) and copies the serialized bytes into a newly
// allocated grpc_slice.
// NOTE(review): source lines 193-195, 198 and 203 are missing from this
// listing (presumably the upb arena/request creation, the
// ..._mutable_report_interval( call, and the serialize call).
192  grpc_slice EncodeSendMessageLocked() override {
196  gpr_timespec timespec = report_interval_.as_timespec();
197  auto* report_interval =
199  request, arena.ptr());
200  google_protobuf_Duration_set_seconds(report_interval, timespec.tv_sec);
201  google_protobuf_Duration_set_nanos(report_interval, timespec.tv_nsec);
202  size_t buf_length;
204  request, arena.ptr(), &buf_length);
205  grpc_slice request_slice = GRPC_SLICE_MALLOC(buf_length);
206  memcpy(GRPC_SLICE_START_PTR(request_slice), buf, buf_length);
207  return request_slice;
208  }
209 
// Parses one ORCA response into a heap-allocated BackendMetricAllocator.
// On parse failure the allocator is deleted here and InvalidArgument is
// returned. On success, ownership passes to AsyncNotifyWatchersAndDelete(),
// and the object deletes itself after notifying watchers.
// NOTE(review): a std::unique_ptr with release() on the success path would
// remove the manual new/delete pairing.
210  absl::Status RecvMessageReadyLocked(
211  SubchannelStreamClient* /*client*/,
212  absl::string_view serialized_message) override {
213  auto* allocator = new BackendMetricAllocator(producer_);
214  auto* backend_metric_data =
215  ParseBackendMetricData(serialized_message, allocator);
216  if (backend_metric_data == nullptr) {
217  delete allocator;
218  return absl::InvalidArgumentError("unable to parse Orca response");
219  }
220  allocator->AsyncNotifyWatchersAndDelete();
221  return absl::OkStatus();
222  }
223 
// Records kErrorMessage as a trace event on the subchannel's channelz node,
// if it has one.
// NOTE(review): source lines 226, 229 and 233-234 are missing from this
// listing. Given kErrorMessage, this presumably only happens when `status`
// is GRPC_STATUS_UNIMPLEMENTED; confirm against the full source.
224  void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* /*client*/,
225  grpc_status_code status) override {
227  static const char kErrorMessage[] =
228  "Orca stream returned UNIMPLEMENTED; disabling";
230  auto* channelz_node = producer_->subchannel_->channelz_node();
231  if (channelz_node != nullptr) {
232  channelz_node->AddTraceEvent(
235  }
236  }
237  }
238 
239  private:
240  // This class acts as storage for the parsed backend metric data. It
241  // is injected into ParseBackendMetricData() as an allocator that
242  // returns internal storage. It then also acts as a place to hold
243  // onto the data during an async hop into the ExecCtx before sending
244  // notifications, which avoids lock inversion problems due to
245  // acquiring producer_->mu_ while holding the lock from inside of
246  // SubchannelStreamClient.
247  class BackendMetricAllocator : public BackendMetricAllocatorInterface {
248  public:
249  explicit BackendMetricAllocator(WeakRefCountedPtr<OrcaProducer> producer)
250  : producer_(std::move(producer)) {}
251 
252  BackendMetricData* AllocateBackendMetricData() override {
253  return &backend_metric_data_;
254  }
255 
// Each string is gpr_malloc'd and owned by string_storage_, so it stays
// valid for as long as this allocator (and its backend_metric_data_) lives.
256  char* AllocateString(size_t size) override {
257  char* string = static_cast<char*>(gpr_malloc(size));
258  string_storage_.emplace_back(string);
259  return string;
260  }
261 
262  // Notifies watchers asynchronously and then deletes the
263  // BackendMetricAllocator object.
// NOTE(review): source line 266 is missing from this listing; it presumably
// schedules closure_ (ExecCtx::Run appears in the symbol index).
264  void AsyncNotifyWatchersAndDelete() {
265  GRPC_CLOSURE_INIT(&closure_, NotifyWatchersInExecCtx, this, nullptr);
267  }
268 
269  private:
// Closure callback: delivers the parsed data to the producer's watchers,
// then frees this allocator.
270  static void NotifyWatchersInExecCtx(void* arg,
271  grpc_error_handle /*error*/) {
272  auto* self = static_cast<BackendMetricAllocator*>(arg);
273  self->producer_->NotifyWatchers(self->backend_metric_data_);
274  delete self;
275  }
276 
// NOTE(review): source line 280 (`grpc_closure closure_;`, per the symbol
// index) is missing from this listing.
277  WeakRefCountedPtr<OrcaProducer> producer_;
278  BackendMetricData backend_metric_data_;
279  std::vector<UniquePtr<char>> string_storage_;
281  };
282 
// NOTE(review): source line 284 is missing from this listing; it presumably
// declares the report_interval_ member used above.
283  WeakRefCountedPtr<OrcaProducer> producer_;
285 };
286 
287 //
288 // OrcaProducer
289 //
290 
// Registers this producer with the subchannel, snapshots the current
// connected subchannel, and starts a connectivity watch so that
// OnConnectivityStateChange() can start and stop the ORCA stream.
// NOTE(review): source line 292 is missing from this listing; it presumably
// initializes subchannel_ from `subchannel` and opens the body.
291 OrcaProducer::OrcaProducer(RefCountedPtr<Subchannel> subchannel)
293  subchannel_->AddDataProducer(this);
294  connected_subchannel_ = subchannel_->connected_subchannel();
// Keep a raw pointer so Orphan() can cancel the watch; the ref itself is
// moved into the subchannel below.
295  auto connectivity_watcher = MakeRefCounted<ConnectivityWatcher>(WeakRef());
296  connectivity_watcher_ = connectivity_watcher.get();
297  subchannel_->WatchConnectivityState(
298  /*health_check_service_name=*/absl::nullopt,
299  std::move(connectivity_watcher));
300 }
301 
302 void OrcaProducer::Orphan() {
303  {
304  MutexLock lock(&mu_);
305  stream_client_.reset();
306  }
307  subchannel_->CancelConnectivityStateWatch(
308  /*health_check_service_name=*/absl::nullopt, connectivity_watcher_);
309  subchannel_->RemoveDataProducer(this);
310 }
311 
312 void OrcaProducer::AddWatcher(OrcaWatcher* watcher) {
313  MutexLock lock(&mu_);
314  watchers_.insert(watcher);
315  Duration watcher_interval = watcher->report_interval();
316  if (watcher_interval < report_interval_) {
317  report_interval_ = watcher_interval;
318  stream_client_.reset();
319  MaybeStartStreamLocked();
320  }
321 }
322 
323 void OrcaProducer::RemoveWatcher(OrcaWatcher* watcher) {
324  MutexLock lock(&mu_);
325  watchers_.erase(watcher);
326  if (watchers_.empty()) {
327  stream_client_.reset();
328  return;
329  }
330  Duration new_interval = GetMinIntervalLocked();
331  if (new_interval < report_interval_) {
332  report_interval_ = new_interval;
333  stream_client_.reset();
334  MaybeStartStreamLocked();
335  }
336 }
337 
338 Duration OrcaProducer::GetMinIntervalLocked() const {
339  Duration duration = Duration::Infinity();
340  for (OrcaWatcher* watcher : watchers_) {
341  Duration watcher_interval = watcher->report_interval();
342  if (watcher_interval < duration) duration = watcher_interval;
343  }
344  return duration;
345 }
346 
347 void OrcaProducer::MaybeStartStreamLocked() {
348  if (connected_subchannel_ == nullptr) return;
349  stream_client_ = MakeOrphanable<SubchannelStreamClient>(
350  connected_subchannel_, subchannel_->pollset_set(),
351  absl::make_unique<OrcaStreamEventHandler>(WeakRef(), report_interval_),
352  GRPC_TRACE_FLAG_ENABLED(grpc_orca_client_trace) ? "OrcaClient" : nullptr);
353 }
354 
355 void OrcaProducer::NotifyWatchers(
356  const BackendMetricData& backend_metric_data) {
357  if (GRPC_TRACE_FLAG_ENABLED(grpc_orca_client_trace)) {
358  gpr_log(GPR_INFO, "OrcaProducer %p: reporting backend metrics to watchers",
359  this);
360  }
361  MutexLock lock(&mu_);
362  for (OrcaWatcher* watcher : watchers_) {
363  watcher->watcher()->OnBackendMetricReport(backend_metric_data);
364  }
365 }
366 
367 void OrcaProducer::OnConnectivityStateChange(grpc_connectivity_state state) {
368  MutexLock lock(&mu_);
369  if (state == GRPC_CHANNEL_READY) {
370  connected_subchannel_ = subchannel_->connected_subchannel();
371  if (!watchers_.empty()) MaybeStartStreamLocked();
372  } else {
373  connected_subchannel_.reset();
374  stream_client_.reset();
375  }
376 }
377 
378 //
379 // OrcaWatcher
380 //
381 
382 OrcaWatcher::~OrcaWatcher() {
383  if (producer_ != nullptr) producer_->RemoveWatcher(this);
384 }
385 
386 void OrcaWatcher::SetSubchannel(Subchannel* subchannel) {
387  // Check if our producer is already registered with the subchannel.
388  // If not, create a new one, which will register itself with the subchannel.
389  auto* p = static_cast<OrcaProducer*>(
390  subchannel->GetDataProducer(OrcaProducer::Type()));
391  if (p != nullptr) producer_ = p->RefIfNonZero();
392  if (producer_ == nullptr) {
393  producer_ = MakeRefCounted<OrcaProducer>(subchannel->Ref());
394  }
395  // Register ourself with the producer.
396  producer_->AddWatcher(this);
397 }
398 
399 } // namespace
400 
401 std::unique_ptr<SubchannelInterface::DataWatcherInterface>
403  std::unique_ptr<OobBackendMetricWatcher> watcher) {
404  return absl::make_unique<OrcaWatcher>(report_interval, std::move(watcher));
405 }
406 
407 } // namespace grpc_core
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
trace.h
gpr_timespec::tv_nsec
int32_t tv_nsec
Definition: gpr_types.h:52
absl::InvalidArgumentError
Status InvalidArgumentError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:351
string_storage_
std::vector< UniquePtr< char > > string_storage_
Definition: oob_backend_metric.cc:279
slice.h
gpr_timespec::tv_sec
int64_t tv_sec
Definition: gpr_types.h:51
Type
struct Type Type
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/protobuf.h:673
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
xds_service_orca_v3_OrcaLoadReportRequest_mutable_report_interval
UPB_INLINE struct google_protobuf_Duration * xds_service_orca_v3_OrcaLoadReportRequest_mutable_report_interval(xds_service_orca_v3_OrcaLoadReportRequest *msg, upb_Arena *arena)
Definition: orca.upb.h:82
GRPC_CHANNEL_READY
@ GRPC_CHANNEL_READY
Definition: include/grpc/impl/codegen/connectivity_state.h:36
orphanable.h
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
MutexLock
#define MutexLock(x)
Definition: bloaty/third_party/re2/util/mutex.h:125
const
#define const
Definition: bloaty/third_party/zlib/zconf.h:230
client_channel_channelz.h
slice.h
producer_
RefCountedPtr< OrcaProducer > producer_
Definition: oob_backend_metric.cc:140
report_interval_
const Duration report_interval_
Definition: oob_backend_metric.cc:138
subchannel.h
grpc_core
Definition: call_metric_recorder.h:31
grpc_core::channelz::ChannelTrace::Error
@ Error
Definition: channel_trace.h:55
grpc_pollset_set
struct grpc_pollset_set grpc_pollset_set
Definition: iomgr_fwd.h:23
string.h
benchmark.request
request
Definition: benchmark.py:77
grpc_core::slice_detail::StaticConstructors< Slice >::FromStaticString
static Slice FromStaticString(const char *s)
Definition: src/core/lib/slice/slice.h:201
buf
voidpf void * buf
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
absl::string_view
Definition: abseil-cpp/absl/strings/string_view.h:167
grpc_pollset_set_create
grpc_pollset_set * grpc_pollset_set_create()
Definition: pollset_set.cc:29
GRPC_SLICE_MALLOC
#define GRPC_SLICE_MALLOC(len)
Definition: include/grpc/slice.h:70
subchannel
RingHashSubchannelData * subchannel
Definition: ring_hash.cc:285
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc_status_code
grpc_status_code
Definition: include/grpc/impl/codegen/status.h:28
connected_subchannel_
RefCountedPtr< ConnectedSubchannel > connected_subchannel_
Definition: oob_backend_metric.cc:113
absl::OkStatus
Status OkStatus()
Definition: third_party/abseil-cpp/absl/status/status.h:882
gpr_malloc
GPRAPI void * gpr_malloc(size_t size)
Definition: alloc.cc:29
closure.h
ABSL_GUARDED_BY
#define ABSL_GUARDED_BY(x)
Definition: abseil-cpp/absl/base/thread_annotations.h:62
status
absl::Status status
Definition: rls.cc:251
subchannel_
RefCountedPtr< Subchannel > subchannel_
Definition: oob_backend_metric.cc:112
oob_backend_metric.h
xds_manager.p
p
Definition: xds_manager.py:60
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
mu_
Mutex mu_
Definition: oob_backend_metric.cc:115
arena
grpc_core::ScopedArenaPtr arena
Definition: binder_transport_test.cc:237
grpc_connectivity_state
grpc_connectivity_state
Definition: include/grpc/impl/codegen/connectivity_state.h:30
status.h
connectivity_watcher_
ConnectivityWatcher * connectivity_watcher_
Definition: oob_backend_metric.cc:114
memory.h
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
memcpy
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
subchannel_interface_internal.h
grpc_core::MakeOobBackendMetricWatcher
std::unique_ptr< SubchannelInterface::DataWatcherInterface > MakeOobBackendMetricWatcher(Duration report_interval, std::unique_ptr< OobBackendMetricWatcher > watcher)
Definition: oob_backend_metric.cc:402
google_protobuf_Duration_set_nanos
UPB_INLINE void google_protobuf_Duration_set_nanos(google_protobuf_Duration *msg, int32_t value)
Definition: duration.upb.h:76
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
watchers_
std::map< SubchannelInterface::ConnectivityStateWatcherInterface *, WatcherWrapper * > watchers_
Definition: outlier_detection.cc:226
ABSL_EXCLUSIVE_LOCKS_REQUIRED
#define ABSL_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: abseil-cpp/absl/base/thread_annotations.h:145
xds_service_orca_v3_OrcaLoadReportRequest_serialize
UPB_INLINE char * xds_service_orca_v3_OrcaLoadReportRequest_serialize(const xds_service_orca_v3_OrcaLoadReportRequest *msg, upb_Arena *arena, size_t *len)
Definition: orca.upb.h:55
grpc_pollset_set_destroy
void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set)
Definition: pollset_set.cc:33
connectivity_state.h
pollset_set.h
GRPC_SLICE_START_PTR
#define GRPC_SLICE_START_PTR(slice)
Definition: include/grpc/impl/codegen/slice.h:101
arg
Definition: cmdline.cc:40
grpc_slice_from_static_string
GPRAPI grpc_slice grpc_slice_from_static_string(const char *source)
Definition: slice/slice.cc:89
time.h
grpc_slice
Definition: include/grpc/impl/codegen/slice.h:65
error.h
closure_
grpc_closure closure_
Definition: oob_backend_metric.cc:280
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
xds_service_orca_v3_OrcaLoadReportRequest_new
UPB_INLINE xds_service_orca_v3_OrcaLoadReportRequest * xds_service_orca_v3_OrcaLoadReportRequest_new(upb_Arena *arena)
Definition: orca.upb.h:33
subchannel_stream_client.h
gpr_types.h
xds_service_orca_v3_OrcaLoadReportRequest
struct xds_service_orca_v3_OrcaLoadReportRequest xds_service_orca_v3_OrcaLoadReportRequest
Definition: orca.upb.h:24
upb::Arena
Definition: upb.hpp:68
backend_metric.h
debug_location.h
grpc_core::testing::kErrorMessage
constexpr const char * kErrorMessage
Definition: grpc_tls_certificate_distributor_test.cc:51
upb.hpp
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
alloc.h
watcher_
std::unique_ptr< OobBackendMetricWatcher > watcher_
Definition: oob_backend_metric.cc:139
google::protobuf.internal::Mutex
WrappedMutex Mutex
Definition: bloaty/third_party/protobuf/src/google/protobuf/stubs/mutex.h:113
grpc_core::OrphanablePtr
std::unique_ptr< T, Deleter > OrphanablePtr
Definition: orphanable.h:64
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
backend_metric_data_
BackendMetricData backend_metric_data_
Definition: oob_backend_metric.cc:278
framework.rpc.grpc_channelz.Subchannel
Subchannel
Definition: grpc_channelz.py:38
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
arg
struct arg arg
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
exec_ctx.h
unique_type_name.h
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
ref_counted_ptr.h
watcher
ClusterWatcher * watcher
Definition: cds.cc:148
GRPC_STATUS_UNIMPLEMENTED
@ GRPC_STATUS_UNIMPLEMENTED
Definition: include/grpc/impl/codegen/status.h:124
Duration
Definition: bloaty/third_party/protobuf/src/google/protobuf/duration.pb.h:69
duration.upb.h
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
iomgr_fwd.h
gpr_timespec
Definition: gpr_types.h:50
grpc_error
Definition: error_internal.h:42
size
voidpf void uLong size
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
channel_trace.h
grpc_core::Duration
Definition: src/core/lib/gprpp/time.h:122
self
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END intern self
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/map.c:543
orca.upb.h
grpc_closure
Definition: closure.h:56
google_protobuf_Duration_set_seconds
UPB_INLINE void google_protobuf_Duration_set_seconds(google_protobuf_Duration *msg, int64_t value)
Definition: duration.upb.h:73
sync.h
grpc_core::ParseBackendMetricData
const BackendMetricData * ParseBackendMetricData(absl::string_view serialized_load_report, BackendMetricAllocatorInterface *allocator)
Definition: backend_metric.cc:57
interested_parties_
grpc_pollset_set * interested_parties_
Definition: oob_backend_metric.cc:169
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:35