ev_apple.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2020 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
25 
27 
29 
30 #ifdef GRPC_APPLE_EV
31 
32 #include <CoreFoundation/CoreFoundation.h>
33 
34 #include <list>
35 
36 #include "absl/time/time.h"
37 
38 #include "src/core/lib/gprpp/thd.h"
41 
42 grpc_core::DebugOnlyTraceFlag grpc_apple_polling_trace(false, "apple_polling");
43 
#ifndef NDEBUG
// Logs a "(polling) "-prefixed debug message when the apple_polling trace flag
// is enabled. At least one argument must follow the format string.
//
// The body is wrapped in do { } while (0) so the macro expands to exactly one
// statement: a bare `if { }` would swallow or break an `else` when the macro
// is used as the unbraced body of an if/else.
#define GRPC_POLLING_TRACE(format, ...)                      \
  do {                                                       \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_apple_polling_trace)) { \
      gpr_log(GPR_DEBUG, "(polling) " format, __VA_ARGS__);  \
    }                                                        \
  } while (0)
#else
// Tracing is compiled out in NDEBUG builds; still expands to one statement.
#define GRPC_POLLING_TRACE(...) \
  do {                          \
  } while (0)
#endif  // NDEBUG
52 
// Sentinel worker value: passing it to pollset_kick() kicks every worker
// currently registered in the pollset instead of a single one.
#define GRPC_POLLSET_KICK_BROADCAST \
  (reinterpret_cast<grpc_pollset_worker*>(1))
