Go to the documentation of this file.
31 #include "absl/container/inlined_vector.h"
32 #include "absl/memory/memory.h"
33 #include "absl/status/status.h"
34 #include "absl/status/statusor.h"
35 #include "absl/strings/cord.h"
36 #include "absl/strings/numbers.h"
37 #include "absl/strings/str_cat.h"
38 #include "absl/strings/str_join.h"
39 #include "absl/strings/string_view.h"
40 #include "absl/types/optional.h"
41 #include "absl/types/variant.h"
95 #define GRPC_ARG_HEALTH_CHECK_SERVICE_NAME \
96 "grpc.internal.health_check_service_name"
100 using internal::ClientChannelMethodParsedConfig;
159 return closures.
size() > 0;
219 bool queued_pending_resolver_result_
270 void* ClientChannelArgCopy(
void* p) {
return p; }
271 void ClientChannelArgDestroy(
void* ) {}
272 int ClientChannelArgCmp(
void* p,
void* q) {
return QsortCompare(p, q); }
274 ClientChannelArgCopy, ClientChannelArgDestroy, ClientChannelArgCmp};
277 void* ServiceConfigObjArgCopy(
void* p) {
278 auto* service_config =
static_cast<ServiceConfig*
>(
p);
279 service_config->Ref().release();
282 void ServiceConfigObjArgDestroy(
void* p) {
283 auto* service_config =
static_cast<ServiceConfig*
>(
p);
284 service_config->Unref();
286 int ServiceConfigObjArgCmp(
void* p,
void* q) {
return QsortCompare(p, q); }
288 ServiceConfigObjArgCopy, ServiceConfigObjArgDestroy,
289 ServiceConfigObjArgCmp};
291 class DynamicTerminationFilter {
301 new (
elem->channel_data) DynamicTerminationFilter(
args->channel_args);
306 auto* chand =
static_cast<DynamicTerminationFilter*
>(
elem->channel_data);
307 chand->~DynamicTerminationFilter();
324 class DynamicTerminationFilter::CallData {
328 new (
elem->call_data) CallData(*
args);
335 auto* calld =
static_cast<CallData*
>(
elem->call_data);
336 RefCountedPtr<SubchannelCall> subchannel_call;
338 subchannel_call = calld->lb_call_->subchannel_call();
342 subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
349 static void StartTransportStreamOpBatch(
351 auto* calld =
static_cast<CallData*
>(
elem->call_data);
352 calld->lb_call_->StartTransportStreamOpBatch(
batch);
357 auto* calld =
static_cast<CallData*
>(
elem->call_data);
358 auto* chand =
static_cast<DynamicTerminationFilter*
>(
elem->channel_data);
359 ClientChannel* client_channel = chand->chand_;
361 calld->call_context_, calld->path_,
363 calld->arena_, calld->call_combiner_};
364 auto* service_config_call_data =
365 static_cast<ClientChannelServiceConfigCallData*
>(
367 calld->lb_call_ = client_channel->CreateLoadBalancedCall(
368 args, pollent,
nullptr,
369 service_config_call_data->call_dispatch_controller(),
373 "chand=%p dynamic_termination_calld=%p: create lb_call=%p", chand,
374 client_channel, calld->lb_call_.get());
396 OrphanablePtr<ClientChannel::LoadBalancedCall>
lb_call_;
400 DynamicTerminationFilter::CallData::StartTransportStreamOpBatch,
403 sizeof(DynamicTerminationFilter::CallData),
405 DynamicTerminationFilter::CallData::SetPollent,
407 sizeof(DynamicTerminationFilter),
412 "dynamic_filter_termination",
460 ?
"SubchannelWrapper"
467 "chand=%p: creating subchannel wrapper %p for subchannel %p",
472 auto* subchannel_node =
subchannel_->channelz_node();
473 if (subchannel_node !=
nullptr) {
475 if (
it ==
chand_->subchannel_refcount_map_.end()) {
483 chand_->subchannel_wrappers_.insert(
this);
489 "chand=%p: destroying subchannel wrapper %p for subchannel %p",
492 chand_->subchannel_wrappers_.erase(
this);
494 auto* subchannel_node =
subchannel_->channelz_node();
495 if (subchannel_node !=
nullptr) {
499 if (
it->second == 0) {
501 subchannel_node->uuid());
502 chand_->subchannel_refcount_map_.erase(
it);
510 std::unique_ptr<ConnectivityStateWatcherInterface>
watcher)
override
512 auto& watcher_wrapper = watcher_map_[
watcher.get()];
528 watcher_map_.erase(
it);
541 std::unique_ptr<InternalSubchannelDataWatcherInterface> internal_watcher(
544 internal_watcher->SetSubchannel(
subchannel_.get());
545 data_watchers_.push_back(
std::move(internal_watcher));
553 subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
574 std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
580 auto* parent =
parent_.release();
581 parent->chand_->work_serializer_->Run(
583 *
parent_->chand_->work_serializer_) {
584 parent->Unref(DEBUG_LOCATION,
"WatcherWrapper");
592 "chand=%p: connectivity change for subchannel wrapper %p "
593 "subchannel %p; hopping into work_serializer",
597 parent_->chand_->work_serializer_->Run(
599 *
parent_->chand_->work_serializer_) {
600 ApplyUpdateInControlPlaneWorkSerializer();
610 return watcher->interested_parties();
624 "chand=%p: processing connectivity change in work serializer "
625 "for subchannel wrapper %p subchannel %p "
634 int new_keepalive_time = -1;
636 &new_keepalive_time)) {
637 if (new_keepalive_time >
parent_->chand_->keepalive_time_) {
638 parent_->chand_->keepalive_time_ = new_keepalive_time;
646 for (
auto* subchannel_wrapper :
647 parent_->chand_->subchannel_wrappers_) {
648 subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time);
672 std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
686 std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_
688 std::vector<std::unique_ptr<InternalSubchannelDataWatcherInterface>>
702 initial_state_(*
state),
705 watcher_timer_init_(watcher_timer_init) {
712 GPR_ASSERT(chand->external_watchers_[on_complete] ==
nullptr);
714 chand->external_watchers_[on_complete] =
728 chand_->interested_parties_);
730 "ExternalConnectivityWatcher");
740 auto it = chand->external_watchers_.find(on_complete);
741 if (
it != chand->external_watchers_.end()) {
743 chand->external_watchers_.erase(
it);
754 if (!
done_.compare_exchange_strong(
done,
true, std::memory_order_relaxed,
755 std::memory_order_relaxed)) {
768 chand_->work_serializer_->Run(
770 RemoveWatcherLocked();
778 if (!
done_.compare_exchange_strong(
done,
true, std::memory_order_relaxed,
779 std::memory_order_relaxed)) {
784 chand_->work_serializer_->Run(
786 RemoveWatcherLocked();
794 chand_->state_tracker_.AddWatcher(
799 chand_->state_tracker_.RemoveWatcher(
this);
812 initial_state_(initial_state),
815 chand_->work_serializer_->Run(
845 chand_->work_serializer_->Run(
847 RemoveWatcherLocked();
857 "ConnectivityWatcherRemover");
878 "ClientChannelControlHelper");
884 if (
chand_->resolver_ ==
nullptr)
return nullptr;
889 if (health_check_service_name_arg !=
nullptr) {
892 if (!inhibit_health_checking) {
893 health_check_service_name = health_check_service_name_arg;
906 chand_->subchannel_pool_.get()),
909 const char* default_authority =
912 if (address.args() !=
nullptr) {
913 for (
size_t j = 0; j < address.args()->num_args; ++j) {
920 if (default_authority !=
nullptr)
continue;
921 default_authority =
arg.
value.string;
928 if (default_authority ==
nullptr) {
933 const_cast<char*
>(
chand_->default_authority_.c_str())));
940 chand_->client_channel_factory_->CreateSubchannel(address.address(),
947 return MakeRefCounted<SubchannelWrapper>(
953 std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker)
override
955 if (
chand_->resolver_ ==
nullptr)
return;
959 :
" (ignoring -- channel shutting down)";
960 gpr_log(
GPR_INFO,
"chand=%p: update: state=%s status=(%s) picker=%p%s",
962 picker.get(), extra);
973 if (
chand_->resolver_ ==
nullptr)
return;
977 chand_->resolver_->RequestReresolutionLocked();
981 return chand_->default_authority_;
986 if (
chand_->resolver_ ==
nullptr)
return;
987 if (
chand_->channelz_node_ !=
nullptr) {
988 chand_->channelz_node_->AddTraceEvent(
1036 if (use_local_subchannel_pool) {
1037 return MakeRefCounted<LocalSubchannelPool>();
1043 return grpc_channel_args_find_pointer<channelz::ChannelNode>(
1059 internal::ClientChannelServiceConfigParser::ParserIndex()),
1062 subchannel_pool_(GetSubchannelPool(
args->channel_args)) {
1064 gpr_log(
GPR_INFO,
"chand=%p: creating client_channel for channel stack %p",
1072 "Missing client channel factory in args for client channel filter");
1079 if (service_config_json ==
nullptr) service_config_json =
"{}";
1092 "target URI channel arg missing or wrong type in client channel "
1097 char* proxy_name =
nullptr;
1101 if (proxy_name !=
nullptr) {
1117 new_args !=
nullptr ? new_args :
args->channel_args, &arg_to_remove, 1);
1124 const char* default_authority =
1126 if (default_authority ==
nullptr) {
1154 bool is_transparent_retry) {
1156 this,
args, pollent, on_call_destruction_complete,
1157 call_dispatch_controller, is_transparent_retry));
1171 const char* policy_name =
nullptr;
1177 bool requires_config =
false;
1178 if (policy_name !=
nullptr &&
1180 policy_name, &requires_config) ||
1182 if (requires_config) {
1184 "LB policy: %s passed through channel_args must not "
1185 "require a config. Using pick_first instead.",
1189 "LB policy: %s passed through channel_args does not exist. "
1190 "Using pick_first instead.",
1193 policy_name =
"pick_first";
1198 if (policy_name ==
nullptr) policy_name =
"pick_first";
1217 return lb_policy_config;
1237 std::vector<const char*> trace_strings;
1238 const bool resolution_contains_addresses =
1240 if (!resolution_contains_addresses &&
1241 previous_resolution_contained_addresses_) {
1242 trace_strings.push_back(
"Address list became empty");
1243 }
else if (resolution_contains_addresses &&
1244 !previous_resolution_contained_addresses_) {
1245 trace_strings.push_back(
"Address list became non-empty");
1247 previous_resolution_contained_addresses_ = resolution_contains_addresses;
1249 if (!
result.service_config.ok()) {
1250 service_config_error_string_storage =
1251 result.service_config.status().ToString();
1252 trace_strings.push_back(service_config_error_string_storage.c_str());
1257 if (!
result.service_config.ok()) {
1259 gpr_log(
GPR_INFO,
"chand=%p: resolver returned service config error: %s",
1260 this,
result.service_config.status().ToString().c_str());
1264 if (saved_service_config_ !=
nullptr) {
1267 "chand=%p: resolver returned invalid service config. "
1268 "Continuing to use previous service config.",
1271 service_config = saved_service_config_;
1272 config_selector = saved_config_selector_;
1278 trace_strings.push_back(
"no valid service config");
1280 }
else if (*
result.service_config ==
nullptr) {
1284 "chand=%p: resolver returned no service config. Using default "
1285 "service config for channel.",
1297 if (service_config !=
nullptr) {
1301 service_config->GetGlobalParsedConfig(
1305 ChooseLbPolicy(
result, parsed_service_config);
1307 const bool service_config_changed =
1308 saved_service_config_ ==
nullptr ||
1309 service_config->json_string() != saved_service_config_->json_string();
1312 saved_config_selector_.get(), config_selector.
get());
1314 if (service_config_changed || config_selector_changed) {
1318 lb_policy_config->name());
1326 if (service_config_changed || config_selector_changed) {
1334 trace_strings.push_back(
"Service config changed");
1338 if (!trace_strings.empty()) {
1362 resolver_transient_failure_error_ =
status;
1369 if (calld->CheckResolutionLocked(
elem, &
error)) {
1378 absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
status));
1393 if (health_check_service_name.
has_value()) {
1396 const_cast<char*
>(health_check_service_name->c_str())));
1403 result.args, &arg_to_remove, 1, args_to_add.
data(), args_to_add.
size());
1422 absl::make_unique<ClientChannelControlHelper>(
this);
1425 MakeOrphanable<ChildPolicyHandler>(
std::move(lb_policy_args),
1439 call->next = resolver_queued_calls_;
1440 resolver_queued_calls_ =
call;
1452 call = &(*call)->next) {
1453 if (*
call == to_remove) {
1463 std::string service_config_json(service_config->json_string());
1466 "chand=%p: resolver returned updated service config: \"%s\"",
this,
1467 service_config_json.c_str());
1470 saved_service_config_ =
std::move(service_config);
1474 info_lb_policy_name_ =
std::move(lb_policy_name);
1475 info_service_config_json_ =
std::move(service_config_json);
1478 saved_config_selector_ =
std::move(config_selector);
1481 saved_config_selector_.get());
1492 saved_config_selector_.get());
1494 if (config_selector ==
nullptr) {
1496 MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
1501 &kClientChannelArgPointerVtable),
1504 &kServiceConfigObjArgPointerVtable),
1508 new_args = config_selector->ModifyChannelArgs(new_args);
1509 bool enable_retries =
1513 std::vector<const grpc_channel_filter*> filters =
1514 config_selector->GetFilters();
1515 if (enable_retries) {
1532 received_service_config_data_ =
true;
1535 config_selector_.swap(config_selector);
1536 dynamic_filters_.swap(dynamic_filters);
1552 if (calld->CheckResolutionLocked(
elem, &
error)) {
1573 absl::make_unique<LoadBalancingPolicy::QueuePicker>(
nullptr));
1602 std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) {
1605 saved_service_config_.reset();
1606 saved_config_selector_.reset();
1615 received_service_config_data_ =
false;
1617 config_selector_to_unref =
std::move(config_selector_);
1618 dynamic_filters_to_unref =
std::move(dynamic_filters_);
1626 channelz::ChannelTrace::Severity::Info,
1650 if (
call->lb_call->PickSubchannelLocked(&
error)) {
1661 template <
typename T>
1668 auto* complete_pick =
1669 absl::get_if<LoadBalancingPolicy::PickResult::Complete>(&
result->result);
1670 if (complete_pick !=
nullptr) {
1671 return complete_func(complete_pick);
1674 absl::get_if<LoadBalancingPolicy::PickResult::Queue>(&
result->result);
1675 if (queue_pick !=
nullptr) {
1676 return queue_func(queue_pick);
1679 absl::get_if<LoadBalancingPolicy::PickResult::Fail>(&
result->result);
1680 if (fail_pick !=
nullptr) {
1681 return fail_func(fail_pick);
1684 absl::get_if<LoadBalancingPolicy::PickResult::Drop>(&
result->result);
1686 return drop_func(drop_pick);
1700 return HandlePickResult<grpc_error_handle>(
1709 connected_subchannel->Ping(
op->send_ping.on_initiate,
1710 op->send_ping.on_ack);
1729 if (
op->start_connectivity_watch !=
nullptr) {
1730 state_tracker_.AddWatcher(
op->start_connectivity_watch_state,
1733 if (
op->stop_connectivity_watch !=
nullptr) {
1734 state_tracker_.RemoveWatcher(
op->stop_connectivity_watch);
1737 if (
op->send_ping.on_initiate !=
nullptr ||
op->send_ping.on_ack !=
nullptr) {
1744 op->bind_pollset =
nullptr;
1745 op->send_ping.on_initiate =
nullptr;
1746 op->send_ping.on_ack =
nullptr;
1749 if (
op->reset_connect_backoff) {
1768 "channel entering IDLE",
nullptr);
1774 disconnect_error_ =
op->disconnect_with_error;
1777 absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
1790 if (
op->bind_pollset !=
nullptr) {
1797 chand->StartTransportOpLocked(op);
1811 gpr_strdup(chand->info_service_config_json_.c_str());
1818 call->next = lb_queued_calls_;
1819 lb_queued_calls_ =
call;
1831 call = &(*call)->next) {
1832 if (*
call == to_remove) {
1922 dynamic_call->SetAfterCallStackDestroy(then_schedule_closure);
1936 gpr_log(
GPR_INFO,
"chand=%p calld=%p: batch started from above: %s", chand,
1949 RecvTrailingMetadataReadyForConfigSelectorCommitCallback,
1959 gpr_log(
GPR_INFO,
"chand=%p calld=%p: starting batch on dynamic_call=%p",
1990 gpr_log(
GPR_INFO,
"chand=%p calld=%p: recording cancel_error=%s", chand,
1995 NoYieldCallCombiner);
2009 "chand=%p calld=%p: grabbing resolution mutex to apply service "
2018 "chand=%p calld=%p: saved batch, yielding call combiner", chand,
2022 "batch does not include send_initial_metadata");
2054 const size_t idx = GetBatchIndex(
batch);
2057 "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
2079 YieldCallCombinerPredicate yield_call_combiner_predicate) {
2082 size_t num_batches = 0;
2087 "chand=%p calld=%p: failing %" PRIuPTR
" pending batches: %s",
2088 elem->channel_data,
this, num_batches,
2094 if (
batch !=
nullptr) {
2097 FailPendingBatchInCallCombiner,
batch,
2098 grpc_schedule_on_exec_ctx);
2100 "PendingBatchesFail");
2104 if (yield_call_combiner_predicate(closures)) {
2129 size_t num_batches = 0;
2134 "chand=%p calld=%p: starting %" PRIuPTR
2135 " pending batches on dynamic_call=%p",
2136 chand,
this, num_batches, dynamic_call_.get());
2141 if (
batch !=
nullptr) {
2144 ResumePendingBatchInCallCombiner,
batch,
nullptr);
2146 "resuming pending batch from client channel call");
2166 grpc_schedule_on_exec_ctx);
2167 calld->call_combiner_->SetNotifyOnCancel(&
closure_);
2173 auto* chand =
static_cast<ClientChannel*
>(
self->elem_->channel_data);
2174 auto* calld =
static_cast<CallData*
>(
self->elem_->call_data);
2179 "chand=%p calld=%p: cancelling resolver queued pick: "
2180 "error=%s self=%p calld->resolver_pick_canceller=%p",
2182 calld->resolver_call_canceller_);
2184 if (calld->resolver_call_canceller_ ==
self &&
2187 calld->MaybeRemoveCallFromResolverQueuedCallsLocked(
self->elem_);
2190 YieldCallCombinerIfPendingBatchesFound);
2201 void ClientChannel::CallData::MaybeRemoveCallFromResolverQueuedCallsLocked(
2203 if (!queued_pending_resolver_result_)
return;
2207 "chand=%p calld=%p: removing from resolver queued picks list",
2210 chand->RemoveResolverQueuedCall(&resolver_queued_call_,
pollent_);
2211 queued_pending_resolver_result_ =
false;
2213 resolver_call_canceller_ =
nullptr;
2216 void ClientChannel::CallData::MaybeAddCallToResolverQueuedCallsLocked(
2218 if (queued_pending_resolver_result_)
return;
2221 gpr_log(
GPR_INFO,
"chand=%p calld=%p: adding to resolver queued picks list",
2224 queued_pending_resolver_result_ =
true;
2225 resolver_queued_call_.elem =
elem;
2226 chand->AddResolverQueuedCall(&resolver_queued_call_,
pollent_);
2228 resolver_call_canceller_ =
new ResolverQueuedCallCanceller(
elem);
2235 gpr_log(
GPR_INFO,
"chand=%p calld=%p: applying service config to call",
2238 ConfigSelector* config_selector = chand->config_selector_.get();
2239 if (config_selector !=
nullptr) {
2241 ConfigSelector::CallConfig call_config =
2242 config_selector->GetCallConfig({&
path_, initial_metadata,
arena_});
2249 auto* service_config_call_data =
2250 arena_->New<ClientChannelServiceConfigCallData>(
2251 std::move(call_config.service_config), call_config.method_configs,
2255 auto* method_params =
static_cast<ClientChannelMethodParsedConfig*
>(
2256 service_config_call_data->GetMethodParsedConfig(
2257 chand->service_config_parser_index_));
2258 if (method_params !=
nullptr) {
2261 if (chand->deadline_checking_enabled_ &&
2265 method_params->timeout();
2273 uint32_t* send_initial_metadata_flags =
2275 ->payload->send_initial_metadata.send_initial_metadata_flags;
2276 if (method_params->wait_for_ready().has_value() &&
2277 !(*send_initial_metadata_flags &
2279 if (method_params->wait_for_ready().value()) {
2287 dynamic_filters_ = chand->dynamic_filters_;
2297 auto* calld =
static_cast<CallData*
>(
elem->call_data);
2298 auto* service_config_call_data =
2299 static_cast<ClientChannelServiceConfigCallData*
>(
2303 "chand=%p calld=%p: got recv_trailing_metadata_ready: error=%s "
2304 "service_config_call_data=%p",
2306 service_config_call_data);
2308 if (service_config_call_data !=
nullptr) {
2309 service_config_call_data->call_dispatch_controller()->Commit();
2327 CallData* calld =
static_cast<CallData*
>(
elem->call_data);
2331 "chand=%p calld=%p: error applying config to call: error=%s",
2337 calld->CreateDynamicCall(
elem);
2345 bool resolution_complete;
2348 resolution_complete = calld->CheckResolutionLocked(
elem, &
error);
2350 if (resolution_complete) {
2362 gpr_log(
GPR_INFO,
"chand=%p calld=%p: triggering exit idle", chand,
this);
2377 chand->CheckConnectivityState(true);
2378 GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_,
2379 "CheckResolutionLocked");
2391 const uint32_t send_initial_metadata_flags =
2395 if (
GPR_UNLIKELY(!chand->received_service_config_data_)) {
2398 absl::Status resolver_error = chand->resolver_transient_failure_error_;
2399 if (!resolver_error.
ok() && (send_initial_metadata_flags &
2402 gpr_log(
GPR_INFO,
"chand=%p calld=%p: resolution failed, failing call",
2405 MaybeRemoveCallFromResolverQueuedCallsLocked(
elem);
2413 gpr_log(
GPR_INFO,
"chand=%p calld=%p: queuing to wait for resolution",
2416 MaybeAddCallToResolverQueuedCallsLocked(
elem);
2421 service_config_applied_ =
true;
2422 *
error = ApplyServiceConfigToCallLocked(
elem, initial_metadata_batch);
2424 MaybeRemoveCallFromResolverQueuedCallsLocked(
elem);
2443 "chand=%p calld=%p: creating dynamic call stack on channel_stack=%p",
2444 chand,
this, channel_stack);
2450 "chand=%p calld=%p: failed to create dynamic call: error=%s",
2453 PendingBatchesFail(
elem,
error, YieldCallCombiner);
2456 PendingBatchesResume(
elem);
2469 if (
batch_ ==
nullptr)
return;
2483 " value:",
value.as_string_view())
2490 if (
batch_ ==
nullptr)
return {};
2492 batch_->Encode(&encoder);
2493 return encoder.
Take();
2498 if (
batch_ ==
nullptr)
return absl::nullopt;
2510 template <
class Which>
2512 auto value_slice = Which::Encode(
value);
2523 std::vector<std::pair<std::string, std::string>>
Take() {
2528 std::vector<std::pair<std::string, std::string>>
out_;
2544 auto it = call_attributes.find(
type);
2560 if (
lb_call_->backend_metric_data_ ==
nullptr &&
2561 lb_call_->recv_trailing_metadata_ !=
nullptr) {
2562 if (
const auto*
md =
lb_call_->recv_trailing_metadata_->get_pointer(
2569 return lb_call_->backend_metric_data_;
2582 return static_cast<char*
>(
arena_->Alloc(
size));
2602 if (call_tracer ==
nullptr)
return nullptr;
2612 bool is_transparent_retry)
2615 ?
"LoadBalancedCall"
2625 on_call_destruction_complete_(on_call_destruction_complete),
2626 call_dispatch_controller_(call_dispatch_controller),
2627 call_attempt_tracer_(
2628 GetCallAttemptTracer(
args.
context, is_transparent_retry)) {
2644 if (on_call_destruction_complete_ !=
nullptr) {
2658 if (call_attempt_tracer_ !=
nullptr) {
2661 call_attempt_tracer_->RecordEnd(latency);
2682 const size_t idx = GetBatchIndex(
batch);
2685 "chand=%p lb_call=%p: adding pending batch at index %" PRIuPTR,
2706 YieldCallCombinerPredicate yield_call_combiner_predicate) {
2709 failure_error_ =
error;
2711 size_t num_batches = 0;
2716 "chand=%p lb_call=%p: failing %" PRIuPTR
" pending batches: %s",
2722 if (
batch !=
nullptr) {
2725 FailPendingBatchInCallCombiner,
batch,
2726 grpc_schedule_on_exec_ctx);
2728 "PendingBatchesFail");
2732 if (yield_call_combiner_predicate(closures)) {
2753 size_t num_batches = 0;
2758 "chand=%p lb_call=%p: starting %" PRIuPTR
2759 " pending batches on subchannel_call=%p",
2760 chand_,
this, num_batches, subchannel_call_.get());
2765 if (
batch !=
nullptr) {
2768 ResumePendingBatchInCallCombiner,
batch,
2769 grpc_schedule_on_exec_ctx);
2771 "resuming pending batch from LB call");
2784 "chand=%p lb_call=%p: batch started from above: %s, "
2785 "call_attempt_tracer_=%p",
2787 call_attempt_tracer_);
2790 if (call_attempt_tracer_ !=
nullptr) {
2793 call_attempt_tracer_->RecordCancel(
2797 call_attempt_tracer_->RecordSendInitialMetadata(
2803 SendInitialMetadataOnComplete,
this,
nullptr);
2807 call_attempt_tracer_->RecordSendMessage(
2811 call_attempt_tracer_->RecordSendTrailingMetadata(
2838 transport_stream_stats_ =
2851 if (subchannel_call_ !=
nullptr) {
2854 "chand=%p lb_call=%p: starting batch on subchannel_call=%p",
2855 chand_,
this, subchannel_call_.get());
2857 subchannel_call_->StartTransportStreamOpBatch(
batch);
2865 gpr_log(
GPR_INFO,
"chand=%p lb_call=%p: failing batch with error: %s",
2887 PendingBatchesFail(
GRPC_ERROR_REF(cancel_error_), NoYieldCallCombiner);
2894 PendingBatchesAdd(
batch);
2900 "chand=%p lb_call=%p: grabbing data plane mutex to perform pick",
2908 "chand=%p lb_call=%p: saved batch, yielding call combiner",
2912 "batch does not include send_initial_metadata");
2921 "chand=%p lb_call=%p: got on_complete for send_initial_metadata: "
2925 self->call_attempt_tracer_->RecordOnDoneSendInitialMetadata(
2926 self->peer_string_);
2928 self->original_send_initial_metadata_on_complete_,
2937 "chand=%p lb_call=%p: got recv_initial_metadata_ready: error=%s",
2942 self->call_attempt_tracer_->RecordReceivedInitialMetadata(
2943 self->recv_initial_metadata_, 0 );
2953 gpr_log(
GPR_INFO,
"chand=%p lb_call=%p: got recv_message_ready: error=%s",
2956 if (
self->recv_message_->has_value()) {
2957 self->call_attempt_tracer_->RecordReceivedMessage(**
self->recv_message_);
2968 "chand=%p lb_call=%p: got recv_trailing_metadata_ready: error=%s "
2969 "call_attempt_tracer_=%p lb_subchannel_call_tracker_=%p "
2970 "failure_error_=%s",
2972 self->call_attempt_tracer_,
self->lb_subchannel_call_tracker_.get(),
2976 if (
self->call_attempt_tracer_ !=
nullptr ||
2977 self->lb_subchannel_call_tracker_ !=
nullptr) {
2989 const auto&
md = *
self->recv_trailing_metadata_;
3000 self->RecordCallCompletion(
status);
3004 error =
self->failure_error_;
3016 if (call_attempt_tracer_ !=
nullptr) {
3017 call_attempt_tracer_->RecordReceivedTrailingMetadata(
3022 if (lb_subchannel_call_tracker_ !=
nullptr) {
3027 lb_subchannel_call_tracker_->Finish(
args);
3028 lb_subchannel_call_tracker_.reset();
3043 "chand=%p lb_call=%p: create subchannel_call=%p: error=%s",
chand_,
3046 if (on_call_destruction_complete_ !=
nullptr) {
3047 subchannel_call_->SetAfterCallStackDestroy(on_call_destruction_complete_);
3048 on_call_destruction_complete_ =
nullptr;
3051 PendingBatchesFail(
error, YieldCallCombiner);
3053 PendingBatchesResume();
3076 auto* lb_call =
self->
lb_call_.get();
3077 auto* chand = lb_call->chand_;
3082 "chand=%p lb_call=%p: cancelling queued pick: "
3083 "error=%s self=%p calld->pick_canceller=%p",
3085 lb_call->lb_call_canceller_);
3088 lb_call->call_dispatch_controller_->Commit();
3090 lb_call->MaybeRemoveCallFromLbQueuedCallsLocked();
3093 YieldCallCombinerIfPendingBatchesFound);
3104 void ClientChannel::LoadBalancedCall::MaybeRemoveCallFromLbQueuedCallsLocked() {
3105 if (!queued_pending_lb_pick_)
return;
3107 gpr_log(
GPR_INFO,
"chand=%p lb_call=%p: removing from queued picks list",
3111 queued_pending_lb_pick_ =
false;
3113 lb_call_canceller_ =
nullptr;
3116 void ClientChannel::LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() {
3117 if (queued_pending_lb_pick_)
return;
3122 queued_pending_lb_pick_ =
true;
3123 queued_call_.lb_call =
this;
3126 lb_call_canceller_ =
new LbQueuedCallCanceller(
Ref());
3141 "chand=%p lb_call=%p: failed to pick subchannel: error=%s",
3147 self->call_dispatch_controller_->Commit();
3148 self->CreateSubchannelCall();
3157 pick_complete =
self->PickSubchannelLocked(&
error);
3159 if (pick_complete) {
3160 PickDone(
self,
error);
3165 bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
3174 const uint32_t send_initial_metadata_flags =
3178 pick_args.
path =
path_.as_string_view();
3179 LbCallState lb_call_state(
this);
3181 Metadata initial_metadata(initial_metadata_batch);
3184 return HandlePickResult<bool>(
3191 "chand=%p lb_call=%p: LB pick succeeded: subchannel=%p",
3197 SubchannelWrapper*
subchannel =
static_cast<SubchannelWrapper*
>(
3207 "chand=%p lb_call=%p: subchannel returned by LB picker "
3208 "has no connected subchannel; queueing pick",
3211 MaybeAddCallToLbQueuedCallsLocked();
3214 lb_subchannel_call_tracker_ =
3216 if (lb_subchannel_call_tracker_ !=
nullptr) {
3217 lb_subchannel_call_tracker_->Start();
3219 MaybeRemoveCallFromLbQueuedCallsLocked();
3223 [
this](LoadBalancingPolicy::PickResult::Queue* )
3229 MaybeAddCallToLbQueuedCallsLocked();
3233 [
this, send_initial_metadata_flags,
3238 chand_,
this, fail_pick->status.ToString().c_str());
3242 if ((send_initial_metadata_flags &
3247 "Failed to pick subchannel", &lb_error, 1);
3249 MaybeRemoveCallFromLbQueuedCallsLocked();
3254 MaybeAddCallToLbQueuedCallsLocked();
3258 [
this, &
error](LoadBalancingPolicy::PickResult::Drop* drop_pick)
3262 chand_,
this, drop_pick->status.ToString().c_str());
3267 MaybeRemoveCallFromLbQueuedCallsLocked();
ClientChannelControlHelper(ClientChannel *chand)
void AddTraceEvent(TraceSeverity severity, absl::string_view message) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
void MaybeRemoveCallFromLbQueuedCallsLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel void MaybeAddCallToLbQueuedCallsLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel ClientChannel * chand_
static ClientChannel * GetFromChannel(Channel *channel)
std::vector< Json > Array
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
WatcherWrapper(std::unique_ptr< SubchannelInterface::ConnectivityStateWatcherInterface > watcher, RefCountedPtr< SubchannelWrapper > parent)
const grpc_channel_args * args
MetadataInterface * initial_metadata
char * grpc_channel_args_find_string(const grpc_channel_args *args, const char *name)
#define MAX_PENDING_BATCHES
std::string default_authority_
BackendMetricAccessor(LoadBalancedCall *lb_call)
void RemoveResolverQueuedCall(ResolverQueuedCall *to_remove, grpc_polling_entity *pollent) ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_)
TraceSeverity
Adds a trace message associated with the channel.
LoadBalancedCall(ClientChannel *chand, const grpc_call_element_args &args, grpc_polling_entity *pollent, grpc_closure *on_call_destruction_complete, ConfigSelector::CallDispatchController *call_dispatch_controller, bool is_transparent_retry)
const grpc_channel_args * args
std::string resolution_note
RefCountedPtr< SubchannelInterface > subchannel
The subchannel to be used for the call. Must be non-null.
static void RemoveWatcherFromExternalWatchersMap(ClientChannel *chand, grpc_closure *on_complete, bool cancel)
std::string uri_to_resolve_
void UpdateState(grpc_connectivity_state state, const absl::Status &status, std::unique_ptr< LoadBalancingPolicy::SubchannelPicker > picker) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch *batch)
void PendingBatchesAdd(grpc_transport_stream_op_batch *batch)
grpc_error_handle cancel_error_
const grpc_slice & grpc_slice_ref_internal(const grpc_slice &slice)
static void StartTransportStreamOpBatch(grpc_call_element *elem, grpc_transport_stream_op_batch *batch)
void AddChildSubchannel(intptr_t child_uuid)
ResolverQueuedCallCanceller(grpc_call_element *elem)
static bool YieldCallCombiner(const CallCombinerClosureList &)
std::string ToString(StatusToStringMode mode=StatusToStringMode::kDefault) const
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
const grpc_channel_args * channel_args_
reference emplace_back(Args &&... args)
const grpc_channel_args * args
Channel args.
void PendingBatchesFail(grpc_call_element *elem, grpc_error_handle error, YieldCallCombinerPredicate yield_call_combiner_predicate)
bool grpc_deadline_checking_enabled(const grpc_channel_args *channel_args)
const BackendMetricData * GetBackendMetricData() override
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch *batch)
std::shared_ptr< WorkSerializer > work_serializer_
grpc_channel_args * grpc_channel_args_copy_and_remove(const grpc_channel_args *src, const char **to_remove, size_t num_to_remove)
void AddConnectivityWatcher(grpc_connectivity_state initial_state, OrphanablePtr< AsyncConnectivityStateWatcherInterface > watcher)
@ GRPC_CONTEXT_CALL_TRACER
Value is a CallTracer object.
ClientChannelFactory * client_channel_factory_
static grpc_arg CreateChannelArg(SubchannelPoolInterface *subchannel_pool)
#define GPR_TIMER_SCOPE(tag, important)
@ GRPC_ERROR_INT_LB_POLICY_DROP
LB policy drop.
static void SetPollent(grpc_call_element *elem, grpc_polling_entity *pollent)
static grpc_error_handle Init(grpc_channel_element *elem, grpc_channel_element_args *args)
constexpr const char * kKeepaliveThrottlingKey
grpc_closure * on_complete
Status CancelledError(absl::string_view message)
struct grpc_pollset_set grpc_pollset_set
void AddWatcherLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
void OnResolverErrorLocked(absl::Status status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
static bool NoYieldCallCombiner(const CallCombinerClosureList &)
T * grpc_channel_args_find_pointer(const grpc_channel_args *args, const char *name)
static Slice FromStaticString(const char *s)
#define ABSL_TS_UNCHECKED_READ(x)
@ GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE
channel connectivity state associated with the error
grpc_pollset_set * grpc_pollset_set_create()
RefCountedPtr< ConnectedSubchannel > connected_subchannel() const
void RequestConnection() override
#define GRPC_CHANNEL_STACK_UNREF(channel_stack, reason)
GPRAPI void gpr_free(void *ptr)
absl::string_view path
The path of the call. Indicates the RPC service and method name.
void grpc_transport_stream_op_batch_finish_with_failure(grpc_transport_stream_op_batch *batch, grpc_error_handle error, grpc_core::CallCombiner *call_combiner)
RefCountedPtr< LoadBalancingPolicy::Config > parsed_lb_config() const
RingHashSubchannelData * subchannel
~ClientChannelControlHelper() override
static bool MapName(const char *server_uri, const grpc_channel_args *args, char **name_to_resolve, grpc_channel_args **new_args)
absl::optional< grpc_core::SliceBuffer > grpc_message
void RemoveWatcherLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
ResolverQueuedCall * next
~WatcherWrapper() override
@ GRPC_CHANNEL_TRANSIENT_FAILURE
void PendingBatchesAdd(grpc_call_element *elem, grpc_transport_stream_op_batch *batch)
RefCountedPtr< ConnectedSubchannel > connected_subchannel_
grpc_channel_args * grpc_channel_args_copy_and_add_and_remove(const grpc_channel_args *src, const char **to_remove, size_t num_to_remove, const grpc_arg *to_add, size_t num_to_add)
void GetChannelInfo(grpc_channel_element *, const grpc_channel_info *)
#define GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(desc, errs, count)
static int Init(CMessage *self, PyObject *args, PyObject *kwargs)
WatcherWrapper * replacement_
#define GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL
grpc_metadata_batch * send_initial_metadata
void AddDataWatcher(std::unique_ptr< DataWatcherInterface > watcher) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
absl::string_view GetAuthority() override
Returns the channel authority.
grpc_arg grpc_channel_arg_string_create(char *name, char *value)
gpr_cycle_counter call_start_time_
ConnectivityStateChange PopConnectivityStateChange()
void Notify(grpc_connectivity_state state, const absl::Status &) override
#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler)
static int64_t start_time
#define GRPC_ERROR_CANCELLED
void AddWatcherLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
void grpc_pollset_set_del_pollset_set(grpc_pollset_set *bag, grpc_pollset_set *item)
bool PickSubchannelLocked(grpc_error_handle *error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel voi AsyncPickDone)(grpc_error_handle error)
const std::string & parsed_deprecated_lb_policy() const
bool grpc_channel_args_find_bool(const grpc_channel_args *args, const char *name, bool default_value)
OrphanablePtr< LoadBalancingPolicy > CreateLbPolicyLocked(const grpc_channel_args &args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
#define GRPC_TRACE_FLAG_ENABLED(f)
OrphanablePtr< ClientChannel::LoadBalancedCall > lb_call_
void AddResolverQueuedCall(ResolverQueuedCall *call, grpc_polling_entity *pollent) ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_)
static RefCountedPtr< DynamicFilters > Create(const grpc_channel_args *args, std::vector< const grpc_channel_filter * > filters)
#define T(upbtypeconst, upbtype, ctype, default_value)
static const grpc_channel_filter kFilterVtable
#define GRPC_ARG_HEALTH_CHECK_SERVICE_NAME
std::atomic< bool > done_
RefCountedPtr< ServiceConfig > default_service_config_
#define GRPC_ARG_CLIENT_CHANNEL
grpc_core::ScopedArenaPtr arena
const size_t service_config_parser_index_
grpc_closure * original_recv_trailing_metadata_ready_
const CallAttributes & call_attributes() const
#define GRPC_ARG_CONFIG_SELECTOR
#define GRPC_ARG_CHANNELZ_CHANNEL_NODE
void grpc_client_channel_stop_backup_polling(grpc_pollset_set *interested_parties)
static void RecvMessageReady(void *arg, grpc_error_handle error)
grpc_connectivity_state CheckConnectivityState(bool try_to_connect)
The result of picking a subchannel for a call.
ResolverResultHandler(ClientChannel *chand)
static RefCountedPtr< ConfigSelector > GetFromChannelArgs(const grpc_channel_args &args)
void SetConnectivityState(grpc_connectivity_state state)
static void FailPendingBatchInCallCombiner(void *arg, grpc_error_handle error)
void UpdateStateAndPickerLocked(grpc_connectivity_state state, const absl::Status &status, const char *reason, std::unique_ptr< LoadBalancingPolicy::SubchannelPicker > picker) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
bool(* YieldCallCombinerPredicate)(const CallCombinerClosureList &closures)
static void PickDone(void *arg, grpc_error_handle error)
grpc_closure resolution_done_closure_
#define GRPC_ARG_ENABLE_RETRIES
RefCountedPtr< Config > config
The LB policy config.
ABSL_NAMESPACE_BEGIN ABSL_MUST_USE_RESULT bool SimpleAtoi(absl::string_view str, int_type *out)
void ApplyUpdateInControlPlaneWorkSerializer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*parent_ -> chand_->work_serializer_)
#define GRPC_CALL_STACK_UNREF(call_stack, reason)
#define GRPC_ARG_DEFAULT_AUTHORITY
static void CheckResolution(void *arg, grpc_error_handle error)
std::unique_ptr< SubchannelCallTrackerInterface > subchannel_call_tracker
RefCountedPtr< Call > CreateCall(Call::Args args, grpc_error_handle *error)
void AddLbQueuedCall(LbQueuedCall *call, grpc_polling_entity *pollent) ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_)
grpc_handler_private_op_data handler_private
static void CancelLocked(void *arg, grpc_error_handle error)
grpc_channel_element * grpc_channel_stack_last_element(grpc_channel_stack *channel_stack)
wrapped_grpc_channel * channel
#define GRPC_INITIAL_METADATA_WAIT_FOR_READY
void PendingBatchesResume()
RefCountedPtr< SubchannelInterface > CreateSubchannel(ServerAddress address, const grpc_channel_args &args) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
Creates a new subchannel with the specified channel args.
Arguments used when picking a subchannel for a call.
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
static size_t GetBatchIndex(grpc_transport_stream_op_batch *batch)
RefCountedPtr< LoadBalancedCall > lb_call_
LbQueuedCallCanceller(RefCountedPtr< LoadBalancedCall > lb_call)
std::string StrJoin(Iterator start, Iterator end, absl::string_view sep, Formatter &&fmt)
static const CoreConfiguration & Get()
constexpr bool has_value() const noexcept
ConnectivityWatcherRemover(ClientChannel *chand, AsyncConnectivityStateWatcherInterface *watcher)
grpc_transport_stream_op_batch * pending_batches_[MAX_PENDING_BATCHES]
def c_str(s, encoding='ascii')
JSON (JavaScript Object Notation).
void WatchConnectivityState(std::unique_ptr< ConnectivityStateWatcherInterface > watcher) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
Mutex external_watchers_mu_
#define GRPC_ARG_SERVER_URI
void grpc_channel_stack_no_post_init(grpc_channel_stack *, grpc_channel_element *)
CallData(grpc_call_element *elem, const ClientChannel &chand, const grpc_call_element_args &args)
grpc_call_stack * owning_call_
#define GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET
RefCountedPtr< ConnectivityStateWatcherInterface > Ref() GRPC_MUST_USE_RESULT
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
void RemoveWatcherLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
#define ABSL_EXCLUSIVE_LOCKS_REQUIRED(...)
TraceFlag grpc_client_channel_trace(false, "client_channel")
Results returned by the resolver.
static void RecvTrailingMetadataReady(void *arg, grpc_error_handle error)
channelz::ChannelNode * channelz_node_
OrphanablePtr< LoadBalancedCall > CreateLoadBalancedCall(const grpc_call_element_args &args, grpc_polling_entity *pollent, grpc_closure *on_call_destruction_complete, ConfigSelector::CallDispatchController *call_dispatch_controller, bool is_transparent_retry)
static void SendInitialMetadataOnComplete(void *arg, grpc_error_handle error)
void UpdateServiceConfigInControlPlaneLocked(RefCountedPtr< ServiceConfig > service_config, RefCountedPtr< ConfigSelector > config_selector, std::string lb_policy_name) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
#define GRPC_ARG_LB_POLICY_NAME
void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set)
static const char * GetChannelConnectivityStateChangeString(grpc_connectivity_state state)
grpc_error_handle cancel_error
grpc_error_handle static ApplyServiceConfigToCallLocked(grpc_call_element *elem, grpc_metadata_batch *initial_metadata) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel voi ResolutionDone)(void *arg, grpc_error_handle error)
void push_back(const_reference v)
static RefCountedPtr< GlobalSubchannelPool > instance()
RefCountedPtr< Subchannel > subchannel_
static RefCountedPtr< SubchannelCall > Create(Args args, grpc_error_handle *error)
ExternalConnectivityWatcher(ClientChannel *chand, grpc_polling_entity pollent, grpc_connectivity_state *state, grpc_closure *on_complete, grpc_closure *watcher_timer_init)
#define GRPC_CALL_COMBINER_STOP(call_combiner, reason)
~ExternalConnectivityWatcher() override
static void StartTransportOp(grpc_channel_element *, grpc_transport_op *)
grpc_byte_buffer * recv_message_
void PendingBatchesResume(grpc_call_element *elem)
void grpc_polling_entity_del_from_pollset_set(grpc_polling_entity *pollent, grpc_pollset_set *pss_dst)
static size_t GetBatchIndex(grpc_transport_stream_op_batch *batch)
virtual bool Equals(const ConfigSelector *other) const =0
std::string GetDefaultAuthority(absl::string_view target) const
Returns the default authority to pass from a client for target.
void OnConnectivityStateChange() override
void grpc_channel_args_destroy(grpc_channel_args *a)
GPRAPI grpc_slice grpc_slice_from_static_string(const char *source)
bool grpc_error_get_int(grpc_error_handle err, grpc_error_ints which, intptr_t *p)
grpc_transport_stream_op_batch_payload * payload
grpc_metadata_batch * send_trailing_metadata
CallCombiner * call_combiner_
~ResolverResultHandler() override
void TryToConnectLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
static void RecvInitialMetadataReady(void *arg, grpc_error_handle error)
grpc_transport_stream_op_batch * batch
grpc_polling_entity pollent_
void UpdateServiceConfigInDataPlaneLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
void AddTraceEvent(ChannelTrace::Severity severity, const grpc_slice &data)
size_type size() const noexcept
RefCountedPtr< ConnectivityStateWatcherInterface > watcher_
void CreateResolverLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
static void StartTransportOp(grpc_channel_element *elem, grpc_transport_op *op)
LoadBalancedCall * lb_call_
static channelz::ChannelTrace::Severity ConvertSeverityEnum(TraceSeverity severity)
static void Destroy(grpc_channel_element *elem)
WatcherWrapper * MakeReplacement()
static void GetChannelInfo(grpc_channel_element *elem, const grpc_channel_info *info)
void PendingBatchesFail(grpc_error_handle error, YieldCallCombinerPredicate yield_call_combiner_predicate)
grpc_closure recv_trailing_metadata_ready_
ValueType
Type of the value held by a Value object.
Args used to instantiate an LB policy.
static RefCountedPtr< ServiceConfig > Create(const grpc_channel_args *args, absl::string_view json_string, grpc_error_handle *error)
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
bool grpc_channel_args_want_minimal_stack(const grpc_channel_args *args)
const grpc_channel_args * channel_args() override
const grpc_channel_filter kRetryFilterVtable
grpc_metadata_array recv_trailing_metadata_
TraceFlag grpc_client_channel_lb_call_trace(false, "client_channel_lb_call")
@ GRPC_CHANNEL_CONNECTING
ClientChannel(grpc_channel_element_args *args, grpc_error_handle *error)
void RemoveLbQueuedCall(LbQueuedCall *to_remove, grpc_polling_entity *pollent) ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_)
static RefCountedPtr< LoadBalancingPolicy::Config > ParseLoadBalancingConfig(const Json &json, grpc_error_handle *error)
std::shared_ptr< WorkSerializer > work_serializer
The work_serializer under which all LB policy calls will be run.
absl::string_view GetCallAttribute(UniqueTypeName type)
OrphanablePtr< Resolver > CreateResolver(absl::string_view target, const grpc_channel_args *args, grpc_pollset_set *pollset_set, std::shared_ptr< WorkSerializer > work_serializer, std::unique_ptr< Resolver::ResultHandler > result_handler) const
void RunClosures(CallCombiner *call_combiner)
void CancelConnectivityStateWatch(ConnectivityStateWatcherInterface *watcher) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
void ReportResult(Resolver::Result result) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
Reports a result to the channel.
virtual CallAttemptTracer * StartNewAttempt(bool is_transparent_retry)=0
void grpc_deadline_state_client_start_transport_stream_op_batch(grpc_call_element *elem, grpc_transport_stream_op_batch *op)
#define GRPC_CHANNEL_STACK_REF(channel_stack, reason)
static constexpr Duration Zero()
grpc_core::TraceFlag grpc_trace_channel(false, "channel")
ABSL_NAMESPACE_BEGIN class ABSL_MUST_USE_RESULT Status
void grpc_client_channel_start_backup_polling(grpc_pollset_set *interested_parties)
bool recv_initial_metadata
std::string grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch *op)
std::map< std::string, Json > Object
constexpr const T & value() const &
static bool YieldCallCombinerIfPendingBatchesFound(const CallCombinerClosureList &closures)
grpc_closure * original_recv_initial_metadata_ready_
static void ResumePendingBatchInCallCombiner(void *arg, grpc_error_handle ignored)
void CreateSubchannelCall()
grpc_connectivity_state state
#define GRPC_ARG_INHIBIT_HEALTH_CHECKING
grpc_call_element * elem_
TraceFlag grpc_client_channel_call_trace(false, "client_channel_call")
#define GPR_ARRAY_SIZE(array)
std::unique_ptr< SubchannelPicker > picker_
bool send_trailing_metadata
char ** service_config_json
GPRAPI grpc_slice grpc_slice_from_copied_buffer(const char *source, size_t len)
const char * ConnectivityStateName(grpc_connectivity_state state)
#define GRPC_ERROR_REF(err)
void ThrottleKeepaliveTime(int new_keepalive_time)
grpc_error_handle absl_status_to_grpc_error(absl::Status status)
grpc_pollset_set * interested_parties() override
void Destroy(grpc_transport *)
const bool deadline_checking_enabled_
static void Destroy(grpc_call_element *elem, const grpc_call_final_info *final_info, grpc_closure *then_schedule_closure)
grpc_connectivity_state initial_state_
grpc_error_handle grpc_error_set_int(grpc_error_handle src, grpc_error_ints which, intptr_t value)
grpc_closure recv_initial_metadata_ready_
int QsortCompare(const T &a, const T &b)
void Add(grpc_closure *closure, grpc_error_handle error, const char *reason)
AsyncConnectivityStateWatcherInterface * watcher_
#define GRPC_ARG_SERVICE_CONFIG
const absl::optional< std::string > & health_check_service_name() const
static grpc_error_handle Init(grpc_call_element *elem, const grpc_call_element_args *args)
grpc_deadline_state deadline_state_
@ GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA
Holds a pointer to ServiceConfigCallData associated with this call.
RefCountedPtr< DynamicFilters > dynamic_filters_
RefCountedPtr< SubchannelWrapper > parent_
std::string grpc_error_std_string(grpc_error_handle error)
void DestroyResolverAndLbPolicyLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
absl::StatusOr< ServerAddressList > addresses
RefCountedPtr< ServiceConfig > service_config_
gpr_timespec gpr_cycle_counter_sub(gpr_cycle_counter a, gpr_cycle_counter b)
void StartTransportOpLocked(grpc_transport_op *op) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
#define GRPC_ERROR_CREATE_FROM_CPP_STRING(desc)
grpc_call_context_element * call_context_
void CreateDynamicCall(grpc_call_element *elem)
std::unique_ptr< T, Deleter > OrphanablePtr
void grpc_deadline_state_reset(grpc_call_element *elem, grpc_core::Timestamp new_deadline)
BackendMetricData backend_metric_data_
void grpc_pollset_set_add_pollset_set(grpc_pollset_set *bag, grpc_pollset_set *item)
grpc_call_context_element * call_context_
void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity *pollent, grpc_pollset_set *pss_dst)
grpc_core::SliceBuffer * send_message
ResolverQueuedCall *resolver_queued_calls_ ABSL_GUARDED_BY(resolution_mu_)
grpc_metadata_batch * recv_initial_metadata
grpc_closure recv_trailing_metadata_ready_
static void CancelLocked(void *arg, grpc_error_handle error)
void RunClosuresWithoutYielding(CallCombiner *call_combiner)
bool send_initial_metadata
const ResolverRegistry & resolver_registry() const
absl::optional< grpc_core::SliceBuffer > * recv_message
grpc_slice grpc_slice_from_cpp_string(std::string str)
void RemoveConnectivityWatcher(AsyncConnectivityStateWatcherInterface *watcher)
OrphanablePtr< Resolver > resolver_
ABSL_MUST_USE_RESULT bool ok() const
static Timestamp FromCycleCounterRoundUp(gpr_cycle_counter c)
#define GRPC_ERROR_UNREF(err)
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
PendingBatch pending_batches_[MAX_PENDING_BATCHES]
grpc_connectivity_state state_
void CreateOrUpdateLbPolicyLocked(RefCountedPtr< LoadBalancingPolicy::Config > lb_policy_config, const absl::optional< std::string > &health_check_service_name, Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
std::unique_ptr< ChannelControlHelper > channel_control_helper
static const grpc_channel_filter kFilterVtable
bool CheckResolutionLocked(grpc_call_element *elem, grpc_error_handle *error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel voi AsyncResolutionDone)(grpc_call_element *elem, grpc_error_handle error)
absl::optional< absl::Cord > GetPayload(absl::string_view type_url) const
static void PickSubchannel(void *arg, grpc_error_handle error)
char * AllocateString(size_t size) override
CallCombiner * call_combiner_
grpc_pollset_set * interested_parties_
void RemoveChildSubchannel(intptr_t child_uuid)
bool recv_trailing_metadata
GPRAPI char * gpr_strdup(const char *src)
void OnResolverResultChangedLocked(Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
~LoadBalancedCall() override
absl::optional< std::string > health_check_service_name_
#define GRPC_ARG_SERVICE_CONFIG_OBJ
void Fail(const char *msg)
BackendMetricData * AllocateBackendMetricData() override
void grpc_error_get_status(grpc_error_handle error, grpc_core::Timestamp deadline, grpc_status_code *code, std::string *message, grpc_http2_error_code *http_error, const char **error_string)
~SubchannelWrapper() override
grpc::ClientContext context
static void ResumePendingBatchInCallCombiner(void *arg, grpc_error_handle ignored)
grpc_closure on_complete_
SubchannelWrapper(ClientChannel *chand, RefCountedPtr< Subchannel > subchannel, absl::optional< std::string > health_check_service_name)
std::vector< std::pair< std::string, std::string > > Metadata
static bool LoadBalancingPolicyExists(const char *name, bool *requires_config)
grpc_metadata_batch * recv_trailing_metadata
#define GRPC_ARG_KEEPALIVE_TIME_MS
void ResetBackoff() override
grpc_transport_stream_op_batch batch_
RefCountedPtr< DynamicFilters::Call > dynamic_call_
#define GRPC_CALL_STACK_REF(call_stack, reason)
absl::Status grpc_error_to_absl_status(grpc_error_handle error)
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END intern self
int grpc_channel_args_find_integer(const grpc_channel_args *args, const char *name, const grpc_integer_options options)
internal::RefMatcher< T & > Ref(T &x)
grpc_closure * original_recv_message_ready_
RefCountedPtr< GrpcLb > lb_policy_
grpc_closure recv_message_ready_
grpc_arg grpc_channel_arg_pointer_create(char *name, void *value, const grpc_arg_pointer_vtable *vtable)
BackendMetricAllocator(Arena *arena)
void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, grpc_pollset *pollset)
grpc_error_handle DoPingLocked(grpc_transport_op *op) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
static void send_initial_metadata(void)
void MaybeRemoveCallFromResolverQueuedCallsLocked(grpc_call_element *elem) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel void static MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element *elem) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel voi RecvTrailingMetadataReadyForConfigSelectorCommitCallback)(void *arg, grpc_error_handle error)
std::unique_ptr< SubchannelInterface::ConnectivityStateWatcherInterface > watcher_
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
grpc_metadata_array recv_initial_metadata_
OrphanablePtr< AsyncConnectivityStateWatcherInterface > watcher_
grpc_channel_stack * owning_stack_
ConnectivityWatcherAdder(ClientChannel *chand, grpc_connectivity_state initial_state, OrphanablePtr< AsyncConnectivityStateWatcherInterface > watcher)
grpc_polling_entity * pollent_
const BackendMetricData * ParseBackendMetricData(absl::string_view serialized_load_report, BackendMetricAllocatorInterface *allocator)
grpc_channel_args * grpc_channel_args_copy_and_add(const grpc_channel_args *src, const grpc_arg *to_add, size_t num_to_add)
RefCountedPtr< SubchannelInterface > Ref() GRPC_MUST_USE_RESULT
static struct rpc_state state
void grpc_slice_unref_internal(const grpc_slice &slice)
grpc_polling_entity pollent_
grpc_call_stack * owning_call_
struct grpc_transport_stream_op_batch_payload::@46 cancel_stream
void RequestReresolution() override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
Requests that the resolver re-resolve.
grpc_closure * original_recv_trailing_metadata_ready_
void RecordCallCompletion(absl::Status status)
#define GRPC_ERROR_IS_NONE(err)
static void FailPendingBatchInCallCombiner(void *arg, grpc_error_handle error)
grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:55