observable.h
Go to the documentation of this file.
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_CORE_LIB_PROMISE_OBSERVABLE_H
16 #define GRPC_CORE_LIB_PROMISE_OBSERVABLE_H
17 
19 
20 #include <stdint.h>
21 
22 #include <limits>
23 #include <memory>
24 #include <type_traits>
25 #include <utility>
26 
27 #include "absl/base/thread_annotations.h"
28 #include "absl/types/optional.h"
29 #include "absl/types/variant.h"
30 
36 
37 namespace grpc_core {
38 
39 namespace promise_detail {
40 
44 
45 } // namespace promise_detail
46 
48  public:
50 
51  protected:
53 };
54 
55 namespace promise_detail {
56 
57 // Shared state between Observable and Observer.
58 template <typename T>
60  public:
62  : value_(std::move(value)) {}
63 
64  // Publish that we're closed.
65  void Close() {
66  mu_.Lock();
68  value_.reset();
69  auto wakeup = waiters_.TakeWakeupSet();
70  mu_.Unlock();
71  wakeup.Wakeup();
72  }
73 
74  // Synchronously publish a new value, and wake any waiters.
75  void Push(T value) {
76  mu_.Lock();
77  version_++;
79  auto wakeup = waiters_.TakeWakeupSet();
80  mu_.Unlock();
81  wakeup.Wakeup();
82  }
83 
85  MutexLock lock(&mu_);
86  if (!Started()) return Pending();
87  *version_seen = version_;
88  return value_;
89  }
90 
92  MutexLock lock(&mu_);
93  if (!NextValueReady(version_seen)) return Pending();
94  return value_;
95  }
96 
98  if (*version_seen == kTombstoneVersion) return Pending();
99 
100  MutexLock lock(&mu_);
101  if (!NextValueReady(version_seen)) return Pending();
102  // Watch needs to be woken up if the value changes even if it's ready now.
103  waiters_.AddPending(Activity::current()->MakeNonOwningWaker());
104  return value_;
105  }
106 
107  private:
108  // Returns true if a value is set, or if version_ is kTombstoneVersion.
109  // Otherwise, add ourselves as pending to waiters_, and return false.
111  if (!value_.has_value()) {
112  if (version_ != kTombstoneVersion) {
113  // We allow initial no-value, which does not indicate closure.
114  waiters_.AddPending(Activity::current()->MakeNonOwningWaker());
115  return false;
116  }
117  }
118  return true;
119  }
120 
121  // If no value is ready, add ourselves as pending to waiters_ and return
122  // false.
123  // If the next value is ready, update the last version seen and return true.
124  bool NextValueReady(ObservableVersion* version_seen)
126  if (!Started()) return false;
127  if (version_ == *version_seen) {
128  waiters_.AddPending(Activity::current()->MakeNonOwningWaker());
129  return false;
130  }
131  *version_seen = version_;
132  return true;
133  }
134 
136  WaitSet waiters_ ABSL_GUARDED_BY(mu_);
139 };
140 
141 // Promise implementation for Observer::Get.
142 template <typename T>
144  public:
146  : version_seen_(version_seen), state_(state) {}
147 
149  return state_->PollGet(version_seen_);
150  }
151 
152  private:
155 };
156 
157 // Promise implementation for Observer::Next.
158 template <typename T>
160  public:
162  : version_seen_(version_seen), state_(state) {}
163 
165  return state_->PollNext(version_seen_);
166  }
167 
168  private:
171 };
172 
173 template <typename T, typename F>
174 class ObservableWatch final : private WatchCommitter {
175  private:
177  std::declval<T>(), std::declval<WatchCommitter*>()))>;
178  using Result = typename Promise::Result;
179 
180  public:
// Build a watch from a promise factory and a shared handle to the observable
// state. Both arguments are taken by value and moved into the members;
// promise_ is not initialized here.
181  explicit ObservableWatch(F factory, std::shared_ptr<ObservableState<T>> state)
182  : state_(std::move(state)), factory_(std::move(factory)) {}
183  ObservableWatch(const ObservableWatch&) = delete;
184  ObservableWatch& operator=(const ObservableWatch&) = delete;
186  : state_(std::move(other.state_)),
187  promise_(std::move(other.promise_)),
188  factory_(std::move(other.factory_)) {}
190 
// Poll the watch.
// First asks the shared state whether there is something new relative to
// version_seen_; then polls the currently held inner promise, if any.
// Returns the inner promise's poll result, or Pending() when no inner promise
// is held.
191  Poll<Result> operator()() {
192  auto r = state_->PollWatch(&version_seen_);
// Only act when PollWatch produced a ready result; a pending result leaves the
// current promise_ untouched.
193  if (auto* p = absl::get_if<kPollReadyIdx>(&r)) {
194  if (p->has_value()) {
// A value is available: replace any existing promise with a fresh one built by
// the factory. `this` is handed over as the WatchCommitter*.
195  promise_ = Promise(factory_(std::move(**p), this));
196  } else {
// Ready but empty optional: drop the current promise.
// NOTE(review): presumably this is the "observable closed" case - confirm
// against ObservableState::Close.
197  promise_ = {};
198  }
199  }
200  if (promise_.has_value()) {
201  return (*promise_)();
202  } else {
203  return Pending();
204  }
205  }
206 
207  private:
208  std::shared_ptr<ObservableState<T>> state_;
211 };
212 
213 } // namespace promise_detail
214 
215 template <typename T>
217 
218 // Observer watches an Observable for updates.
219 // It can see either the latest value or wait for a new value, but is not
220 // guaranteed to see every value pushed to the Observable.
221 template <typename T>
222 class Observer {
223  public:
224  Observer(const Observer&) = delete;
225  Observer& operator=(const Observer&) = delete;
// Move constructor: copies the version last seen by `other` and takes over
// its shared state pointer (other.state_ is moved from).
226  Observer(Observer&& other) noexcept
227  : version_seen_(other.version_seen_), state_(std::move(other.state_)) {}
// Move assignment: copies the version last seen by `other`, takes over its
// shared state pointer (other.state_ is moved from), and returns *this.
228  Observer& operator=(Observer&& other) noexcept {
229  version_seen_ = other.version_seen_;
230  state_ = std::move(other.state_);
231  return *this;
232  }
233 
234  // Return a promise that will produce an optional<T>.
235  // If the Observable is still present, this will be a value T, but if the
236  // Observable has been closed, this will be nullopt. Borrows data from the
237  // Observer, so the Observer must stay valid until the promise is resolved.
238  // Only one Next or Get call is allowed to be outstanding at a time.
241  }
242 
243  // Return a promise that will produce the next unseen value as an optional<T>.
244  // If the Observable is still present, this will be a value T, but if the
245  // Observable has been closed, this will be nullopt. Borrows data from the
246  // Observer, so the Observer must stay valid until the promise is resolved.
247  // Only one Next or Get call is allowed to be outstanding at a time.
250  }
251 
252  private:
254  friend class Observable<T>;
// Private constructor (Observable<T> is a friend): adopt a shared handle to
// the observable's state. `state` is taken by value, so move it into the
// member rather than copying it - this avoids an extra atomic reference-count
// increment/decrement and matches how ObservableWatch stores its state.
255  explicit Observer(std::shared_ptr<State> state) : state_(std::move(state)) {}
257  std::shared_ptr<State> state_;
258 };
259 
260 // Observable models a single writer multiple reader broadcast channel.
261 // Readers can observe the latest value, or await a new latest value, but they
262 // are not guaranteed to observe every value.
263 template <typename T>
264 class Observable {
265  public:
// Create an Observable whose shared state is constructed from absl::nullopt,
// i.e. with no initial value.
266  Observable() : state_(std::make_shared<State>(absl::nullopt)) {}
// Create an Observable whose shared state is constructed from `value`, which
// is moved in.
267  explicit Observable(T value)
268  : state_(std::make_shared<State>(std::move(value))) {}
// Destroying the Observable calls Close() on the shared state.
269  ~Observable() { state_->Close(); }
// Not copyable.
270  Observable(const Observable&) = delete;
271  Observable& operator=(const Observable&) = delete;
272 
273  // Push a new value into the observable. `value` is moved into the shared
// state's Push.
274  void Push(T value) { state_->Push(std::move(value)); }
275 
276  // Create a new Observer - which can pull the current state from this
277  // Observable.
279 
280  // Create a new Watch - a promise that pushes state into the passed in promise
281  // factory. The promise factory takes two parameters - the current value and a
282  // commit token. If the commit token is used (the Commit function on it is
283  // called), then no further Watch updates are provided.
284  template <typename F>
287  }
288 
289  private:
291  std::shared_ptr<State> state_;
292 };
293 
294 } // namespace grpc_core
295 
296 #endif // GRPC_CORE_LIB_PROMISE_OBSERVABLE_H
grpc_core::promise_detail::ObservableState::Close
void Close()
Definition: observable.h:65
grpc_core::promise_detail::ObservableState::PollGet
Poll< absl::optional< T > > PollGet(ObservableVersion *version_seen)
Definition: observable.h:84
grpc_core::Observer::Next
promise_detail::ObservableNext< T > Next()
Definition: observable.h:248
grpc_core::promise_detail::ObservableState::ABSL_GUARDED_BY
WaitSet waiters_ ABSL_GUARDED_BY(mu_)
grpc_core::promise_detail::ObservableWatch::ObservableWatch
ObservableWatch(ObservableWatch &&other) noexcept
Definition: observable.h:185
asyncio_get_stats.default
default
Definition: asyncio_get_stats.py:38
grpc_core::Observer::Observer
Observer(std::shared_ptr< State > state)
Definition: observable.h:255
grpc_core
Definition: call_metric_recorder.h:31
grpc_core::Activity::current
static Activity * current()
Definition: activity.h:124
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
grpc_core::promise_detail::ObservableNext::state_
ObservableState< T > * state_
Definition: observable.h:170
grpc_core::promise_detail::ObservableGet::version_seen_
ObservableVersion * version_seen_
Definition: observable.h:153
grpc_core::Observable::MakeObserver
Observer< T > MakeObserver()
Definition: observable.h:278
grpc_core::Observer::operator=
Observer & operator=(const Observer &)=delete
grpc_core::WatchCommitter::Commit
void Commit()
Definition: observable.h:49
grpc_core::Observable::operator=
Observable & operator=(const Observable &)=delete
grpc_core::promise_detail::ObservableWatch::Result
typename Promise::Result Result
Definition: observable.h:178
grpc_core::promise_detail::ObservableWatch::factory_
F factory_
Definition: observable.h:210
grpc_core::promise_detail::ObservableState
Definition: observable.h:59
T
#define T(upbtypeconst, upbtype, ctype, default_value)
grpc_core::promise_detail::ObservableWatch::Promise
PromiseLike< decltype(std::declval< F >()(std::declval< T >(), std::declval< WatchCommitter * >()))> Promise
Definition: observable.h:177
grpc_core::promise_detail::ObservableState::PollNext
Poll< absl::optional< T > > PollNext(ObservableVersion *version_seen)
Definition: observable.h:91
grpc_core::promise_detail::ObservableNext::ObservableNext
ObservableNext(ObservableVersion *version_seen, ObservableState< T > *state)
Definition: observable.h:161
grpc_core::Pending
Definition: poll.h:29
wait_set.h
grpc_core::promise_detail::ObservableNext::version_seen_
ObservableVersion * version_seen_
Definition: observable.h:169
grpc_core::Observable::Watch
promise_detail::ObservableWatch< T, F > Watch(F f)
Definition: observable.h:285
version_
std::string version_
Definition: abseil-cpp/absl/time/internal/cctz/src/time_zone_info.cc:666
grpc_core::WatchCommitter
Definition: observable.h:47
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
grpc_core::Mutex::Lock
void Lock() ABSL_EXCLUSIVE_LOCK_FUNCTION()
Definition: src/core/lib/gprpp/sync.h:69
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
grpc_core::promise_detail::ObservableGet::ObservableGet
ObservableGet(ObservableVersion *version_seen, ObservableState< T > *state)
Definition: observable.h:145
grpc_core::Observer
Definition: observable.h:222
grpc_core::WaitSet
Definition: wait_set.h:40
ABSL_EXCLUSIVE_LOCKS_REQUIRED
#define ABSL_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: abseil-cpp/absl/base/thread_annotations.h:145
grpc_core::Observable::~Observable
~Observable()
Definition: observable.h:269
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
absl::optional
Definition: abseil-cpp/absl/types/internal/optional.h:61
grpc_core::Observer::version_seen_
promise_detail::ObservableVersion version_seen_
Definition: observable.h:256
grpc_core::Observable::Observable
Observable(T value)
Definition: observable.h:267
grpc_core::promise_detail::kTombstoneVersion
static constexpr ObservableVersion kTombstoneVersion
Definition: observable.h:42
grpc_core::promise_detail::ObservableGet::state_
ObservableState< T > * state_
Definition: observable.h:154
F
#define F(b, c, d)
Definition: md4.c:112
grpc_core::Observable::Observable
Observable()
Definition: observable.h:266
stdint.h
grpc_core::promise_detail::ObservableWatch::promise_
absl::optional< Promise > promise_
Definition: observable.h:209
grpc_core::promise_detail::ObservableState::Push
void Push(T value)
Definition: observable.h:75
grpc_core::promise_detail::ObservableGet
Definition: observable.h:143
grpc_core::promise_detail::ObservableWatch::ObservableWatch
ObservableWatch(F factory, std::shared_ptr< ObservableState< T >> state)
Definition: observable.h:181
value_
int value_
Definition: orphanable_test.cc:38
grpc_core::promise_detail::PromiseLike::Result
typename PollTraits< decltype(WrapInPoll(f_()))>::Type Result
Definition: promise_like.h:75
value
const char * value
Definition: hpack_parser_table.cc:165
grpc_core::Mutex
Definition: src/core/lib/gprpp/sync.h:61
grpc_core::Mutex::Unlock
void Unlock() ABSL_UNLOCK_FUNCTION()
Definition: src/core/lib/gprpp/sync.h:70
grpc_core::promise_detail::ObservableState::ObservableState
ObservableState(absl::optional< T > value)
Definition: observable.h:61
promise_like.h
grpc_core::Observable::state_
std::shared_ptr< State > state_
Definition: observable.h:291
grpc_core::promise_detail::ObservableState::NextValueReady
bool NextValueReady(ObservableVersion *version_seen) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
Definition: observable.h:124
grpc_core::promise_detail::ObservableVersion
uint64_t ObservableVersion
Definition: observable.h:41
poll.h
grpc_core::promise_detail::ObservableWatch
Definition: observable.h:174
grpc_core::promise_detail::ObservableNext::operator()
Poll< absl::optional< T > > operator()()
Definition: observable.h:164
grpc_core::WatchCommitter::version_seen_
promise_detail::ObservableVersion version_seen_
Definition: observable.h:52
fix_build_deps.r
r
Definition: fix_build_deps.py:491
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc_core::Observer::Observer
Observer(Observer &&other) noexcept
Definition: observable.h:226
grpc_core::Observer::Get
promise_detail::ObservableGet< T > Get()
Definition: observable.h:239
grpc_core::promise_detail::ObservableNext
Definition: observable.h:159
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
grpc_core::Observer::state_
std::shared_ptr< State > state_
Definition: observable.h:257
grpc_core::promise_detail::ObservableState::mu_
Mutex mu_
Definition: observable.h:135
absl
Definition: abseil-cpp/absl/algorithm/algorithm.h:31
grpc_core::Observable::Push
void Push(T value)
Definition: observable.h:274
grpc_core::promise_detail::ObservableGet::operator()
Poll< absl::optional< T > > operator()()
Definition: observable.h:148
grpc_core::Observable
Definition: observable.h:216
activity.h
absl::variant
Definition: abseil-cpp/absl/types/internal/variant.h:46
grpc_core::promise_detail::PromiseLike
Definition: promise_like.h:67
grpc_core::promise_detail::ObservableState::Started
bool Started() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
Definition: observable.h:110
grpc_core::Observer::operator=
Observer & operator=(Observer &&other) noexcept
Definition: observable.h:228
sync.h
grpc_core::promise_detail::ObservableWatch::state_
std::shared_ptr< ObservableState< T > > state_
Definition: observable.h:208
grpc_core::Observer::Observer
Observer(const Observer &)=delete
port_platform.h
grpc_core::promise_detail::ObservableWatch::operator=
ObservableWatch & operator=(const ObservableWatch &)=delete
grpc_core::promise_detail::ObservableState::PollWatch
Poll< absl::optional< T > > PollWatch(ObservableVersion *version_seen)
Definition: observable.h:97


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:45