pipe.h
Go to the documentation of this file.
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_CORE_LIB_PROMISE_PIPE_H
16 #define GRPC_CORE_LIB_PROMISE_PIPE_H
17 
19 
20 #include <stdint.h>
21 
22 #include "absl/types/optional.h"
23 
24 #include <grpc/support/log.h>
25 
30 
31 namespace grpc_core {
32 
33 template <typename T>
34 struct Pipe;
35 
36 namespace pipe_detail {
37 
38 template <typename T>
39 class Push;
40 template <typename T>
41 class Next;
42 
43 // Center sits between a sender and a receiver to provide a one-deep buffer of
44 // Ts
// It is the shared state that PipeSender/PipeReceiver/Push/Next point at; each
// of those holds one counted reference on either the send or the recv side.
// NOTE(review): this listing elides several lines (see the gaps in the embedded
// line numbers): the signatures of RefSend/RefRecv/Push/Next, the assertions at
// the top of UnrefSend/UnrefRecv, and the declarations of the ref counts,
// value_, on_empty_ and on_full_.
45 template <typename T>
46 class Center {
47  public:
48  // Initialize with one send ref (held by PipeSender) and one recv ref (held by
49  // PipeReceiver)
  // The buffer starts out empty (has_value_ == false).
50  Center() {
51  send_refs_ = 1;
52  recv_refs_ = 1;
53  has_value_ = false;
54  }
55 
56  // Add one ref to the send side of this object, and return this.
  // (Signature elided in this listing; the generated index gives it as
  // `Center* RefSend()`.)
58  send_refs_++;
59  return this;
60  }
61 
62  // Add one ref to the recv side of this object, and return this.
  // (Signature elided in this listing; the generated index gives it as
  // `Center* RefRecv()`.)
64  recv_refs_++;
65  return this;
66  }
67 
68  // Drop a send side ref
69  // If no send refs remain, wake due to send closure
70  // If no refs remain, destroy this object
  // Both waiters are woken when the send side closes. A value that is already
  // buffered is left in place: Next() tests has_value_ before send_refs_, so a
  // receiver still gets it before observing the closure.
71  void UnrefSend() {
73  send_refs_--;
74  if (0 == send_refs_) {
75  on_full_.Wake();
76  on_empty_.Wake();
77  if (0 == recv_refs_) {
  // Only the destructor runs here; nothing in this class deallocates the
  // storage. Pipe() creates the Center through an Arena, which presumably
  // owns the memory - confirm against Arena.
78  this->~Center();
79  }
80  }
81  }
82 
83  // Drop a recv side ref
84  // If no recv refs remain, wake due to recv closure
85  // If no refs remain, destroy this object
  // If senders are still alive when the last recv ref goes away, any buffered
  // value can no longer be received, so it is discarded immediately via
  // ResetValue() instead of lingering until the Center is destroyed.
86  void UnrefRecv() {
88  recv_refs_--;
89  if (0 == recv_refs_) {
90  on_full_.Wake();
91  on_empty_.Wake();
92  if (0 == send_refs_) {
93  this->~Center();
94  } else if (has_value_) {
95  ResetValue();
96  }
97  }
98  }
99 
100  // Try to push *value into the pipe.
101  // Return Pending if there is no space.
102  // Return true if the value was pushed.
103  // Return false if the recv end is closed.
  // (Signature elided in this listing; the generated index gives it as
  // `Poll<bool> Push(T* value)`.)
  // *value is moved from only on the success path; when false or Pending is
  // returned the caller's object is untouched, so the same pointer can be
  // offered again on the next poll. A successful push wakes on_full_.
106  if (recv_refs_ == 0) return false;
107  if (has_value_) return on_empty_.pending();
108  has_value_ = true;
109  value_ = std::move(*value);
110  on_full_.Wake();
111  return true;
112  }
113 
114  // Try to receive a value from the pipe.
115  // Return Pending if there is no value.
116  // Return the value if one was retrieved.
117  // Return nullopt if the send end is closed and no value had been pushed.
  // (Signature elided in this listing; the generated index gives it as
  // `Poll<absl::optional<T>> Next()`.)
  // Taking a value empties the buffer and wakes on_empty_.
120  if (!has_value_) {
121  if (send_refs_ == 0) return absl::nullopt;
122  return on_full_.pending();
123  }
124  has_value_ = false;
125  on_empty_.Wake();
126  return std::move(value_);
127  }
128 
129  private:
  // Discard the buffered value and mark the buffer empty.
