72 #include "absl/container/inlined_vector.h"
73 #include "absl/memory/memory.h"
74 #include "absl/status/status.h"
75 #include "absl/status/statusor.h"
76 #include "absl/strings/str_cat.h"
77 #include "absl/strings/str_format.h"
78 #include "absl/strings/str_join.h"
79 #include "absl/strings/string_view.h"
80 #include "absl/strings/strip.h"
81 #include "absl/types/optional.h"
82 #include "absl/types/variant.h"
85 #include <grpc/byte_buffer.h>
146 #define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
147 #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
148 #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
149 #define GRPC_GRPCLB_RECONNECT_JITTER 0.2
150 #define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
151 #define GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS 10000
161 using ::grpc_event_engine::experimental::EventEngine;
164 constexpr
char kGrpclb[] =
"grpclb";
172 const char*
name()
const override {
return kGrpclb; }
185 class GrpcLb :
public LoadBalancingPolicy {
189 const char*
name()
const override {
return kGrpclb; }
191 void UpdateLocked(UpdateArgs
args)
override;
192 void ResetBackoffLocked()
override;
196 class BalancerCallState :
public InternallyRefCounted<BalancerCallState> {
198 explicit BalancerCallState(
199 RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);
200 ~BalancerCallState()
override;
204 void Orphan()
override;
208 GrpcLbClientStats* client_stats()
const {
return client_stats_.get(); }
214 GrpcLb* grpclb_policy()
const {
218 void ScheduleNextClientLoadReportLocked();
219 void SendClientLoadReportLocked();
222 void MaybeSendClientLoadReport();
223 void MaybeSendClientLoadReportLocked();
231 void OnInitialRequestSentLocked();
232 void OnBalancerMessageReceivedLocked();
271 class SubchannelWrapper :
public DelegatingSubchannel {
273 SubchannelWrapper(RefCountedPtr<SubchannelInterface>
subchannel,
274 RefCountedPtr<GrpcLb> lb_policy,
std::string lb_token,
275 RefCountedPtr<GrpcLbClientStats> client_stats)
281 ~SubchannelWrapper()
override {
283 lb_policy_->CacheDeletedSubchannelLocked(wrapped_subchannel());
288 GrpcLbClientStats* client_stats()
const {
return client_stats_.get(); }
296 class TokenAndClientStatsAttribute
297 :
public ServerAddress::AttributeInterface {
300 RefCountedPtr<GrpcLbClientStats> client_stats)
304 std::unique_ptr<AttributeInterface>
Copy()
const override {
305 return absl::make_unique<TokenAndClientStatsAttribute>(
lb_token_,
309 int Cmp(
const AttributeInterface* other_base)
const override {
310 const TokenAndClientStatsAttribute* other =
311 static_cast<const TokenAndClientStatsAttribute*
>(other_base);
313 if (
r != 0)
return r;
323 RefCountedPtr<GrpcLbClientStats> client_stats()
const {
332 class Serverlist :
public RefCounted<Serverlist> {
335 explicit Serverlist(std::vector<GrpcLbServer> serverlist)
338 bool operator==(
const Serverlist& other)
const;
340 const std::vector<GrpcLbServer>& serverlist()
const {
return serverlist_; }
347 GrpcLbClientStats* client_stats)
const;
351 bool ContainsAllDropEntries()
const;
360 const char* ShouldDrop();
371 class Picker :
public SubchannelPicker {
373 Picker(RefCountedPtr<Serverlist> serverlist,
374 std::unique_ptr<SubchannelPicker> child_picker,
375 RefCountedPtr<GrpcLbClientStats> client_stats)
380 PickResult Pick(PickArgs
args)
override;
387 class SubchannelCallTracker :
public SubchannelCallTrackerInterface {
389 SubchannelCallTracker(
390 RefCountedPtr<GrpcLbClientStats> client_stats,
391 std::unique_ptr<SubchannelCallTrackerInterface> original_call_tracker)
395 void Start()
override {
423 class Helper :
public ChannelControlHelper {
425 explicit Helper(RefCountedPtr<GrpcLb> parent)
428 RefCountedPtr<SubchannelInterface> CreateSubchannel(
431 std::unique_ptr<SubchannelPicker> picker)
override;
432 void RequestReresolution()
override;
434 void AddTraceEvent(TraceSeverity
severity,
441 class StateWatcher :
public AsyncConnectivityStateWatcherInterface {
443 explicit StateWatcher(RefCountedPtr<GrpcLb> parent)
444 : AsyncConnectivityStateWatcherInterface(parent->work_serializer()),
452 if (
parent_->fallback_at_startup_checks_pending_ &&
457 "[grpclb %p] balancer channel in state:TRANSIENT_FAILURE (%s); "
458 "entering fallback mode",
460 parent_->fallback_at_startup_checks_pending_ =
false;
462 parent_->fallback_mode_ =
true;
463 parent_->CreateOrUpdateChildPolicyLocked();
466 parent_->CancelBalancerChannelConnectivityWatchLocked();
475 void ShutdownLocked()
override;
480 void CancelBalancerChannelConnectivityWatchLocked();
483 void MaybeEnterFallbackModeAfterStartup();
488 void StartBalancerCallLocked();
489 void StartBalancerCallRetryTimerLocked();
495 bool is_backend_from_grpclb_load_balancer);
496 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
498 void CreateOrUpdateChildPolicyLocked();
501 void CacheDeletedSubchannelLocked(
502 RefCountedPtr<SubchannelInterface>
subchannel);
503 void StartSubchannelCacheTimerLocked();
567 std::vector<RefCountedPtr<SubchannelInterface>>>
588 if (
server.ip_size == 4) {
589 addr->len =
static_cast<socklen_t
>(
sizeof(grpc_sockaddr_in));
590 grpc_sockaddr_in*
addr4 =
reinterpret_cast<grpc_sockaddr_in*
>(&
addr->addr);
591 addr4->sin_family = GRPC_AF_INET;
593 addr4->sin_port = netorder_port;
594 }
else if (
server.ip_size == 16) {
595 addr->len =
static_cast<socklen_t
>(
sizeof(grpc_sockaddr_in6));
596 grpc_sockaddr_in6*
addr6 =
597 reinterpret_cast<grpc_sockaddr_in6*
>(&
addr->addr);
605 std::vector<std::string> entries;
615 ipport = addr_str.ok() ? *addr_str : addr_str.status().ToString();
618 ipport,
server.load_balance_token));
623 bool IsServerValid(
const GrpcLbServer&
server,
size_t idx,
bool log) {
624 if (
server.drop)
return false;
628 "Invalid port '%d' at index %" PRIuPTR
629 " of serverlist. Ignoring.",
637 "Expected IP to be 4 or 16 bytes, got %d at index %" PRIuPTR
638 " of serverlist. Ignoring",
648 GrpcLbClientStats* client_stats)
const {
649 RefCountedPtr<GrpcLbClientStats>
stats;
650 if (client_stats !=
nullptr)
stats = client_stats->Ref();
654 if (!IsServerValid(
server, i,
false))
continue;
659 const size_t lb_token_length =
strnlen(
662 if (lb_token.empty()) {
665 "Missing LB token for backend address '%s'. The empty token will "
667 addr_uri.ok() ? addr_uri->c_str()
668 : addr_uri.status().ToString().c_str());
671 std::map<const char*, std::unique_ptr<ServerAddress::AttributeInterface>>
674 absl::make_unique<TokenAndClientStatsAttribute>(
std::move(lb_token),
677 addresses.emplace_back(
addr,
nullptr,
std::move(attributes));
682 bool GrpcLb::Serverlist::ContainsAllDropEntries()
const {
685 if (!
server.drop)
return false;
690 const char* GrpcLb::Serverlist::ShouldDrop() {
694 return server.drop ?
server.load_balance_token :
nullptr;
701 GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs
args) {
703 const char* drop_token =
705 if (drop_token !=
nullptr) {
714 return PickResult::Drop(
720 auto* complete_pick = absl::get_if<PickResult::Complete>(&
result.result);
721 if (complete_pick !=
nullptr) {
722 const SubchannelWrapper* subchannel_wrapper =
723 static_cast<SubchannelWrapper*
>(complete_pick->subchannel.get());
726 GrpcLbClientStats* client_stats = subchannel_wrapper->client_stats();
727 if (client_stats !=
nullptr) {
728 complete_pick->subchannel_call_tracker =
729 absl::make_unique<SubchannelCallTracker>(
731 std::move(complete_pick->subchannel_call_tracker));
735 args.initial_metadata->Add(
739 client_stats->AddCallStarted();
745 if (!subchannel_wrapper->lb_token().empty()) {
746 char* lb_token =
static_cast<char*
>(
747 args.call_state->Alloc(subchannel_wrapper->lb_token().size() + 1));
748 strcpy(lb_token, subchannel_wrapper->lb_token().c_str());
752 complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
761 RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel(
763 if (
parent_->shutting_down_)
return nullptr;
764 const TokenAndClientStatsAttribute* attribute =
765 static_cast<const TokenAndClientStatsAttribute*
>(
767 if (attribute ==
nullptr) {
769 "[grpclb %p] no TokenAndClientStatsAttribute for address %p",
770 parent_.get(), address.ToString().c_str());
774 RefCountedPtr<GrpcLbClientStats> client_stats = attribute->client_stats();
775 return MakeRefCounted<SubchannelWrapper>(
784 std::unique_ptr<SubchannelPicker> picker) {
785 if (
parent_->shutting_down_)
return;
789 parent_->MaybeEnterFallbackModeAfterStartup();
798 RefCountedPtr<Serverlist> serverlist;
800 (
parent_->serverlist_ !=
nullptr &&
801 parent_->serverlist_->ContainsAllDropEntries())) {
802 serverlist =
parent_->serverlist_;
804 RefCountedPtr<GrpcLbClientStats> client_stats;
805 if (
parent_->lb_calld_ !=
nullptr &&
806 parent_->lb_calld_->client_stats() !=
nullptr) {
807 client_stats =
parent_->lb_calld_->client_stats()->Ref();
811 "[grpclb %p helper %p] state=%s (%s) wrapping child "
812 "picker %p (serverlist=%p, client_stats=%p)",
817 parent_->channel_control_helper()->UpdateState(
823 void GrpcLb::Helper::RequestReresolution() {
824 if (
parent_->shutting_down_)
return;
829 if (
parent_->lb_calld_ ==
nullptr ||
830 !
parent_->lb_calld_->seen_initial_response()) {
831 parent_->channel_control_helper()->RequestReresolution();
836 return parent_->channel_control_helper()->GetAuthority();
839 void GrpcLb::Helper::AddTraceEvent(TraceSeverity
severity,
841 if (
parent_->shutting_down_)
return;
849 GrpcLb::BalancerCallState::BalancerCallState(
850 RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
851 : InternallyRefCounted<BalancerCallState>(
863 grpc_schedule_on_exec_ctx);
865 OnBalancerMessageReceived,
this, grpc_schedule_on_exec_ctx);
867 this, grpc_schedule_on_exec_ctx);
869 this, grpc_schedule_on_exec_ctx);
871 grpclb_policy()->lb_call_timeout_ == Duration::Zero()
872 ? Timestamp::InfFuture()
877 Slice::FromStaticString(
"/grpc.lb.v1.LoadBalancer/BalanceLoad").c_slice(),
878 nullptr, deadline,
nullptr);
882 grpclb_policy()->
config_->service_name().empty()
884 : grpclb_policy()->
config_->service_name().c_str(),
894 GrpcLb::BalancerCallState::~BalancerCallState() {
904 void GrpcLb::BalancerCallState::Orphan() {
920 void GrpcLb::BalancerCallState::StartQuery() {
951 static_cast<size_t>(
op -
ops),
996 void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
999 ApplicationCallbackExecCtx callback_exec_ctx;
1001 MaybeSendClientLoadReport();
1005 void GrpcLb::BalancerCallState::MaybeSendClientLoadReport() {
1006 grpclb_policy()->work_serializer()->Run(
1010 void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked() {
1012 if (
this != grpclb_policy()->
lb_calld_.get()) {
1020 SendClientLoadReportLocked();
1026 void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
1045 ScheduleNextClientLoadReportLocked();
1070 "[grpclb %p] lb_calld=%p call_error=%d sending client load report",
1076 void GrpcLb::BalancerCallState::ClientLoadReportDone(
void*
arg,
1078 BalancerCallState* lb_calld =
static_cast<BalancerCallState*
>(
arg);
1080 lb_calld->grpclb_policy()->work_serializer()->Run(
1081 [lb_calld,
error]() { lb_calld->ClientLoadReportDoneLocked(
error); },
1085 void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(
1094 ScheduleNextClientLoadReportLocked();
1097 void GrpcLb::BalancerCallState::OnInitialRequestSent(
1099 BalancerCallState* lb_calld =
static_cast<BalancerCallState*
>(
arg);
1100 lb_calld->grpclb_policy()->work_serializer()->Run(
1101 [lb_calld]() { lb_calld->OnInitialRequestSentLocked(); },
DEBUG_LOCATION);
1104 void GrpcLb::BalancerCallState::OnInitialRequestSentLocked() {
1110 SendClientLoadReportLocked();
1116 void GrpcLb::BalancerCallState::OnBalancerMessageReceived(
1118 BalancerCallState* lb_calld =
static_cast<BalancerCallState*
>(
arg);
1119 lb_calld->grpclb_policy()->work_serializer()->Run(
1120 [lb_calld]() { lb_calld->OnBalancerMessageReceivedLocked(); },
1124 void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
1126 if (
this != grpclb_policy()->
lb_calld_.get() ||
1141 char* response_slice_str =
1144 "[grpclb %p] lb_calld=%p: Invalid LB response received: '%s'. "
1146 grpclb_policy(),
this, response_slice_str);
1151 if (
response.client_stats_report_interval != Duration::Zero()) {
1156 "[grpclb %p] lb_calld=%p: Received initial LB response "
1157 "message; client load reporting interval = %" PRId64
1159 grpclb_policy(),
this,
1164 "[grpclb %p] lb_calld=%p: Received initial LB response "
1165 "message; client load reporting NOT enabled",
1166 grpclb_policy(),
this);
1173 auto serverlist_wrapper =
1177 "[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR
1178 " servers received:\n%s",
1179 grpclb_policy(),
this,
1180 serverlist_wrapper->serverlist().size(),
1181 serverlist_wrapper->AsText().c_str());
1191 ScheduleNextClientLoadReportLocked();
1195 *grpclb_policy()->
serverlist_ == *serverlist_wrapper) {
1198 "[grpclb %p] lb_calld=%p: Incoming server list identical "
1199 "to current, ignoring.",
1200 grpclb_policy(),
this);
1224 "[grpclb %p] Received response from balancer; exiting "
1227 grpclb_policy()->fallback_mode_ =
false;
1230 grpclb_policy()->fallback_at_startup_checks_pending_ =
false;
1232 grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
1237 grpclb_policy()->serverlist_ =
std::move(serverlist_wrapper);
1238 grpclb_policy()->CreateOrUpdateChildPolicyLocked();
1245 "[grpclb %p] Entering fallback mode as requested by balancer",
1248 grpclb_policy()->fallback_at_startup_checks_pending_ =
false;
1250 grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
1252 grpclb_policy()->fallback_mode_ =
true;
1253 grpclb_policy()->CreateOrUpdateChildPolicyLocked();
1257 grpclb_policy()->serverlist_.reset();
1281 void GrpcLb::BalancerCallState::OnBalancerStatusReceived(
1283 BalancerCallState* lb_calld =
static_cast<BalancerCallState*
>(
arg);
1285 lb_calld->grpclb_policy()->work_serializer()->Run(
1286 [lb_calld,
error]() { lb_calld->OnBalancerStatusReceivedLocked(
error); },
1290 void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
1296 "[grpclb %p] lb_calld=%p: Status from LB server received. "
1297 "Status = %d, details = '%s', (lb_call: %p), error '%s'",
1306 if (
this == grpclb_policy()->
lb_calld_.get()) {
1310 grpclb_policy()->lb_calld_.reset();
1314 "[grpclb %p] Balancer call finished without receiving "
1315 "serverlist; entering fallback mode",
1317 grpclb_policy()->fallback_at_startup_checks_pending_ =
false;
1319 grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
1320 grpclb_policy()->fallback_mode_ =
true;
1321 grpclb_policy()->CreateOrUpdateChildPolicyLocked();
1324 grpclb_policy()->MaybeEnterFallbackModeAfterStartup();
1327 grpclb_policy()->channel_control_helper()->RequestReresolution();
1331 grpclb_policy()->lb_call_backoff_.Reset();
1332 grpclb_policy()->StartBalancerCallLocked();
1336 grpclb_policy()->StartBalancerCallRetryTimerLocked();
1349 if (addresses !=
nullptr)
return *addresses;
1361 FakeResolverResponseGenerator* response_generator,
1364 static const char* args_to_remove[] = {
1400 RefCountedPtr<grpc_channel_credentials> creds_sans_call_creds =
1402 GPR_ASSERT(creds_sans_call_creds !=
nullptr);
1407 FakeResolverResponseGenerator::MakeChannelArg(response_generator),
1420 args_to_add.
size());
1453 {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX}))),
1457 {GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS, 0,
1461 "[grpclb %p] Will use '%s' as the server name for LB request.",
1466 grpc_schedule_on_exec_ctx);
1468 grpc_schedule_on_exec_ctx);
1475 void GrpcLb::ShutdownLocked() {
1489 CancelBalancerChannelConnectivityWatchLocked();
1493 interested_parties());
1502 channelz::ChannelNode* child_channelz_node =
1516 void GrpcLb::ResetBackoffLocked() {
1525 void GrpcLb::UpdateLocked(UpdateArgs
args) {
1526 const bool is_initial_update =
lb_channel_ ==
nullptr;
1534 address = address.WithAttribute(
1536 absl::make_unique<TokenAndClientStatsAttribute>(
"",
nullptr));
1541 UpdateBalancerChannelLocked(*
args.args);
1543 if (
child_policy_ !=
nullptr) CreateOrUpdateChildPolicyLocked();
1546 if (is_initial_update) {
1555 ClientChannel* client_channel =
1556 ClientChannel::GetFromChannel(Channel::FromC(
lb_channel_));
1560 client_channel->AddConnectivityWatcher(
1562 OrphanablePtr<AsyncConnectivityStateWatcherInterface>(
watcher_));
1564 StartBalancerCallLocked();
1598 channelz::ChannelNode* child_channelz_node =
1600 channelz::ChannelNode* parent_channelz_node =
1601 grpc_channel_args_find_pointer<channelz::ChannelNode>(
1603 if (child_channelz_node !=
nullptr && parent_channelz_node !=
nullptr) {
1604 parent_channelz_node->AddChildChannel(child_channelz_node->uuid());
1612 result.args = lb_channel_args;
1616 void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
1617 ClientChannel* client_channel =
1618 ClientChannel::GetFromChannel(Channel::FromC(
lb_channel_));
1620 client_channel->RemoveConnectivityWatcher(
watcher_);
1627 void GrpcLb::StartBalancerCallLocked() {
1635 "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
1641 void GrpcLb::StartBalancerCallRetryTimerLocked() {
1644 gpr_log(
GPR_INFO,
"[grpclb %p] Connection to LB server lost...",
this);
1646 if (
timeout > Duration::Zero()) {
1647 gpr_log(
GPR_INFO,
"[grpclb %p] ... retry_timer_active in %" PRId64
"ms.",
1664 GrpcLb* grpclb_policy =
static_cast<GrpcLb*
>(
arg);
1666 grpclb_policy->work_serializer()->Run(
1667 [grpclb_policy,
error]() {
1668 grpclb_policy->OnBalancerCallRetryTimerLocked(
error);
1679 StartBalancerCallLocked();
1689 void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
1699 "[grpclb %p] lost contact with balancer and backends from "
1700 "most recent serverlist; entering fallback mode",
1703 CreateOrUpdateChildPolicyLocked();
1708 GrpcLb* grpclb_policy =
static_cast<GrpcLb*
>(
arg);
1710 grpclb_policy->work_serializer()->Run(
1711 [grpclb_policy,
error]() { grpclb_policy->OnFallbackTimerLocked(
error); },
1721 "[grpclb %p] No response from balancer after fallback timeout; "
1722 "entering fallback mode",
1725 CancelBalancerChannelConnectivityWatchLocked();
1727 CreateOrUpdateChildPolicyLocked();
1738 bool is_backend_from_grpclb_load_balancer) {
1742 is_backend_from_grpclb_load_balancer));
1743 if (is_backend_from_grpclb_load_balancer) {
1748 args_to_add.
size());
1751 OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
1754 lb_policy_args.work_serializer = work_serializer();
1755 lb_policy_args.args =
args;
1756 lb_policy_args.channel_control_helper = absl::make_unique<Helper>(
Ref());
1757 OrphanablePtr<LoadBalancingPolicy> lb_policy =
1758 MakeOrphanable<ChildPolicyHandler>(
std::move(lb_policy_args),
1761 gpr_log(
GPR_INFO,
"[grpclb %p] Created new child policy handler (%p)",
this,
1768 interested_parties());
1772 void GrpcLb::CreateOrUpdateChildPolicyLocked() {
1775 UpdateArgs update_args;
1776 bool is_backend_from_grpclb_load_balancer =
false;
1786 "grpclb in fallback mode without any balancer addresses: ",
1790 update_args.addresses =
serverlist_->GetServerAddressList(
1792 is_backend_from_grpclb_load_balancer =
true;
1795 CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
1797 update_args.config =
config_->child_policy();
1804 gpr_log(
GPR_INFO,
"[grpclb %p] Updating child policy handler %p",
this,
1814 void GrpcLb::CacheDeletedSubchannelLocked(
1815 RefCountedPtr<SubchannelInterface>
subchannel) {
1821 StartSubchannelCacheTimerLocked();
1825 void GrpcLb::StartSubchannelCacheTimerLocked() {
1832 auto*
self =
static_cast<GrpcLb*
>(
arg);
1834 self->work_serializer()->Run(
1835 [
self,
error]() {
self->GrpcLb::OnSubchannelCacheTimerLocked(
error); },
1845 "[grpclb %p] removing %" PRIuPTR
" subchannels from cache",
1846 this,
it->second.size());
1851 StartSubchannelCacheTimerLocked();
1864 class GrpcLbFactory :
public LoadBalancingPolicyFactory {
1866 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1871 const char*
name()
const override {
return kGrpclb; }
1873 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
1876 if (json.type() == Json::Type::JSON_NULL) {
1877 return MakeRefCounted<GrpcLbConfig>(
nullptr,
"");
1879 std::vector<grpc_error_handle> error_list;
1880 Json child_policy_config_json_tmp;
1881 const Json* child_policy_config_json;
1883 auto it = json.object_value().find(
"serviceName");
1884 if (
it != json.object_value().end()) {
1885 const Json& service_name_json =
it->second;
1886 if (service_name_json.type() != Json::Type::STRING) {
1888 "field:serviceName error:type should be string"));
1890 service_name = service_name_json.string_value();
1893 it = json.object_value().find(
"childPolicy");
1894 if (
it == json.object_value().end()) {
1895 child_policy_config_json_tmp = Json::Array{Json::Object{
1896 {
"round_robin", Json::Object()},
1898 child_policy_config_json = &child_policy_config_json_tmp;
1900 child_policy_config_json = &
it->second;
1903 RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config =
1904 LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
1907 std::vector<grpc_error_handle> child_errors;
1909 error_list.push_back(
1912 if (error_list.empty()) {
1913 return MakeRefCounted<GrpcLbConfig>(
std::move(child_policy_config),
1933 absl::make_unique<grpc_core::GrpcLbFactory>());
1940 builder->channel_init()->RegisterStage(
1950 builder->PrependFilter(&grpc_client_load_reporting_filter);