subchannel_stream_client.h
Go to the documentation of this file.
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_STREAM_CLIENT_H
18 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_STREAM_CLIENT_H
19 
21 
22 #include <atomic>
23 #include <memory>
24 
25 #include "absl/base/thread_annotations.h"
26 #include "absl/status/status.h"
27 #include "absl/strings/string_view.h"
28 #include "absl/types/optional.h"
29 
31 #include <grpc/slice.h>
32 #include <grpc/status.h>
33 
52 
53 namespace grpc_core {
54 
55 // Represents a streaming call on a subchannel that should be maintained
56 // open at all times.
57 // If the call ends with status UNIMPLEMENTED, no further attempts are made.
58 // If the call ends with any other status (including OK), we retry the
59 // call with appropriate backoff.
60 // The backoff state is reset when we receive a message on a stream.
61 //
62 // Currently, this assumes server-side streaming, but it could be extended
63 // to support full bidi streaming if there is a need in the future.
65  : public InternallyRefCounted<SubchannelStreamClient> {
66  public:
67  // Interface implemented by the caller. The implementation does not need
68  // to be thread-safe: at most one method (including the destructor) will
69  // be invoked at any one time.
70  //
71  // The address of the SubchannelStreamClient object is passed to most
72  // methods for logging purposes.
74  public:
75  virtual ~CallEventHandler() = default;
76 
77  // Returns the path for the streaming call.
78  virtual Slice GetPathLocked()
80  // Called when a new call attempt is being started.
81  virtual void OnCallStartLocked(SubchannelStreamClient* client)
83  // Called when a previous call attempt has failed and the retry
84  // timer is started before the next attempt.
85  virtual void OnRetryTimerStartLocked(SubchannelStreamClient* client)
87  // Returns the message payload to send from the client.
88  virtual grpc_slice EncodeSendMessageLocked()
90  // Called whenever a message is received from the server.
91  virtual absl::Status RecvMessageReadyLocked(
92  SubchannelStreamClient* client, absl::string_view serialized_message)
94  // Called when a stream fails.
95  virtual void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* client,
98  };
99 
100  // If tracer is non-null, it enables trace logging, with the specified
101  // string being the first part of the log message.
102  // Does not take ownership of interested_parties; the caller is responsible
103  // for ensuring that it will outlive the SubchannelStreamClient.
105  RefCountedPtr<ConnectedSubchannel> connected_subchannel,
106  grpc_pollset_set* interested_parties,
107  std::unique_ptr<CallEventHandler> event_handler, const char* tracer);
108 
109  ~SubchannelStreamClient() override;
110 
111  void Orphan() override;
112 
113  private:
114  // Contains a call to the backend and all the data related to the call.
115  class CallState : public Orphanable {
116  public:
118  grpc_pollset_set* interested_parties);
119  ~CallState() override;
120 
121  void Orphan() override;
122 
123  void StartCallLocked()
125 
126  private:
127  void Cancel();
128 
131 
132  void CallEndedLocked(bool retry)
134 
135  void RecvMessageReady();
136 
137  static void OnComplete(void* arg, grpc_error_handle error);
139  static void RecvMessageReady(void* arg, grpc_error_handle error);
141  static void StartCancel(void* arg, grpc_error_handle error);
142  static void OnCancelComplete(void* arg, grpc_error_handle error);
143 
145 
148 
152 
153  // The streaming call to the backend. Always non-null.
154  // Refs are tracked manually; when the last ref is released, the
155  // CallState object will be automatically destroyed.
157 
162 
164 
165  // send_initial_metadata
167 
168  // send_message
170 
171  // send_trailing_metadata
173 
174  // recv_initial_metadata
177 
178  // recv_message
181  std::atomic<bool> seen_response_{false};
182 
183  // True if the cancel_stream batch has been started.
184  std::atomic<bool> cancelled_{false};
185 
186  // recv_trailing_metadata
190 
191  // Closure for call stack destruction.
193  };
194 
195  void StartCall();
197 
199  static void OnRetryTimer(void* arg, grpc_error_handle error);
200 
203  const char* tracer_;
205 
207  std::unique_ptr<CallEventHandler> event_handler_ ABSL_GUARDED_BY(mu_);
208 
209  // The data associated with the current streaming call. It holds a ref
210  // to this SubchannelStreamClient object.
212 
213  // Call retry state.
216  grpc_closure retry_timer_callback_ ABSL_GUARDED_BY(mu_);
218 };
219 
220 } // namespace grpc_core
221 
222 #endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_STREAM_CLIENT_H
slice.h
grpc_core::SubchannelStreamClient::CallState::arena_
ScopedArenaPtr arena_
Definition: subchannel_stream_client.h:149
grpc_core::SubchannelStreamClient::CallState::OnComplete
static void OnComplete(void *arg, grpc_error_handle error)
Definition: subchannel_stream_client.cc:350
grpc_core::CallCombiner
Definition: call_combiner.h:50
grpc_core::SubchannelStreamClient::CallState::send_trailing_metadata_
grpc_metadata_batch send_trailing_metadata_
Definition: subchannel_stream_client.h:172
orphanable.h
grpc_core::SubchannelStreamClient::ABSL_GUARDED_BY
std::unique_ptr< CallEventHandler > event_handler_ ABSL_GUARDED_BY(mu_)
metadata_batch.h
backoff.h
grpc_core::ConnectedSubchannel
Definition: subchannel.h:67
const
#define const
Definition: bloaty/third_party/zlib/zconf.h:230
grpc_event_engine::experimental::MemoryAllocator
Definition: memory_allocator.h:35
grpc_core::Orphanable
Definition: orphanable.h:39
polling_entity.h
slice.h
false
#define false
Definition: setup_once.h:323
subchannel.h
grpc_core
Definition: call_metric_recorder.h:31
client
Definition: examples/python/async_streaming/client.py:1
grpc_core::Slice
Definition: src/core/lib/slice/slice.h:282
grpc_pollset_set
struct grpc_pollset_set grpc_pollset_set
Definition: iomgr_fwd.h:23
grpc_core::SubchannelStreamClient::CallState::StartBatch
void StartBatch(grpc_transport_stream_op_batch *batch)
Definition: subchannel_stream_client.cc:305
grpc_core::SubchannelStreamClient::CallState::Cancel
void Cancel()
Definition: subchannel_stream_client.cc:337
absl::string_view
Definition: abseil-cpp/absl/strings/string_view.h:167
grpc_core::SubchannelStreamClient::interested_parties_
grpc_pollset_set * interested_parties_
Definition: subchannel_stream_client.h:202
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc_status_code
grpc_status_code
Definition: include/grpc/impl/codegen/status.h:28
grpc_core::SubchannelStreamClient::mu_
Mutex mu_
Definition: subchannel_stream_client.h:206
arena.h
closure.h
status
absl::Status status
Definition: rls.cc:251
GRPC_CONTEXT_COUNT
@ GRPC_CONTEXT_COUNT
Definition: core/lib/channel/context.h:48
grpc_core::SubchannelStreamClient::CallEventHandler::~CallEventHandler
virtual ~CallEventHandler()=default
grpc_core::SubchannelStreamClient::CallState::CallEndedLocked
void CallEndedLocked(bool retry) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&subchannel_stream_client_ -> mu_)
Definition: subchannel_stream_client.cc:448
grpc_timer
Definition: iomgr/timer.h:33
grpc_core::SubchannelStreamClient::CallState::CallState
CallState(RefCountedPtr< SubchannelStreamClient > client, grpc_pollset_set *interested_parties)
Definition: subchannel_stream_client.cc:166
grpc_core::SubchannelStreamClient::CallState::RecvInitialMetadataReady
static void RecvInitialMetadataReady(void *arg, grpc_error_handle error)
Definition: subchannel_stream_client.cc:359
grpc_core::SubchannelStreamClient::CallState::cancelled_
std::atomic< bool > cancelled_
Definition: subchannel_stream_client.h:184
status.h
retry
void retry(grpc_end2end_test_config config)
Definition: retry.cc:319
grpc_core::ScopedArenaPtr
std::unique_ptr< Arena, ScopedArenaDeleter > ScopedArenaPtr
Definition: src/core/lib/resource_quota/arena.h:129
context.h
grpc_core::SubchannelStreamClient::CallState::RecvTrailingMetadataReady
static void RecvTrailingMetadataReady(void *arg, grpc_error_handle error)
Definition: subchannel_stream_client.cc:414
grpc_core::SubchannelStreamClient::CallState::recv_initial_metadata_ready_
grpc_closure recv_initial_metadata_ready_
Definition: subchannel_stream_client.h:176
grpc_core::SubchannelStreamClient::CallState::call_
SubchannelCall * call_
Definition: subchannel_stream_client.h:156
grpc_core::RefCountedPtr
Definition: ref_counted_ptr.h:35
grpc_core::SubchannelStreamClient::CallEventHandler
Definition: subchannel_stream_client.h:73
grpc_core::SubchannelStreamClient::CallState::payload_
grpc_transport_stream_op_batch_payload payload_
Definition: subchannel_stream_client.h:158
grpc_core::SubchannelStreamClient::CallState::recv_message_batch_
grpc_transport_stream_op_batch recv_message_batch_
Definition: subchannel_stream_client.h:160
ABSL_EXCLUSIVE_LOCKS_REQUIRED
#define ABSL_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: abseil-cpp/absl/base/thread_annotations.h:145
grpc_core::SubchannelStreamClient::StartCall
void StartCall()
Definition: subchannel_stream_client.cc:103
grpc_core::SubchannelStreamClient::~SubchannelStreamClient
~SubchannelStreamClient() override
Definition: subchannel_stream_client.cc:80
call_combiner.h
absl::optional
Definition: abseil-cpp/absl/types/internal/optional.h:61
grpc_core::SubchannelStreamClient::CallState::subchannel_stream_client_
RefCountedPtr< SubchannelStreamClient > subchannel_stream_client_
Definition: subchannel_stream_client.h:146
arg
Definition: cmdline.cc:40
grpc_slice
Definition: include/grpc/impl/codegen/slice.h:65
grpc_core::SubchannelStreamClient::CallState::pollent_
grpc_polling_entity pollent_
Definition: subchannel_stream_client.h:147
error.h
grpc_core::SubchannelStreamClient::CallState::StartCancel
static void StartCancel(void *arg, grpc_error_handle error)
Definition: subchannel_stream_client.cc:327
grpc_polling_entity
Definition: polling_entity.h:38
batch
grpc_transport_stream_op_batch * batch
Definition: retry_filter.cc:243
grpc_core::InternallyRefCounted
Definition: orphanable.h:73
grpc_core::SubchannelStreamClient::CallState::~CallState
~CallState() override
Definition: subchannel_stream_client.cc:180
grpc_core::SubchannelStreamClient::CallState::OnCancelComplete
static void OnCancelComplete(void *arg, grpc_error_handle error)
Definition: subchannel_stream_client.cc:320
grpc_core::SubchannelStreamClient::SubchannelStreamClient
SubchannelStreamClient(RefCountedPtr< ConnectedSubchannel > connected_subchannel, grpc_pollset_set *interested_parties, std::unique_ptr< CallEventHandler > event_handler, const char *tracer)
Definition: subchannel_stream_client.cc:50
retry_timer_callback_pending_
bool retry_timer_callback_pending_
Definition: grpclb.cc:536
grpc_core::BackOff
Definition: backoff.h:32
grpc_core::SubchannelStreamClient::CallState::recv_message_ready_
grpc_closure recv_message_ready_
Definition: subchannel_stream_client.h:180
grpc_core::SubchannelStreamClient::CallState::call_combiner_
CallCombiner call_combiner_
Definition: subchannel_stream_client.h:150
grpc_core::SubchannelStreamClient::CallState::recv_trailing_metadata_batch_
grpc_transport_stream_op_batch recv_trailing_metadata_batch_
Definition: subchannel_stream_client.h:161
grpc_core::SubchannelStreamClient::CallState::RecvMessageReady
void RecvMessageReady()
Definition: subchannel_stream_client.cc:367
grpc_transport_stream_op_batch_payload
Definition: transport.h:341
grpc_core::Mutex
Definition: src/core/lib/gprpp/sync.h:61
grpc_core::SliceBuffer
Definition: src/core/lib/slice/slice_buffer.h:44
grpc_core::SubchannelStreamClient
Definition: subchannel_stream_client.h:64
grpc_core::SubchannelStreamClient::Orphan
void Orphan() override
Definition: subchannel_stream_client.cc:87
grpc_core::SubchannelStreamClient::CallState::recv_initial_metadata_
grpc_metadata_batch recv_initial_metadata_
Definition: subchannel_stream_client.h:175
grpc_core::SubchannelStreamClient::CallState::send_initial_metadata_
grpc_metadata_batch send_initial_metadata_
Definition: subchannel_stream_client.h:166
grpc_transport_stream_stats
Definition: transport.h:249
grpc_core::SubchannelStreamClient::CallState::collect_stats_
grpc_transport_stream_stats collect_stats_
Definition: subchannel_stream_client.h:188
grpc_core::SubchannelStreamClient::CallState
Definition: subchannel_stream_client.h:115
grpc_core::SubchannelStreamClient::CallState::seen_response_
std::atomic< bool > seen_response_
Definition: subchannel_stream_client.h:181
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
retry_backoff_
BackOff retry_backoff_
Definition: retry_filter.cc:566
grpc_core::OrphanablePtr
std::unique_ptr< T, Deleter > OrphanablePtr
Definition: orphanable.h:64
grpc_core::SubchannelStreamClient::CallState::StartBatchInCallCombiner
static void StartBatchInCallCombiner(void *arg, grpc_error_handle error)
Definition: subchannel_stream_client.cc:298
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc_call_context_element
Definition: core/lib/channel/context.h:51
ref_counted_ptr.h
grpc_core::SubchannelStreamClient::call_allocator_
MemoryAllocator call_allocator_
Definition: subchannel_stream_client.h:204
grpc_core::SubchannelStreamClient::tracer_
const char * tracer_
Definition: subchannel_stream_client.h:203
transport.h
memory_quota.h
timer.h
grpc_core::SubchannelStreamClient::StartRetryTimerLocked
void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_)
Definition: subchannel_stream_client.cc:122
grpc_core::SubchannelCall
Definition: subchannel.h:96
grpc_core::SubchannelStreamClient::CallState::batch_
grpc_transport_stream_op_batch batch_
Definition: subchannel_stream_client.h:159
memory_allocator.h
grpc_core::SubchannelStreamClient::StartCallLocked
void StartCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_)
Definition: subchannel_stream_client.cc:108
grpc_core::SubchannelStreamClient::CallState::recv_trailing_metadata_ready_
grpc_closure recv_trailing_metadata_ready_
Definition: subchannel_stream_client.h:189
iomgr_fwd.h
grpc_core::SubchannelStreamClient::CallState::on_complete_
grpc_closure on_complete_
Definition: subchannel_stream_client.h:163
grpc_core::SubchannelStreamClient::OnRetryTimer
static void OnRetryTimer(void *arg, grpc_error_handle error)
Definition: subchannel_stream_client.cc:144
grpc_error
Definition: error_internal.h:42
grpc_core::SubchannelStreamClient::CallState::context_
grpc_call_context_element context_[GRPC_CONTEXT_COUNT]
Definition: subchannel_stream_client.h:151
grpc_core::SubchannelStreamClient::CallState::send_message_
SliceBuffer send_message_
Definition: subchannel_stream_client.h:169
grpc_metadata_batch
Definition: metadata_batch.h:1259
grpc_transport_stream_op_batch
Definition: transport.h:284
grpc_closure
Definition: closure.h:56
grpc_core::SubchannelStreamClient::CallState::after_call_stack_destruction_
grpc_closure after_call_stack_destruction_
Definition: subchannel_stream_client.h:192
slice_buffer.h
grpc_core::SubchannelStreamClient::CallState::Orphan
void Orphan() override
Definition: subchannel_stream_client.cc:198
grpc_core::SubchannelStreamClient::CallState::AfterCallStackDestruction
static void AfterCallStackDestruction(void *arg, grpc_error_handle error)
Definition: subchannel_stream_client.cc:314
sync.h
grpc_core::SubchannelStreamClient::connected_subchannel_
RefCountedPtr< ConnectedSubchannel > connected_subchannel_
Definition: subchannel_stream_client.h:201
grpc_core::SubchannelStreamClient::CallState::recv_message_
absl::optional< SliceBuffer > recv_message_
Definition: subchannel_stream_client.h:179
retry_timer_
grpc_timer retry_timer_
Definition: retry_filter.cc:606
port_platform.h
grpc_core::SubchannelStreamClient::CallState::recv_trailing_metadata_
grpc_metadata_batch recv_trailing_metadata_
Definition: subchannel_stream_client.h:187


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:27