130  void ResetValue() {
131  // Fancy dance to move out of value in the off chance that we reclaim some
132  // memory earlier.
  // value_ is moved into the by-value parameter of a no-op lambda; that
  // parameter is destroyed when the call returns, leaving value_ moved-from.
133  [](T) {}(std::move(value_));
134  has_value_ = false;
135  }
  // NOTE(review): the member declarations that the comments below describe
  // (value_, send_refs_, recv_refs_, on_empty_, on_full_) are elided in this
  // listing; only has_value_ is visible. The index lists the ref counts as
  // uint8_t.
137  // Number of sending objects.
138  // 0 => send is closed.
139  // 1 ref each for PipeSender and Push.
141  // Number of receiving objects.
142  // 0 => recv is closed.
143  // 1 ref each for PipeReceiver and Next.
145  // True iff there is a value in the pipe.
146  bool has_value_ : 1;  // one-bit bitfield
149 };
150 
151 } // namespace pipe_detail
152 
153 // Send end of a Pipe.
// Move-only handle that owns one send-side ref on the shared Center and drops
// it on destruction. A moved-from PipeSender has center_ == nullptr.
154 template <typename T>
155 class PipeSender {
156  public:
157  PipeSender(const PipeSender&) = delete;
158  PipeSender& operator=(const PipeSender&) = delete;
159 
  // Steal the ref from `other`; no ref-count change is needed.
160  PipeSender(PipeSender&& other) noexcept : center_(other.center_) {
161  other.center_ = nullptr;
162  }
  // Release the currently held ref (if any), then steal the ref from `other`.
  // NOTE(review): there is no `this != &other` guard - a self-move-assignment
  // would drop the held ref and leave center_ null.
163  PipeSender& operator=(PipeSender&& other) noexcept {
164  if (center_ != nullptr) center_->UnrefSend();
165  center_ = other.center_;
166  other.center_ = nullptr;
167  return *this;
168  }
169 
  // Destructor body (the `~PipeSender() {` line is elided in this listing):
  // drop the send ref unless this object was moved from.
171  if (center_ != nullptr) center_->UnrefSend();
172  }
173 
174  // Send a single message along the pipe.
175  // Returns a promise that will resolve to a bool - true if the message was
176  // sent, false if it could never be sent. Blocks the promise until the
177  // receiver is either closed or able to receive another message.
  // (The declaration this comment documents is elided in this listing; the
  // generated index gives it as `pipe_detail::Push<T> Push(T value)`.)
179 
180  private:
181  friend struct Pipe<T>;
  // Adopts a ref without adding one: Center's constructor already accounts for
  // the one send ref held by PipeSender. Only Pipe (a friend) can call this.
182  explicit PipeSender(pipe_detail::Center<T>* center) : center_(center) {}
  // (The `center_` member declaration is elided in this listing.)
184 };
185 
186 // Receive end of a Pipe.
// Move-only handle that owns one recv-side ref on the shared Center and drops
// it on destruction. A moved-from PipeReceiver has center_ == nullptr.
// (The `class PipeReceiver {` line is elided in this listing.)
187 template <typename T>
189  public:
190  PipeReceiver(const PipeReceiver&) = delete;
191  PipeReceiver& operator=(const PipeReceiver&) = delete;
192 
  // Steal the ref from `other`; no ref-count change is needed.
193  PipeReceiver(PipeReceiver&& other) noexcept : center_(other.center_) {
194  other.center_ = nullptr;
195  }
  // Release the currently held ref (if any), then steal the ref from `other`.
  // NOTE(review): there is no `this != &other` guard - a self-move-assignment
  // would drop the held ref and leave center_ null.
196  PipeReceiver& operator=(PipeReceiver&& other) noexcept {
197  if (center_ != nullptr) center_->UnrefRecv();
198  center_ = other.center_;
199  other.center_ = nullptr;
200  return *this;
201  }
  // Destructor body (the `~PipeReceiver() {` line is elided in this listing):
  // drop the recv ref unless this object was moved from.
203  if (center_ != nullptr) center_->UnrefRecv();
204  }
205 
206  // Receive a single message from the pipe.
207  // Returns a promise that will resolve to an optional<T> - with a value if a
208  // message was received, or no value if the other end of the pipe was closed.
209  // Blocks the promise until the sender is either closed or a message is
210  // available.
  // (The declaration this comment documents is elided in this listing; the
  // generated index gives it as `pipe_detail::Next<T> Next()`.)
