29 #include "absl/base/thread_annotations.h"
30 #include "absl/memory/memory.h"
31 #include "absl/status/status.h"
32 #include "absl/strings/string_view.h"
33 #include "absl/types/optional.h"
70 TraceFlag grpc_orca_client_trace(
false,
"orca_client");
77 class OrcaProducer :
public Subchannel::DataProducerInterface {
79 explicit OrcaProducer(RefCountedPtr<Subchannel>
subchannel);
81 void Orphan()
override;
83 static UniqueTypeName
Type() {
84 static UniqueTypeName::Factory kFactory(
"orca");
85 return kFactory.Create();
88 UniqueTypeName
type()
const override {
return Type(); }
91 void AddWatcher(OrcaWatcher*
watcher);
92 void RemoveWatcher(OrcaWatcher*
watcher);
95 class ConnectivityWatcher;
96 class OrcaStreamEventHandler;
110 void NotifyWatchers(
const BackendMetricData& backend_metric_data);
123 class OrcaWatcher : public InternalSubchannelDataWatcherInterface {
125 OrcaWatcher(
Duration report_interval,
126 std::unique_ptr<OobBackendMetricWatcher>
watcher)
128 ~OrcaWatcher()
override;
147 class OrcaProducer::ConnectivityWatcher
148 :
public Subchannel::ConnectivityStateWatcherInterface {
150 explicit ConnectivityWatcher(WeakRefCountedPtr<OrcaProducer> producer)
154 ~ConnectivityWatcher()
override {
158 void OnConnectivityStateChange()
override {
159 auto change = PopConnectivityStateChange();
160 producer_->OnConnectivityStateChange(change.state);
168 WeakRefCountedPtr<OrcaProducer>
producer_;
176 class OrcaProducer::OrcaStreamEventHandler
177 :
public SubchannelStreamClient::CallEventHandler {
179 OrcaStreamEventHandler(WeakRefCountedPtr<OrcaProducer> producer,
183 Slice GetPathLocked()
override {
185 "/xds.service.orca.v3.OpenRcaService/StreamCoreMetrics");
188 void OnCallStartLocked(SubchannelStreamClient* )
override {}
190 void OnRetryTimerStartLocked(SubchannelStreamClient* )
override {}
192 grpc_slice EncodeSendMessageLocked()
override {
197 auto* report_interval =
207 return request_slice;
211 SubchannelStreamClient* ,
213 auto* allocator =
new BackendMetricAllocator(
producer_);
214 auto* backend_metric_data =
216 if (backend_metric_data ==
nullptr) {
220 allocator->AsyncNotifyWatchersAndDelete();
224 void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* ,
228 "Orca stream returned UNIMPLEMENTED; disabling";
230 auto* channelz_node =
producer_->subchannel_->channelz_node();
231 if (channelz_node !=
nullptr) {
232 channelz_node->AddTraceEvent(
247 class BackendMetricAllocator :
public BackendMetricAllocatorInterface {
249 explicit BackendMetricAllocator(WeakRefCountedPtr<OrcaProducer> producer)
252 BackendMetricData* AllocateBackendMetricData()
override {
256 char* AllocateString(
size_t size)
override {
264 void AsyncNotifyWatchersAndDelete() {
270 static void NotifyWatchersInExecCtx(
void*
arg,
272 auto*
self =
static_cast<BackendMetricAllocator*
>(
arg);
273 self->producer_->NotifyWatchers(
self->backend_metric_data_);
277 WeakRefCountedPtr<OrcaProducer>
producer_;
283 WeakRefCountedPtr<OrcaProducer>
producer_;
291 OrcaProducer::OrcaProducer(RefCountedPtr<Subchannel>
subchannel)
295 auto connectivity_watcher = MakeRefCounted<ConnectivityWatcher>(WeakRef());
302 void OrcaProducer::Orphan() {
305 stream_client_.reset();
312 void OrcaProducer::AddWatcher(OrcaWatcher*
watcher) {
318 stream_client_.reset();
319 MaybeStartStreamLocked();
323 void OrcaProducer::RemoveWatcher(OrcaWatcher*
watcher) {
327 stream_client_.reset();
330 Duration new_interval = GetMinIntervalLocked();
333 stream_client_.reset();
334 MaybeStartStreamLocked();
338 Duration OrcaProducer::GetMinIntervalLocked()
const {
339 Duration duration = Duration::Infinity();
342 if (watcher_interval < duration) duration = watcher_interval;
347 void OrcaProducer::MaybeStartStreamLocked() {
349 stream_client_ = MakeOrphanable<SubchannelStreamClient>(
355 void OrcaProducer::NotifyWatchers(
356 const BackendMetricData& backend_metric_data) {
358 gpr_log(
GPR_INFO,
"OrcaProducer %p: reporting backend metrics to watchers",
363 watcher->watcher()->OnBackendMetricReport(backend_metric_data);
371 if (!
watchers_.empty()) MaybeStartStreamLocked();
374 stream_client_.reset();
382 OrcaWatcher::~OrcaWatcher() {
389 auto*
p =
static_cast<OrcaProducer*
>(
401 std::unique_ptr<SubchannelInterface::DataWatcherInterface>
403 std::unique_ptr<OobBackendMetricWatcher>
watcher) {