54 
55 struct GlobalRunLoopContext {
56  grpc_core::CondVar init_cv;
57  grpc_core::CondVar input_source_cv;
58 
60 
61  // Whether an input source registration is pending. Protected by mu.
62  bool input_source_registered = false;
63 
64  // The reference to the global run loop object. Protected by mu.
65  CFRunLoopRef run_loop;
66 
67  // Whether the pollset has been globally shut down. Protected by mu.
68  bool is_shutdown = false;
69 };
70 
71 struct GrpcAppleWorker {
72  // The condition varible to kick the worker. Works with the pollset's lock
73  // (GrpcApplePollset.mu).
75 
76  // Whether the worker is kicked. Protected by the pollset's lock
77  // (GrpcApplePollset.mu).
78  bool kicked = false;
79 };
80 
81 struct GrpcApplePollset {
83 
84  // Tracks the current workers in the pollset. Protected by mu.
85  std::list<GrpcAppleWorker*> workers;
86 
87  // Whether the pollset is shut down. Protected by mu.
88  bool is_shutdown = false;
89 
90  // Closure to call when shutdown is done. Protected by mu.
91  grpc_closure* shutdown_closure;
92 
93  // Whether there's an outstanding kick that was not processed. Protected by
94  // mu.
95  bool kicked_without_poller = false;
96 };
97 
98 static GlobalRunLoopContext* gGlobalRunLoopContext = nullptr;
99 static grpc_core::Thread* gGlobalRunLoopThread = nullptr;
100 
104 static void grpc_apple_register_read_stream_queue(
105  CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) {
106  CFReadStreamSetDispatchQueue(read_stream, dispatch_queue);
107 }
108 
112 static void grpc_apple_register_write_stream_queue(
113  CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) {
114  CFWriteStreamSetDispatchQueue(write_stream, dispatch_queue);
115 }
116 
120 static void grpc_apple_register_read_stream_run_loop(
121  CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) {
122  GRPC_POLLING_TRACE("Register read stream: %p", read_stream);
123  grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
124  CFReadStreamScheduleWithRunLoop(read_stream, gGlobalRunLoopContext->run_loop,
125  kCFRunLoopDefaultMode);
126  gGlobalRunLoopContext->input_source_registered = true;
127  gGlobalRunLoopContext->input_source_cv.Signal();
128 }
129 
133 static void grpc_apple_register_write_stream_run_loop(
134  CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) {
135  GRPC_POLLING_TRACE("Register write stream: %p", write_stream);
136  grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
137  CFWriteStreamScheduleWithRunLoop(
138  write_stream, gGlobalRunLoopContext->run_loop, kCFRunLoopDefaultMode);
139  gGlobalRunLoopContext->input_source_registered = true;
140  gGlobalRunLoopContext->input_source_cv.Signal();
141 }
142 
148 static void (*grpc_apple_register_read_stream_impl)(
149  CFReadStreamRef, dispatch_queue_t) = grpc_apple_register_read_stream_queue;
150 static void (*grpc_apple_register_write_stream_impl)(CFWriteStreamRef,
151  dispatch_queue_t) =
152  grpc_apple_register_write_stream_queue;
153 
154 void grpc_apple_register_read_stream(CFReadStreamRef read_stream,
155  dispatch_queue_t dispatch_queue) {
156  grpc_apple_register_read_stream_impl(read_stream, dispatch_queue);
157 }
158 
159 void grpc_apple_register_write_stream(CFWriteStreamRef write_stream,
160  dispatch_queue_t dispatch_queue) {
161  grpc_apple_register_write_stream_impl(write_stream, dispatch_queue);
162 }
163 
166 static void GlobalRunLoopFunc(void* arg) {
167  grpc_core::LockableAndReleasableMutexLock lock(&gGlobalRunLoopContext->mu);
168  gGlobalRunLoopContext->run_loop = CFRunLoopGetCurrent();
169  gGlobalRunLoopContext->init_cv.Signal();
170 
171  while (!gGlobalRunLoopContext->is_shutdown) {
172  // CFRunLoopRun() will return immediately if no stream is registered on it.
173  // So we wait on a conditional variable until a stream is registered;
174  // otherwise we'll be running a spinning loop.
175  while (!gGlobalRunLoopContext->input_source_registered) {
176  gGlobalRunLoopContext->input_source_cv.Wait(&gGlobalRunLoopContext->mu);
177  }
178  gGlobalRunLoopContext->input_source_registered = false;
179  lock.Release();
180  CFRunLoopRun();
181  lock.Lock();
182  }
183  lock.Release();
184 }
185 
186 // pollset implementation
187 
188 static void pollset_global_init(void) {
189  gGlobalRunLoopContext = new GlobalRunLoopContext;
190 
191  grpc_apple_register_read_stream_impl =
192  grpc_apple_register_read_stream_run_loop;
193  grpc_apple_register_write_stream_impl =
194  grpc_apple_register_write_stream_run_loop;
195 
196  grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
197  gGlobalRunLoopThread =
198  new grpc_core::Thread("apple_ev", GlobalRunLoopFunc, nullptr);
199  gGlobalRunLoopThread->Start();
200  while (gGlobalRunLoopContext->run_loop == NULL)
201  gGlobalRunLoopContext->init_cv.Wait(&gGlobalRunLoopContext->mu);
202 }
203 
204 static void pollset_global_shutdown(void) {
205  {
206  grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
207  gGlobalRunLoopContext->is_shutdown = true;
208  CFRunLoopStop(gGlobalRunLoopContext->run_loop);
209  }
210  gGlobalRunLoopThread->Join();
211  delete gGlobalRunLoopThread;
212  delete gGlobalRunLoopContext;
213 }
214 
224  grpc_core::Timestamp deadline) {
225  GRPC_POLLING_TRACE("pollset work: %p, worker: %p, deadline: %" PRIu64,
226  pollset, worker,
228  GrpcApplePollset* apple_pollset =
229  reinterpret_cast<GrpcApplePollset*>(pollset);
230  GrpcAppleWorker actual_worker;
231  if (worker) {
232  *worker = reinterpret_cast<grpc_pollset_worker*>(&actual_worker);
233  }
234 
235  if (apple_pollset->kicked_without_poller) {
236  // Process the outstanding kick and reset the flag. Do not block.
237  apple_pollset->kicked_without_poller = false;
238  } else {
239  // Block until kicked, timed out, or the pollset shuts down.
240  apple_pollset->workers.push_front(&actual_worker);
241  auto it = apple_pollset->workers.begin();
242 
243  while (!actual_worker.kicked && !apple_pollset->is_shutdown) {
244  if (actual_worker.cv.WaitWithDeadline(
245  &apple_pollset->mu, grpc_core::ToAbslTime(deadline.as_timespec(
246  GPR_CLOCK_REALTIME)))) {
247  // timed out
248  break;
249  }
250  }
251 
252  apple_pollset->workers.erase(it);
253 
254  // If the pollset is shut down asynchronously and this is the last pending
255  // worker, the shutdown process is complete at this moment and the shutdown
256  // callback will be called.
257  if (apple_pollset->is_shutdown && apple_pollset->workers.empty()) {
258  grpc_core::ExecCtx::Run(DEBUG_LOCATION, apple_pollset->shutdown_closure,
260  }
261  }
262 
263  return GRPC_ERROR_NONE;
264 }
265 
268 static void kick_worker(GrpcAppleWorker* worker) {
269  worker->kicked = true;
270  worker->cv.Signal();
271 }
272 
277  grpc_pollset_worker* specific_worker) {
278  GrpcApplePollset* apple_pollset =
279  reinterpret_cast<GrpcApplePollset*>(pollset);
280 
281  GRPC_POLLING_TRACE("pollset kick: %p, worker:%p", pollset, specific_worker);
282 
283  if (specific_worker == nullptr) {
284  if (apple_pollset->workers.empty()) {
285  apple_pollset->kicked_without_poller = true;
286  } else {
287  GrpcAppleWorker* actual_worker = apple_pollset->workers.front();
288  kick_worker(actual_worker);
289  }
290  } else if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
291  for (auto& actual_worker : apple_pollset->workers) {
292  kick_worker(actual_worker);
293  }
294  } else {
295  GrpcAppleWorker* actual_worker =
296  reinterpret_cast<GrpcAppleWorker*>(specific_worker);
297  kick_worker(actual_worker);
298  }
299 
300  return GRPC_ERROR_NONE;
301 }
302 
303 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
304  GRPC_POLLING_TRACE("pollset init: %p", pollset);
305  GrpcApplePollset* apple_pollset = new (pollset) GrpcApplePollset();
306  *mu = grpc_core::GetUnderlyingGprMu(&apple_pollset->mu);
307 }
308 
311 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
312  GRPC_POLLING_TRACE("pollset shutdown: %p", pollset);
313 
314  GrpcApplePollset* apple_pollset =
315  reinterpret_cast<GrpcApplePollset*>(pollset);
316  apple_pollset->is_shutdown = true;
317  (void)pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
318 
319  // If there is any worker blocked, shutdown will be done asynchronously.
320  if (apple_pollset->workers.empty()) {
322  } else {
323  apple_pollset->shutdown_closure = closure;
324  }
325 }
326 
327 static void pollset_destroy(grpc_pollset* pollset) {
328  GRPC_POLLING_TRACE("pollset destroy: %p", pollset);
329  GrpcApplePollset* apple_pollset =
330  reinterpret_cast<GrpcApplePollset*>(pollset);
331  apple_pollset->~GrpcApplePollset();
332 }
333 
334 size_t pollset_size(void) { return sizeof(GrpcApplePollset); }
335 
336 grpc_pollset_vtable grpc_apple_pollset_vtable = {
337  pollset_global_init, pollset_global_shutdown,
340  pollset_kick, pollset_size};
341 
342 // pollset_set implementation
343 
344 grpc_pollset_set* pollset_set_create(void) { return nullptr; }
345 void pollset_set_destroy(grpc_pollset_set* pollset_set) {}
346 void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
347  grpc_pollset* pollset) {}
348 void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
349  grpc_pollset* pollset) {}
350 void pollset_set_add_pollset_set(grpc_pollset_set* bag,
351  grpc_pollset_set* item) {}
352 void pollset_set_del_pollset_set(grpc_pollset_set* bag,
353  grpc_pollset_set* item) {}
354 
355 grpc_pollset_set_vtable grpc_apple_pollset_set_vtable = {
356  pollset_set_create, pollset_set_destroy,
357  pollset_set_add_pollset, pollset_set_del_pollset,
358  pollset_set_add_pollset_set, pollset_set_del_pollset_set};
359 
360 #endif
grpc_pollset_worker
struct grpc_pollset_worker grpc_pollset_worker
Definition: pollset.h:39
grpc_pollset_vtable
Definition: pollset.h:41
grpc_core::CondVar
Definition: src/core/lib/gprpp/sync.h:126
regen-readme.it
it
Definition: regen-readme.py:15
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
grpc::testing::pollset_work
static grpc_error_handle pollset_work(grpc_pollset *ps, grpc_pollset_worker **, grpc_core::Timestamp deadline)
Definition: bm_cq_multiple_threads.cc:73
grpc_core::ToAbslTime
absl::Time ToAbslTime(gpr_timespec ts)
Definition: src/core/lib/gprpp/time_util.cc:68
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
grpc_pollset_set
struct grpc_pollset_set grpc_pollset_set
Definition: iomgr_fwd.h:23
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
time_util.h
grpc_pollset_set_vtable
Definition: pollset_set.h:32
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
grpc_core::LockableAndReleasableMutexLock
Definition: src/core/lib/gprpp/sync.h:165
mu
Mutex mu
Definition: server_config_selector_filter.cc:74
worker
Definition: worker.py:1
closure
grpc_closure closure
Definition: src/core/lib/surface/server.cc:466
grpc_core::Timestamp::milliseconds_after_process_epoch
uint64_t milliseconds_after_process_epoch() const
Definition: src/core/lib/gprpp/time.h:109
arg
Definition: cmdline.cc:40
grpc_core::Thread::Join
void Join()
Definition: thd.h:141
grpc_core::Thread::Start
void Start()
Definition: thd.h:125
grpc_core::TraceFlag
Definition: debug/trace.h:63
grpc_core::Mutex
Definition: src/core/lib/gprpp/sync.h:61
cv
unsigned cv
Definition: cxa_demangle.cpp:4908
gpr_mu
pthread_mutex_t gpr_mu
Definition: impl/codegen/sync_posix.h:47
port.h
grpc::testing::pollset_destroy
static void pollset_destroy(grpc_pollset *ps)
Definition: bm_cq_multiple_threads.cc:59
thd.h
grpc_core::GetUnderlyingGprMu
gpr_mu * GetUnderlyingGprMu(Mutex *mutex)
Definition: src/core/lib/gprpp/sync.h:86
closure
Definition: proxy.cc:59
grpc::testing::pollset_shutdown
static void pollset_shutdown(grpc_pollset *, grpc_closure *closure)
Definition: bm_cq_multiple_threads.cc:50
grpc_core::Thread
Definition: thd.h:43
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
workers
struct child_worker * workers
grpc_error
Definition: error_internal.h:42
grpc::testing::pollset_kick
static grpc_error_handle pollset_kick(grpc_pollset *, grpc_pollset_worker *)
Definition: bm_cq_multiple_threads.cc:61
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
grpc_pollset
Definition: bm_cq_multiple_threads.cc:37
grpc_closure
Definition: closure.h:56
ev_apple.h
grpc_core::Timestamp::as_timespec
gpr_timespec as_timespec(gpr_clock_type type) const
Definition: src/core/lib/gprpp/time.cc:157
port_platform.h
grpc::testing::pollset_init
static void pollset_init(grpc_pollset *ps, gpr_mu **mu)
Definition: bm_cq_multiple_threads.cc:54


grpc
Author(s):
autogenerated on Fri May 16 2025 02:58:19