subchannel_list.h
Go to the documentation of this file.
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
18 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
19 
21 
22 #include <inttypes.h>
23 #include <string.h>
24 
25 #include <memory>
26 #include <string>
27 #include <utility>
28 #include <vector>
29 
30 #include "absl/status/status.h"
31 #include "absl/types/optional.h"
32 
35 #include <grpc/support/log.h>
36 
46 
47 // Code for maintaining a list of subchannels within an LB policy.
48 //
49 // To use this, callers must create their own subclasses, like so:
50 /*
51 
52 class MySubchannelList; // Forward declaration.
53 
54 class MySubchannelData
55  : public SubchannelData<MySubchannelList, MySubchannelData> {
56  public:
57  void ProcessConnectivityChangeLocked(
58  absl::optional<grpc_connectivity_state> old_state,
59  grpc_connectivity_state new_state) override {
60  // ...code to handle connectivity changes...
61  }
62 };
63 
64 class MySubchannelList
65  : public SubchannelList<MySubchannelList, MySubchannelData> {
66 };
67 
68 */
69 // All methods will be called from within the client_channel work serializer.
70 
71 namespace grpc_core {
72 
73 // Forward declaration.
74 template <typename SubchannelListType, typename SubchannelDataType>
76 
77 // Stores data for a particular subchannel in a subchannel list.
78 // Callers must create a subclass that implements the
79 // ProcessConnectivityChangeLocked() method.
80 template <typename SubchannelListType, typename SubchannelDataType>
82  public:
83  // Returns a pointer to the subchannel list containing this object.
84  SubchannelListType* subchannel_list() const {
85  return static_cast<SubchannelListType*>(subchannel_list_);
86  }
87 
88  // Returns the index into the subchannel list of this object.
89  size_t Index() const {
90  return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) -
91  subchannel_list_->subchannel(0));
92  }
93 
94  // Returns a pointer to the subchannel.
95  SubchannelInterface* subchannel() const { return subchannel_.get(); }
96 
97  // Returns the cached connectivity state, if any.
99  return connectivity_state_;
100  }
102 
103  // Resets the connection backoff.
104  void ResetBackoffLocked();
105 
106  // Cancels any pending connectivity watch and unrefs the subchannel.
107  void ShutdownLocked();
108 
109  protected:
112  const ServerAddress& address,
114 
115  virtual ~SubchannelData();
116 
117  // This method will be invoked once soon after instantiation to report
118  // the current connectivity state, and it will then be invoked again
119  // whenever the connectivity state changes.
120  virtual void ProcessConnectivityChangeLocked(
122  grpc_connectivity_state new_state) = 0;
123 
124  private:
125  // For accessing StartConnectivityWatchLocked().
126  friend class SubchannelList<SubchannelListType, SubchannelDataType>;
127 
128  // Watcher for subchannel connectivity state.
129  class Watcher
131  public:
135  : subchannel_data_(subchannel_data),
137 
138  ~Watcher() override {
139  subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor");
140  }
141 
143  absl::Status status) override;
144 
146  return subchannel_list_->policy()->interested_parties();
147  }
148 
149  private:
152  };
153 
154  // Starts watching the connectivity state of the subchannel.
155  // ProcessConnectivityChangeLocked() will be called whenever the
156  // connectivity state changes.
158 
159  // Cancels watching the connectivity state of the subchannel.
160  void CancelConnectivityWatchLocked(const char* reason);
161 
162  // Unrefs the subchannel.
163  void UnrefSubchannelLocked(const char* reason);
164 
165  // Backpointer to owning subchannel list. Not owned.
167  // The subchannel.
169  // Will be non-null when the subchannel's state is being watched.
171  nullptr;
172  // Data updated by the watcher.
175 };
176 
177 // A list of subchannels.
178 template <typename SubchannelListType, typename SubchannelDataType>
179 class SubchannelList : public InternallyRefCounted<SubchannelListType> {
180  public:
181  // Starts watching the connectivity state of all subchannels.
182  // Must be called immediately after instantiation.
183  void StartWatchingLocked();
184 
185  // The number of subchannels in the list.
186  size_t num_subchannels() const { return subchannels_.size(); }
187 
188  // The data for the subchannel at a particular index.
189  SubchannelDataType* subchannel(size_t index) {
190  return subchannels_[index].get();
191  }
192 
193  // Returns true if the subchannel list is shutting down.
194  bool shutting_down() const { return shutting_down_; }
195 
196  // Accessors.
197  LoadBalancingPolicy* policy() const { return policy_; }
198  const char* tracer() const { return tracer_; }
199 
200  // Resets connection backoff of all subchannels.
201  void ResetBackoffLocked();
202 
203  void Orphan() override {
204  ShutdownLocked();
206  }
207 
208  protected:
210  ServerAddressList addresses,
212  const grpc_channel_args& args);
213 
214  virtual ~SubchannelList();
215 
216  virtual void ShutdownLocked();
217 
218  private:
219  // For accessing Ref() and Unref().
220  friend class SubchannelData<SubchannelListType, SubchannelDataType>;
221 
222  // Backpointer to owning policy.
224 
225  const char* tracer_;
226 
227  // The list of subchannels.
228  // We use ManualConstructor here to support SubchannelDataType classes
229  // that are not copyable.
230  std::vector<ManualConstructor<SubchannelDataType>> subchannels_;
231 
232  // Is this list shutting down? This may be true due to the shutdown of the
233  // policy itself or because a newer update has arrived while this one hadn't
234  // finished processing.
235  bool shutting_down_ = false;
236 };
237 
238 //
239 // implementation -- no user-servicable parts below
240 //
241 
242 //
243 // SubchannelData::Watcher
244 //
245 
246 template <typename SubchannelListType, typename SubchannelDataType>
250  if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
251  gpr_log(
252  GPR_INFO,
253  "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
254  " (subchannel %p): connectivity changed: old_state=%s, new_state=%s, "
255  "status=%s, shutting_down=%d, pending_watcher=%p",
256  subchannel_list_->tracer(), subchannel_list_->policy(),
258  subchannel_list_->num_subchannels(),
259  subchannel_data_->subchannel_.get(),
260  (subchannel_data_->connectivity_state_.has_value()
261  ? ConnectivityStateName(*subchannel_data_->connectivity_state_)
262  : "N/A"),
263  ConnectivityStateName(new_state), status.ToString().c_str(),
264  subchannel_list_->shutting_down(), subchannel_data_->pending_watcher_);
265  }
266  if (!subchannel_list_->shutting_down() &&
267  subchannel_data_->pending_watcher_ != nullptr) {
269  subchannel_data_->connectivity_state_;
270  subchannel_data_->connectivity_state_ = new_state;
271  subchannel_data_->connectivity_status_ = status;
272  // Call the subclass's ProcessConnectivityChangeLocked() method.
273  subchannel_data_->ProcessConnectivityChangeLocked(old_state, new_state);
274  }
275 }
276 
277 //
278 // SubchannelData
279 //
280 
281 template <typename SubchannelListType, typename SubchannelDataType>
284  const ServerAddress& /*address*/,
287 
288 template <typename SubchannelListType, typename SubchannelDataType>
290  GPR_ASSERT(subchannel_ == nullptr);
291 }
292 
293 template <typename SubchannelListType, typename SubchannelDataType>
295  UnrefSubchannelLocked(const char* reason) {
296  if (subchannel_ != nullptr) {
297  if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
299  "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
300  " (subchannel %p): unreffing subchannel (%s)",
301  subchannel_list_->tracer(), subchannel_list_->policy(),
302  subchannel_list_, Index(), subchannel_list_->num_subchannels(),
303  subchannel_.get(), reason);
304  }
305  subchannel_.reset();
306  }
307 }
308 
309 template <typename SubchannelListType, typename SubchannelDataType>
310 void SubchannelData<SubchannelListType,
311  SubchannelDataType>::ResetBackoffLocked() {
312  if (subchannel_ != nullptr) {
313  subchannel_->ResetBackoff();
314  }
315 }
316 
317 template <typename SubchannelListType, typename SubchannelDataType>
318 void SubchannelData<SubchannelListType,
319  SubchannelDataType>::StartConnectivityWatchLocked() {
320  if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
322  "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
323  " (subchannel %p): starting watch",
324  subchannel_list_->tracer(), subchannel_list_->policy(),
325  subchannel_list_, Index(), subchannel_list_->num_subchannels(),
326  subchannel_.get());
327  }
328  GPR_ASSERT(pending_watcher_ == nullptr);
329  pending_watcher_ =
330  new Watcher(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher"));
331  subchannel_->WatchConnectivityState(
332  std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>(
333  pending_watcher_));
334 }
335 
336 template <typename SubchannelListType, typename SubchannelDataType>
338  CancelConnectivityWatchLocked(const char* reason) {
339  if (pending_watcher_ != nullptr) {
340  if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
342  "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
343  " (subchannel %p): canceling connectivity watch (%s)",
344  subchannel_list_->tracer(), subchannel_list_->policy(),
345  subchannel_list_, Index(), subchannel_list_->num_subchannels(),
346  subchannel_.get(), reason);
347  }
348  subchannel_->CancelConnectivityStateWatch(pending_watcher_);
349  pending_watcher_ = nullptr;
350  }
351 }
352 
353 template <typename SubchannelListType, typename SubchannelDataType>
355  CancelConnectivityWatchLocked("shutdown");
356  UnrefSubchannelLocked("shutdown");
357 }
358 
359 //
360 // SubchannelList
361 //
362 
363 template <typename SubchannelListType, typename SubchannelDataType>
365  LoadBalancingPolicy* policy, const char* tracer,
366  ServerAddressList addresses,
368  const grpc_channel_args& args)
369  : InternallyRefCounted<SubchannelListType>(tracer),
370  policy_(policy),
371  tracer_(tracer) {
372  if (GPR_UNLIKELY(tracer_ != nullptr)) {
374  "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
375  tracer_, policy, this, addresses.size());
376  }
377  subchannels_.reserve(addresses.size());
378  // Create a subchannel for each address.
379  for (ServerAddress address : addresses) {
381  helper->CreateSubchannel(address, args);
382  if (subchannel == nullptr) {
383  // Subchannel could not be created.
384  if (GPR_UNLIKELY(tracer_ != nullptr)) {
386  "[%s %p] could not create subchannel for address %s, ignoring",
387  tracer_, policy_, address.ToString().c_str());
388  }
389  continue;
390  }
391  if (GPR_UNLIKELY(tracer_ != nullptr)) {
393  "[%s %p] subchannel list %p index %" PRIuPTR
394  ": Created subchannel %p for address %s",
395  tracer_, policy_, this, subchannels_.size(), subchannel.get(),
396  address.ToString().c_str());
397  }
398  subchannels_.emplace_back();
399  subchannels_.back().Init(this, std::move(address), std::move(subchannel));
400  }
401 }
402 
403 template <typename SubchannelListType, typename SubchannelDataType>
405  if (GPR_UNLIKELY(tracer_ != nullptr)) {
406  gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_, policy_,
407  this);
408  }
409  for (auto& sd : subchannels_) {
410  sd.Destroy();
411  }
412 }
413 
414 template <typename SubchannelListType, typename SubchannelDataType>
415 void SubchannelList<SubchannelListType,
416  SubchannelDataType>::StartWatchingLocked() {
417  for (auto& sd : subchannels_) {
418  sd->StartConnectivityWatchLocked();
419  }
420 }
421 
422 template <typename SubchannelListType, typename SubchannelDataType>
424  if (GPR_UNLIKELY(tracer_ != nullptr)) {
425  gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p", tracer_,
426  policy_, this);
427  }
429  shutting_down_ = true;
430  for (auto& sd : subchannels_) {
431  sd->ShutdownLocked();
432  }
433 }
434 
435 template <typename SubchannelListType, typename SubchannelDataType>
436 void SubchannelList<SubchannelListType,
437  SubchannelDataType>::ResetBackoffLocked() {
438  for (auto& sd : subchannels_) {
439  sd->ResetBackoffLocked();
440  }
441 }
442 
443 } // namespace grpc_core
444 
445 #endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
grpc_core::SubchannelData
Definition: subchannel_list.h:81
grpc_core::SubchannelData::connectivity_status
absl::Status connectivity_status()
Definition: subchannel_list.h:101
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc_core::SubchannelList
Definition: subchannel_list.h:75
grpc_core::SubchannelData::subchannel_
RefCountedPtr< SubchannelInterface > subchannel_
Definition: subchannel_list.h:168
orphanable.h
log.h
subchannel_list_
OrphanablePtr< PickFirstSubchannelList > subchannel_list_
Definition: pick_first.cc:165
absl::Status::ToString
std::string ToString(StatusToStringMode mode=StatusToStringMode::kDefault) const
Definition: third_party/abseil-cpp/absl/status/status.h:821
grpc_core::LoadBalancingPolicy
Definition: lb_policy.h:95
connectivity_state.h
grpc_core::LoadBalancingPolicy::ChannelControlHelper::CreateSubchannel
virtual RefCountedPtr< SubchannelInterface > CreateSubchannel(ServerAddress address, const grpc_channel_args &args)=0
Creates a new subchannel with the specified channel args.
grpc_core::RefCountedPtr::get
T * get() const
Definition: ref_counted_ptr.h:146
grpc_core::SubchannelData::subchannel_list_
SubchannelList< SubchannelListType, SubchannelDataType > * subchannel_list_
Definition: subchannel_list.h:166
grpc_core::SubchannelList::shutting_down_
bool shutting_down_
Definition: subchannel_list.h:235
grpc_core::SubchannelList::Orphan
void Orphan() override
Definition: subchannel_list.h:203
grpc_core::InternallyRefCounted::Unref
void Unref()
Definition: orphanable.h:100
grpc_core
Definition: call_metric_recorder.h:31
grpc_core::SubchannelData::Watcher::Watcher
Watcher(SubchannelData< SubchannelListType, SubchannelDataType > *subchannel_data, RefCountedPtr< SubchannelListType > subchannel_list)
Definition: subchannel_list.h:132
grpc_core::SubchannelData::SubchannelData
SubchannelData(SubchannelList< SubchannelListType, SubchannelDataType > *subchannel_list, const ServerAddress &address, RefCountedPtr< SubchannelInterface > subchannel)
Definition: subchannel_list.h:282
grpc_pollset_set
struct grpc_pollset_set grpc_pollset_set
Definition: iomgr_fwd.h:23
string.h
grpc_core::RefCountedPtr::reset
void reset(T *value=nullptr)
Definition: ref_counted_ptr.h:111
grpc_core::SubchannelData::ShutdownLocked
void ShutdownLocked()
Definition: subchannel_list.h:354
grpc_core::SubchannelList::num_subchannels
size_t num_subchannels() const
Definition: subchannel_list.h:186
lb_policy.h
grpc_core::ServerAddress
Definition: server_address.h:49
status
absl::Status status
Definition: rls.cc:251
subchannel_
RefCountedPtr< Subchannel > subchannel_
Definition: oob_backend_metric.cc:112
grpc_channel_args
Definition: grpc_types.h:132
subchannel_interface.h
grpc_core::SubchannelData::Watcher::OnConnectivityStateChange
void OnConnectivityStateChange(grpc_connectivity_state new_state, absl::Status status) override
Definition: subchannel_list.h:248
grpc_core::SubchannelList::subchannels_
std::vector< ManualConstructor< SubchannelDataType > > subchannels_
Definition: subchannel_list.h:230
grpc_connectivity_state
grpc_connectivity_state
Definition: include/grpc/impl/codegen/connectivity_state.h:30
grpc_types.h
grpc_core::SubchannelList::~SubchannelList
virtual ~SubchannelList()
Definition: subchannel_list.h:404
grpc_core::SubchannelData::Watcher
Definition: subchannel_list.h:129
grpc_core::SubchannelData::~SubchannelData
virtual ~SubchannelData()
Definition: subchannel_list.h:289
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
grpc_core::SubchannelData::Watcher::subchannel_list_
RefCountedPtr< SubchannelListType > subchannel_list_
Definition: subchannel_list.h:151
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
grpc_core::RefCountedPtr
Definition: ref_counted_ptr.h:35
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc_core::SubchannelList::tracer_
const char * tracer_
Definition: subchannel_list.h:225
grpc_core::SubchannelData::StartConnectivityWatchLocked
void StartConnectivityWatchLocked()
Definition: subchannel_list.h:319
grpc_core::SubchannelData::Watcher::subchannel_data_
SubchannelData< SubchannelListType, SubchannelDataType > * subchannel_data_
Definition: subchannel_list.h:150
grpc_core::SubchannelList::SubchannelList
SubchannelList(LoadBalancingPolicy *policy, const char *tracer, ServerAddressList addresses, LoadBalancingPolicy::ChannelControlHelper *helper, const grpc_channel_args &args)
Definition: subchannel_list.h:364
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
GPR_UNLIKELY
#define GPR_UNLIKELY(x)
Definition: impl/codegen/port_platform.h:770
connectivity_state.h
grpc_core::SubchannelList::policy
LoadBalancingPolicy * policy() const
Definition: subchannel_list.h:197
grpc_core::SubchannelData::Watcher::~Watcher
~Watcher() override
Definition: subchannel_list.h:138
grpc_core::SubchannelData::Index
size_t Index() const
Definition: subchannel_list.h:89
absl::optional< grpc_connectivity_state >
grpc_core::SubchannelList::tracer
const char * tracer() const
Definition: subchannel_list.h:198
server_address.h
grpc_core::SubchannelData::pending_watcher_
SubchannelInterface::ConnectivityStateWatcherInterface * pending_watcher_
Definition: subchannel_list.h:170
grpc_core::SubchannelData::Watcher::interested_parties
grpc_pollset_set * interested_parties() override
Definition: subchannel_list.h:145
grpc_core::InternallyRefCounted
Definition: orphanable.h:73
grpc_core::ServerAddressList
std::vector< ServerAddress > ServerAddressList
Definition: server_address.h:120
grpc_core::SubchannelData::subchannel
SubchannelInterface * subchannel() const
Definition: subchannel_list.h:95
manual_constructor.h
grpc_core::SubchannelData::connectivity_state
absl::optional< grpc_connectivity_state > connectivity_state()
Definition: subchannel_list.h:98
grpc_core::SubchannelList::StartWatchingLocked
void StartWatchingLocked()
Definition: subchannel_list.h:416
shutting_down_
bool shutting_down_
Definition: grpclb.cc:516
grpc_core::ConnectivityStateName
const char * ConnectivityStateName(grpc_connectivity_state state)
Definition: connectivity_state.cc:38
grpc_core::SubchannelData::connectivity_state_
absl::optional< grpc_connectivity_state > connectivity_state_
Definition: subchannel_list.h:173
debug_location.h
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
index
int index
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/protobuf.h:1184
subchannels_
std::set< SubchannelWrapper * > subchannels_
Definition: outlier_detection.cc:312
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc_core::SubchannelData::UnrefSubchannelLocked
void UnrefSubchannelLocked(const char *reason)
Definition: subchannel_list.h:295
grpc_core::SubchannelData::ResetBackoffLocked
void ResetBackoffLocked()
Definition: subchannel_list.h:311
grpc_core::LoadBalancingPolicy::ChannelControlHelper
Definition: lb_policy.h:275
grpc_core::SubchannelList::ResetBackoffLocked
void ResetBackoffLocked()
Definition: subchannel_list.h:437
ref_counted_ptr.h
grpc_core::SubchannelData::ProcessConnectivityChangeLocked
virtual void ProcessConnectivityChangeLocked(absl::optional< grpc_connectivity_state > old_state, grpc_connectivity_state new_state)=0
grpc_core::SubchannelList::subchannel
SubchannelDataType * subchannel(size_t index)
Definition: subchannel_list.h:189
grpc_core::SubchannelList::shutting_down
bool shutting_down() const
Definition: subchannel_list.h:194
grpc_core::SubchannelData::connectivity_status_
absl::Status connectivity_status_
Definition: subchannel_list.h:174
iomgr_fwd.h
google::protobuf::python::descriptor::Index
static PyObject * Index(PyContainer *self, PyObject *item)
Definition: bloaty/third_party/protobuf/python/google/protobuf/pyext/descriptor_containers.cc:672
grpc_core::SubchannelList::ShutdownLocked
virtual void ShutdownLocked()
Definition: subchannel_list.h:423
testing::Ref
internal::RefMatcher< T & > Ref(T &x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8628
grpc_core::SubchannelData::subchannel_list
SubchannelListType * subchannel_list() const
Definition: subchannel_list.h:84
grpc_core::SubchannelInterface
Definition: subchannel_interface.h:37
grpc_core::SubchannelData::CancelConnectivityWatchLocked
void CancelConnectivityWatchLocked(const char *reason)
Definition: subchannel_list.h:338
grpc_core::SubchannelInterface::ConnectivityStateWatcherInterface
Definition: subchannel_interface.h:39
grpc_core::SubchannelList::policy_
LoadBalancingPolicy * policy_
Definition: subchannel_list.h:223
port_platform.h


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:27