34 #include "absl/memory/memory.h"
35 #include "absl/random/random.h"
36 #include "absl/status/status.h"
37 #include "absl/status/statusor.h"
38 #include "absl/strings/string_view.h"
39 #include "absl/types/variant.h"
82 return parse_succeeded && parsed_value;
// Name string for this LB policy. The name() overrides visible in this file
// all return this constant.
87 constexpr
char kOutlierDetection[] =
"outlier_detection_experimental";
92 OutlierDetectionLbConfig(
93 OutlierDetectionConfig outlier_detection_config,
94 RefCountedPtr<LoadBalancingPolicy::Config> child_policy)
// Overrides name() to return the kOutlierDetection name string.
98 const char*
name()
const override {
return kOutlierDetection; }
100 bool CountingEnabled()
const {
107 const OutlierDetectionConfig& outlier_detection_config()
const {
111 RefCountedPtr<LoadBalancingPolicy::Config> child_policy()
const {
121 class OutlierDetectionLb :
public LoadBalancingPolicy {
123 explicit OutlierDetectionLb(
Args args);
// Overrides name() to return the kOutlierDetection name string.
125 const char*
name()
const override {
return kOutlierDetection; }
127 void UpdateLocked(UpdateArgs
args)
override;
129 void ResetBackoffLocked()
override;
132 class SubchannelState;
133 class SubchannelWrapper :
public DelegatingSubchannel {
135 SubchannelWrapper(RefCountedPtr<SubchannelState> subchannel_state,
136 RefCountedPtr<SubchannelInterface>
subchannel)
147 ~SubchannelWrapper()
override {
157 void WatchConnectivityState(
158 std::unique_ptr<ConnectivityStateWatcherInterface>
watcher)
override;
160 void CancelConnectivityStateWatch(
161 ConnectivityStateWatcherInterface*
watcher)
override;
163 RefCountedPtr<SubchannelState> subchannel_state()
const {
169 :
public SubchannelInterface::ConnectivityStateWatcherInterface {
171 WatcherWrapper(std::unique_ptr<
172 SubchannelInterface::ConnectivityStateWatcherInterface>
180 watcher_->OnConnectivityStateChange(
183 "subchannel ejected by outlier detection"));
204 "subchannel ejected by outlier detection");
211 return watcher_->interested_parties();
215 std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
224 std::map<SubchannelInterface::ConnectivityStateWatcherInterface*,
229 class SubchannelState :
public RefCounted<SubchannelState> {
236 void RotateBucket() {
246 if (total_request == 0) {
247 return absl::nullopt;
249 double success_rate =
256 void AddSubchannel(SubchannelWrapper*
wrapper) {
260 void RemoveSubchannel(SubchannelWrapper*
wrapper) {
// Records one successful call: adds 1 (via fetch_add) to the `successes`
// counter of the bucket that active_bucket_.load() currently returns.
264 void AddSuccessCount() {
active_bucket_.load()->successes.fetch_add(1); }
// Records one failed call: adds 1 (via fetch_add) to the `failures`
// counter of the bucket that active_bucket_.load() currently returns.
266 void AddFailureCount() {
active_bucket_.load()->failures.fetch_add(1); }
285 void MaybeUneject(
uint64_t base_ejection_time_in_millis,
286 uint64_t max_ejection_time_in_millis) {
296 std::max(base_ejection_time_in_millis,
297 max_ejection_time_in_millis)));
316 class RefCountedPicker :
public RefCounted<RefCountedPicker> {
318 explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
327 class Picker :
public SubchannelPicker {
329 Picker(OutlierDetectionLb* outlier_detection_lb,
330 RefCountedPtr<RefCountedPicker> picker,
bool counting_enabled);
332 PickResult Pick(PickArgs
args)
override;
335 class SubchannelCallTracker;
336 RefCountedPtr<RefCountedPicker>
picker_;
340 class Helper :
public ChannelControlHelper {
342 explicit Helper(RefCountedPtr<OutlierDetectionLb> outlier_detection_policy)
349 RefCountedPtr<SubchannelInterface> CreateSubchannel(
352 std::unique_ptr<SubchannelPicker> picker)
override;
353 void RequestReresolution()
override;
355 void AddTraceEvent(TraceSeverity
severity,
362 class EjectionTimer :
public InternallyRefCounted<EjectionTimer> {
364 EjectionTimer(RefCountedPtr<OutlierDetectionLb> parent,
367 void Orphan()
override;
383 ~OutlierDetectionLb()
override;
385 static std::string MakeKeyForAddress(
const ServerAddress& address);
387 void ShutdownLocked()
override;
389 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
392 void MaybeUpdatePickerLocked();
395 RefCountedPtr<OutlierDetectionLbConfig>
config_;
405 RefCountedPtr<RefCountedPicker>
picker_;
414 void OutlierDetectionLb::SubchannelWrapper::Eject() {
421 void OutlierDetectionLb::SubchannelWrapper::Uneject() {
428 void OutlierDetectionLb::SubchannelWrapper::WatchConnectivityState(
429 std::unique_ptr<ConnectivityStateWatcherInterface>
watcher) {
430 ConnectivityStateWatcherInterface* watcher_ptr =
watcher.get();
431 auto watcher_wrapper =
433 watchers_.emplace(watcher_ptr, watcher_wrapper.get());
434 wrapped_subchannel()->WatchConnectivityState(
std::move(watcher_wrapper));
437 void OutlierDetectionLb::SubchannelWrapper::CancelConnectivityStateWatch(
438 ConnectivityStateWatcherInterface*
watcher) {
441 wrapped_subchannel()->CancelConnectivityStateWatch(
it->second);
449 class OutlierDetectionLb::Picker::SubchannelCallTracker
450 :
public LoadBalancingPolicy::SubchannelCallTrackerInterface {
452 SubchannelCallTracker(
453 std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
454 original_subchannel_call_tracker,
455 RefCountedPtr<SubchannelState> subchannel_state)
457 std::
move(original_subchannel_call_tracker)),
460 ~SubchannelCallTracker()
override {
464 void Start()
override {
480 if (
args.status.ok()) {
489 std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
498 OutlierDetectionLb::Picker::Picker(OutlierDetectionLb* outlier_detection_lb,
499 RefCountedPtr<RefCountedPicker> picker,
500 bool counting_enabled)
504 "[outlier_detection_lb %p] constructed new picker %p and counting "
506 outlier_detection_lb,
this,
507 (counting_enabled ?
"enabled" :
"disabled"));
511 LoadBalancingPolicy::PickResult OutlierDetectionLb::Picker::Pick(
512 LoadBalancingPolicy::PickArgs
args) {
515 "outlier_detection picker not given any child picker"));
519 auto* complete_pick = absl::get_if<PickResult::Complete>(&
result.result);
520 if (complete_pick !=
nullptr) {
522 auto* subchannel_wrapper =
523 static_cast<SubchannelWrapper*
>(complete_pick->subchannel.get());
527 complete_pick->subchannel_call_tracker =
528 absl::make_unique<SubchannelCallTracker>(
529 std::move(complete_pick->subchannel_call_tracker),
530 subchannel_wrapper->subchannel_state());
532 complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
541 OutlierDetectionLb::OutlierDetectionLb(
Args args)
548 OutlierDetectionLb::~OutlierDetectionLb() {
551 "[outlier_detection_lb %p] destroying outlier_detection LB policy",
557 const ServerAddress& address) {
560 return addr_str.ok() ? addr_str.value() : addr_str.status().ToString();
563 void OutlierDetectionLb::ShutdownLocked() {
573 interested_parties());
585 void OutlierDetectionLb::ResetBackoffLocked() {
589 void OutlierDetectionLb::UpdateLocked(UpdateArgs
args) {
597 if (!
config_->CountingEnabled()) {
605 p.second->RotateBucket();
607 }
else if (old_config->outlier_detection_config().interval !=
608 config_->outlier_detection_config().interval) {
620 if (
args.addresses.ok()) {
621 std::set<std::string> current_addresses;
622 for (
const ServerAddress& address : *
args.addresses) {
623 std::string address_key = MakeKeyForAddress(address);
625 if (subchannel_state ==
nullptr) {
626 subchannel_state = MakeRefCounted<SubchannelState>();
628 current_addresses.emplace(address_key);
632 if (current_addresses.find(
it->first) == current_addresses.end()) {
642 UpdateArgs update_args;
644 update_args.config =
config_->child_policy();
649 "[outlier_detection_lb %p] Updating child policy handler %p",
this,
655 void OutlierDetectionLb::MaybeUpdatePickerLocked() {
657 auto outlier_detection_picker =
658 absl::make_unique<Picker>(
this,
picker_,
config_->CountingEnabled());
661 "[outlier_detection_lb %p] updating connectivity: state=%s "
662 "status=(%s) picker=%p",
664 outlier_detection_picker.get());
671 OrphanablePtr<LoadBalancingPolicy> OutlierDetectionLb::CreateChildPolicyLocked(
674 lb_policy_args.work_serializer = work_serializer();
675 lb_policy_args.args =
args;
676 lb_policy_args.channel_control_helper =
678 OrphanablePtr<LoadBalancingPolicy> lb_policy =
679 MakeOrphanable<ChildPolicyHandler>(
std::move(lb_policy_args),
683 "[outlier_detection_lb %p] Created new child policy handler %p",
684 this, lb_policy.get());
690 interested_parties());
698 RefCountedPtr<SubchannelInterface> OutlierDetectionLb::Helper::CreateSubchannel(
702 RefCountedPtr<SubchannelState> subchannel_state;
705 subchannel_state =
it->second->Ref();
707 auto subchannel = MakeRefCounted<SubchannelWrapper>(
711 if (subchannel_state !=
nullptr) {
712 subchannel_state->AddSubchannel(
subchannel.get());
717 void OutlierDetectionLb::Helper::UpdateState(
719 std::unique_ptr<SubchannelPicker> picker) {
723 "[outlier_detection_lb %p] child connectivity state update: "
733 MakeRefCounted<RefCountedPicker>(
std::move(picker));
738 void OutlierDetectionLb::Helper::RequestReresolution() {
747 void OutlierDetectionLb::Helper::AddTraceEvent(TraceSeverity
severity,
758 OutlierDetectionLb::EjectionTimer::EjectionTimer(
769 void OutlierDetectionLb::EjectionTimer::Orphan() {
777 void OutlierDetectionLb::EjectionTimer::OnTimer(
void*
arg,
779 auto*
self =
static_cast<EjectionTimer*
>(
arg);
781 self->parent_->work_serializer()->Run(
787 std::map<SubchannelState*, double> success_rate_ejection_candidates;
788 std::map<SubchannelState*, double> failure_percentage_ejection_candidates;
789 size_t ejected_host_count = 0;
790 double success_rate_sum = 0;
794 auto* subchannel_state =
state.second.get();
797 subchannel_state->RotateBucket();
800 if (subchannel_state->ejection_time().has_value()) {
801 ++ejected_host_count;
804 subchannel_state->GetSuccessRateAndVolume();
805 if (!host_success_rate_and_volume.
has_value()) {
808 double success_rate = host_success_rate_and_volume->first;
809 uint64_t request_volume = host_success_rate_and_volume->second;
810 if (
config.success_rate_ejection.has_value()) {
811 if (request_volume >=
config.success_rate_ejection->request_volume) {
812 success_rate_ejection_candidates[subchannel_state] = success_rate;
813 success_rate_sum += success_rate;
816 if (
config.failure_percentage_ejection.has_value()) {
817 if (request_volume >=
818 config.failure_percentage_ejection->request_volume) {
819 failure_percentage_ejection_candidates[subchannel_state] =
825 if (!success_rate_ejection_candidates.empty() &&
826 success_rate_ejection_candidates.size() >=
827 config.success_rate_ejection->minimum_hosts) {
830 double mean = success_rate_sum / success_rate_ejection_candidates.size();
832 std::for_each(success_rate_ejection_candidates.begin(),
833 success_rate_ejection_candidates.end(),
834 [&variance, mean](std::pair<SubchannelState*, double>
v) {
835 variance += std::pow(v.second - mean, 2);
837 variance /= success_rate_ejection_candidates.size();
838 double stdev = std::sqrt(variance);
839 const double success_rate_stdev_factor =
840 static_cast<double>(
config.success_rate_ejection->stdev_factor) /
842 double ejection_threshold = mean - stdev * success_rate_stdev_factor;
843 for (
auto& candidate : success_rate_ejection_candidates) {
844 if (candidate.second < ejection_threshold) {
846 double current_percent = 100.0 * ejected_host_count /
847 parent_->subchannel_state_map_.size();
849 config.success_rate_ejection->enforcement_percentage &&
850 (ejected_host_count == 0 ||
851 (current_percent <
config.max_ejection_percent))) {
855 ++ejected_host_count;
861 if (!failure_percentage_ejection_candidates.empty() &&
862 failure_percentage_ejection_candidates.size() >=
863 config.failure_percentage_ejection->minimum_hosts) {
864 for (
auto& candidate : failure_percentage_ejection_candidates) {
867 if (candidate.first->ejection_time().has_value())
continue;
868 if ((100.0 - candidate.second) >
869 config.failure_percentage_ejection->threshold) {
871 double current_percent = 100.0 * ejected_host_count /
872 parent_->subchannel_state_map_.size();
874 config.failure_percentage_ejection->enforcement_percentage &&
875 (ejected_host_count == 0 ||
876 (current_percent <
config.max_ejection_percent))) {
880 ++ejected_host_count;
892 auto* subchannel_state =
state.second.get();
893 subchannel_state->MaybeUneject(
config.base_ejection_time.millis(),
894 config.max_ejection_time.millis());
908 class OutlierDetectionLbFactory :
public LoadBalancingPolicyFactory {
910 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
// Overrides name() to return the kOutlierDetection name string.
915 const char*
name()
const override {
return kOutlierDetection; }
917 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
920 if (json.type() == Json::Type::JSON_NULL) {
924 "field:loadBalancingPolicy error:outlier_detection policy requires "
925 "configuration. Please use loadBalancingConfig field of service "
929 std::vector<grpc_error_handle> error_list;
931 OutlierDetectionConfig outlier_detection_config;
932 auto it = json.object_value().find(
"successRateEjection");
933 if (
it != json.object_value().end()) {
934 if (
it->second.type() != Json::Type::OBJECT) {
936 "field:successRateEjection error:type must be object"));
938 OutlierDetectionConfig::SuccessRateEjection success_config;
939 const Json::Object&
object =
it->second.object_value();
941 &success_config.stdev_factor, &error_list,
944 &success_config.enforcement_percentage,
947 &success_config.minimum_hosts, &error_list,
950 &success_config.request_volume, &error_list,
952 outlier_detection_config.success_rate_ejection = success_config;
955 it = json.object_value().find(
"failurePercentageEjection");
956 if (
it != json.object_value().end()) {
957 if (
it->second.type() != Json::Type::OBJECT) {
959 "field:successRateEjection error:type must be object"));
961 OutlierDetectionConfig::FailurePercentageEjection failure_config;
962 const Json::Object&
object =
it->second.object_value();
966 &failure_config.enforcement_percentage,
969 &failure_config.minimum_hosts, &error_list,
972 &failure_config.request_volume, &error_list,
974 outlier_detection_config.failure_percentage_ejection = failure_config;
978 &outlier_detection_config.interval,
981 &outlier_detection_config.base_ejection_time,
984 json.object_value(),
"maxEjectionTime",
985 &outlier_detection_config.max_ejection_time, &error_list,
987 outlier_detection_config.max_ejection_time =
std::max(
991 &outlier_detection_config.max_ejection_percent,
993 RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
994 it = json.object_value().find(
"childPolicy");
995 if (
it == json.object_value().end()) {
997 "field:childPolicy error:required field missing"));
1000 child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
1002 if (child_policy ==
nullptr) {
1004 std::vector<grpc_error_handle> child_errors;
1006 error_list.push_back(
1010 if (!error_list.empty()) {
1012 "outlier_detection_experimental LB policy config", &error_list);
1015 return MakeRefCounted<OutlierDetectionLbConfig>(outlier_detection_config,
1032 absl::make_unique<grpc_core::OutlierDetectionLbFactory>());