212 
213  private:
214  friend struct Pipe<T>;
  // Adopts a ref without adding one: Center's constructor already accounts for
  // the one recv ref held by PipeReceiver. Only Pipe (a friend) can call this.
215  explicit PipeReceiver(pipe_detail::Center<T>* center) : center_(center) {}
  // (The `center_` member declaration is elided in this listing.)
217 };
218 
219 namespace pipe_detail {
220 
221 // Implementation of PipeSender::Push promise.
// Move-only. Holds one send-side ref on the Center together with the value
// waiting to be pushed; the ref is dropped on destruction.
222 template <typename T>
223 class Push {
224  public:
225  Push(const Push&) = delete;
226  Push& operator=(const Push&) = delete;
  // Steal both the ref and the pending value from `other`.
227  Push(Push&& other) noexcept
228  : center_(other.center_), push_(std::move(other.push_)) {
229  other.center_ = nullptr;
230  }
  // Release the currently held ref (if any), then steal ref and value from
  // `other`.
  // NOTE(review): there is no `this != &other` guard - a self-move-assignment
  // would drop the held ref, null center_, and self-move push_.
231  Push& operator=(Push&& other) noexcept {
232  if (center_ != nullptr) center_->UnrefSend();
233  center_ = other.center_;
234  other.center_ = nullptr;
235  push_ = std::move(other.push_);
236  return *this;
237  }
238 
  // Drop the send ref unless this object was moved from.
239  ~Push() {
240  if (center_ != nullptr) center_->UnrefSend();
241  }
242 
  // Poll the promise: offer push_ to the Center and forward its result.
  // center_ is dereferenced without a null check, so a moved-from Push must
  // not be polled.
243  Poll<bool> operator()() { return center_->Push(&push_); }
244 
245  private:
246  friend class PipeSender<T>;
  // Takes over the ref passed in `center` (no ref is added here) and stores
  // the value to send. Only PipeSender (a friend) can call this.
247  explicit Push(pipe_detail::Center<T>* center, T push)
248  : center_(center), push_(std::move(push)) {}
  // (The `center_` and `push_` member declarations are elided in this listing.)
251 };
252 
253 // Implementation of PipeReceiver::Next promise.
// Move-only. Holds one recv-side ref on the Center and drops it on
// destruction.
254 template <typename T>
255 class Next {
256  public:
257  Next(const Next&) = delete;
258  Next& operator=(const Next&) = delete;
  // Steal the ref from `other`; no ref-count change is needed.
259  Next(Next&& other) noexcept : center_(other.center_) {
260  other.center_ = nullptr;
261  }
  // Release the currently held ref (if any), then steal the ref from `other`.
  // NOTE(review): there is no `this != &other` guard - a self-move-assignment
  // would drop the held ref and leave center_ null.
262  Next& operator=(Next&& other) noexcept {
263  if (center_ != nullptr) center_->UnrefRecv();
264  center_ = other.center_;
265  other.center_ = nullptr;
266  return *this;
267  }
268 
  // Drop the recv ref unless this object was moved from.
269  ~Next() {
270  if (center_ != nullptr) center_->UnrefRecv();
271  }
272 
  // (The poll operator is elided in this listing; the generated index gives its
  // signature as `Poll<absl::optional<T>> operator()()`.)
274 
275  private:
276  friend class PipeReceiver<T>;
  // Takes over the ref passed in `center` (no ref is added here). Only
  // PipeReceiver (a friend) can call this.
277  explicit Next(pipe_detail::Center<T>* center) : center_(center) {}
  // (The `center_` member declaration is elided in this listing.)
279 };
280 
281 } // namespace pipe_detail
282 
// Out-of-line definition of PipeSender<T>::Push (signature line elided in this
// listing; the generated index gives it as `pipe_detail::Push<T> Push(T value)`).
// Takes an additional send ref for the returned promise and moves `value` into
// it.
283 template <typename T>
285  return pipe_detail::Push<T>(center_->RefSend(), std::move(value));
286 }
287 
// Out-of-line definition of PipeReceiver<T>::Next (signature line elided in
// this listing; the generated index gives it as `pipe_detail::Next<T> Next()`).
// Takes an additional recv ref for the returned promise.
288 template <typename T>
290  return pipe_detail::Next<T>(center_->RefRecv());
291 }
292 
293 // A Pipe is an intra-Activity communications channel that transmits T's from
294 // one end to the other.
295 // It is only safe to use a Pipe within the context of a single Activity.
296 // No synchronization is performed internally.
297 // The primary Pipe data structure is allocated from an arena, so the activity
298 // must have an arena as part of its context.
299 // By performing that allocation we can ensure a stable pointer to shared data,
300 // allowing PipeSender/PipeReceiver/Push/Next to be relatively simple in their
301 // implementation.
302 // This type has been optimized with the expectation that there are relatively
303 // few pipes per activity. If this assumption does not hold then a design
304 // allowing inline filtering of pipe contents (instead of connecting pipes with
305 // polling code) would likely be more appropriate.
306 template <typename T>
307 struct Pipe {
  // Allocate a fresh Center from the Arena found in the current context and
  // hand it to both ends via the private constructor below.
308  Pipe() : Pipe(GetContext<Arena>()->New<pipe_detail::Center<T>>()) {}
  // Move-only: copying is disabled, moving is defaulted member-wise.
309  Pipe(const Pipe&) = delete;
310  Pipe& operator=(const Pipe&) = delete;
311  Pipe(Pipe&&) noexcept = default;
312  Pipe& operator=(Pipe&&) noexcept = default;
313 
  // (The public `sender` and `receiver` member declarations are elided in this
  // listing; the generated index gives them as `PipeSender<T> sender` and
  // `PipeReceiver<T> receiver`.)
316 
317  private:
  // Both ends share the same Center. No refs are added here: a newly
  // constructed Center already counts one send ref and one recv ref.
318  explicit Pipe(pipe_detail::Center<T>* center)
319  : sender(center), receiver(center) {}
320 };
321 
322 } // namespace grpc_core
323 
324 #endif // GRPC_CORE_LIB_PROMISE_PIPE_H
grpc_core::pipe_detail::Push::Push
Push(pipe_detail::Center< T > *center, T push)
Definition: pipe.h:247
grpc_core::pipe_detail::Next::operator=
Next & operator=(const Next &)=delete
grpc_core::PipeSender::PipeSender
PipeSender(PipeSender &&other) noexcept
Definition: pipe.h:160
grpc_core::pipe_detail::Push
Definition: pipe.h:39
log.h
grpc_core::Pipe::Pipe
Pipe()
Definition: pipe.h:308
grpc_core::PipeReceiver::PipeReceiver
PipeReceiver(const PipeReceiver &)=delete
grpc_core::pipe_detail::Center
Definition: pipe.h:46
GPR_DEBUG_ASSERT
#define GPR_DEBUG_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:103
asyncio_get_stats.default
default
Definition: asyncio_get_stats.py:38
grpc_core::pipe_detail::Next::Next
Next(const Next &)=delete
grpc_core::PipeReceiver::PipeReceiver
PipeReceiver(PipeReceiver &&other) noexcept
Definition: pipe.h:193
grpc_core::PipeReceiver::Next
pipe_detail::Next< T > Next()
Definition: pipe.h:289
grpc_core::pipe_detail::Center::ResetValue
void ResetValue()
Definition: pipe.h:130
grpc_core::pipe_detail::Center::send_refs_
uint8_t send_refs_
Definition: pipe.h:140
grpc_core::pipe_detail::Center::RefSend
Center * RefSend()
Definition: pipe.h:57
grpc_core
Definition: call_metric_recorder.h:31
grpc_core::PipeSender::~PipeSender
~PipeSender()
Definition: pipe.h:170
grpc_core::pipe_detail::Push::center_
Center< T > * center_
Definition: pipe.h:249
grpc_core::PipeSender::operator=
PipeSender & operator=(PipeSender &&other) noexcept
Definition: pipe.h:163
grpc_core::PipeReceiver
Definition: pipe.h:188
grpc_core::pipe_detail::Push::~Push
~Push()
Definition: pipe.h:239
grpc_core::pipe_detail::Next::center_
Center< T > * center_
Definition: pipe.h:278
arena.h
New
T * New(Args &&... args)
Definition: third_party/boringssl-with-bazel/src/ssl/internal.h:195
grpc_core::PipeReceiver::~PipeReceiver
~PipeReceiver()
Definition: pipe.h:202
grpc_core::IntraActivityWaiter
Definition: intra_activity_waiter.h:27
grpc_core::pipe_detail::Next::Next
Next(Next &&other) noexcept
Definition: pipe.h:259
grpc_core::pipe_detail::Push::Push
Push(Push &&other) noexcept
Definition: pipe.h:227
uint8_t
unsigned char uint8_t
Definition: stdint-msvc2008.h:78
grpc_core::Arena
Definition: src/core/lib/resource_quota/arena.h:45
grpc_core::pipe_detail::Push::push_
T push_
Definition: pipe.h:250
grpc_core::GetContext
T * GetContext()
Definition: core/lib/promise/context.h:74
T
#define T(upbtypeconst, upbtype, ctype, default_value)
grpc_core::Pipe::receiver
PipeReceiver< T > receiver
Definition: pipe.h:315
grpc_core::pipe_detail::Push::operator=
Push & operator=(Push &&other) noexcept
Definition: pipe.h:231
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
grpc_core::pipe_detail::Push::operator=
Push & operator=(const Push &)=delete
grpc_core::pipe_detail::Center::value_
T value_
Definition: pipe.h:136
grpc_core::pipe_detail::Next::operator()
Poll< absl::optional< T > > operator()()
Definition: pipe.h:273
grpc_core::Pipe::sender
PipeSender< T > sender
Definition: pipe.h:314
grpc_core::pipe_detail::Next
Definition: pipe.h:41
intra_activity_waiter.h
grpc_core::pipe_detail::Center::UnrefRecv
void UnrefRecv()
Definition: pipe.h:86
grpc_core::PipeSender
Definition: pipe.h:155
grpc_core::pipe_detail::Push::Push
Push(const Push &)=delete
grpc_core::pipe_detail::Next::operator=
Next & operator=(Next &&other) noexcept
Definition: pipe.h:262
grpc_core::PipeReceiver::operator=
PipeReceiver & operator=(PipeReceiver &&other) noexcept
Definition: pipe.h:196
grpc_core::PipeSender::center_
pipe_detail::Center< T > * center_
Definition: pipe.h:183
stdint.h
grpc_core::Pipe::operator=
Pipe & operator=(const Pipe &)=delete
grpc_core::PipeSender::operator=
PipeSender & operator=(const PipeSender &)=delete
push
int push(void *desc, unsigned char *buf, unsigned len)
Definition: bloaty/third_party/zlib/test/infcover.c:463
grpc_core::pipe_detail::Next::~Next
~Next()
Definition: pipe.h:269
value
const char * value
Definition: hpack_parser_table.cc:165
grpc_core::PipeSender::PipeSender
PipeSender(pipe_detail::Center< T > *center)
Definition: pipe.h:182
grpc_core::pipe_detail::Center::recv_refs_
uint8_t recv_refs_
Definition: pipe.h:144
grpc_core::pipe_detail::Center::Center
Center()
Definition: pipe.h:50
grpc_core::pipe_detail::Center::Push
Poll< bool > Push(T *value)
Definition: pipe.h:104
grpc_core::pipe_detail::Center::on_full_
IntraActivityWaiter on_full_
Definition: pipe.h:148
grpc_core::Pipe
Definition: pipe.h:34
grpc_core::pipe_detail::Center::has_value_
bool has_value_
Definition: pipe.h:146
grpc_core::PipeReceiver::PipeReceiver
PipeReceiver(pipe_detail::Center< T > *center)
Definition: pipe.h:215
poll.h
grpc_core::PipeReceiver::operator=
PipeReceiver & operator=(const PipeReceiver &)=delete
grpc_core::IntraActivityWaiter::pending
Pending pending()
Definition: intra_activity_waiter.h:31
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc_core::IntraActivityWaiter::Wake
void Wake()
Definition: intra_activity_waiter.h:36
grpc_core::PipeReceiver::center_
pipe_detail::Center< T > * center_
Definition: pipe.h:216
grpc_core::pipe_detail::Center::Next
Poll< absl::optional< T > > Next()
Definition: pipe.h:118
context.h
grpc_core::pipe_detail::Center::on_empty_
IntraActivityWaiter on_empty_
Definition: pipe.h:147
grpc_core::pipe_detail::Center::UnrefSend
void UnrefSend()
Definition: pipe.h:71
grpc_core::PipeSender::PipeSender
PipeSender(const PipeSender &)=delete
grpc_core::pipe_detail::Push::operator()
Poll< bool > operator()()
Definition: pipe.h:243
grpc_core::pipe_detail::Next::Next
Next(pipe_detail::Center< T > *center)
Definition: pipe.h:277
grpc_core::pipe_detail::Center::RefRecv
Center * RefRecv()
Definition: pipe.h:63
absl::variant
Definition: abseil-cpp/absl/types/internal/variant.h:46
grpc_core::PipeSender::Push
pipe_detail::Push< T > Push(T value)
Definition: pipe.h:284
port_platform.h


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:52