32 #include <CoreFoundation/CoreFoundation.h>
36 #include "absl/time/time.h"
// Tracing variant of GRPC_POLLING_TRACE: when the grpc_apple_polling_trace
// flag is enabled, logs the formatted message at GPR_DEBUG level with a
// "(polling) " prefix.
// NOTE(review): the closing brace of the `if` and the preprocessor guard that
// selects this variant are not visible in this excerpt.
45 #define GRPC_POLLING_TRACE(format, ...) \
46 if (GRPC_TRACE_FLAG_ENABLED(grpc_apple_polling_trace)) { \
47 gpr_log(GPR_DEBUG, "(polling) " format, __VA_ARGS__); \
// No-op variant of GRPC_POLLING_TRACE: expands to nothing, so trace call
// sites compile away.
// NOTE(review): presumably selected by a preprocessor guard that is not
// visible in this excerpt -- confirm.
50 #define GRPC_POLLING_TRACE(...)
// Sentinel worker pointer (the value 1 cast to grpc_pollset_worker*). The
// kick path compares the requested worker against this value and, on a match,
// kicks every worker registered on the pollset instead of a single one.
53 #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
// State shared between GlobalRunLoopFunc and the code that registers streams
// with, starts, and stops the global run loop.
// NOTE(review): other members used elsewhere in this file (mu, init_cv,
// input_source_cv) are declared on lines not visible in this excerpt.
55 struct GlobalRunLoopContext {
// Set to true when a stream is scheduled on run_loop. GlobalRunLoopFunc waits
// on input_source_cv until this is true and then resets it to false.
62 bool input_source_registered =
false;
// Run loop streams are scheduled on; assigned from CFRunLoopGetCurrent() at
// the start of GlobalRunLoopFunc.
65 CFRunLoopRef run_loop;
// Set to true by pollset_global_shutdown; GlobalRunLoopFunc's outer loop
// stops iterating once it observes this.
68 bool is_shutdown =
false;
// Per-worker state for a caller blocked in the pollset work path.
// NOTE(review): the member declarations are not visible in this excerpt; other
// code in this file reads a `kicked` flag and waits on a `cv` member of this
// type.
71 struct GrpcAppleWorker {
// Concrete pollset state; constructed with placement new in the pollset init
// path and accessed by reinterpret_cast-ing the opaque pollset pointer.
// NOTE(review): further members (e.g. the shutdown_closure assigned in the
// shutdown path) are declared on lines not visible in this excerpt.
81 struct GrpcApplePollset {
// Workers currently inside the work path. Each worker pushes itself to the
// front on entry and erases itself on exit.
85 std::list<GrpcAppleWorker*>
workers;
// Set to true by the shutdown path; ends the wait loop of every worker.
88 bool is_shutdown =
false;
// Set when a non-specific kick arrives while `workers` is empty; the work
// path checks this flag on entry and resets it to false.
95 bool kicked_without_poller =
false;
// Process-wide run loop context. Allocated in pollset_global_init and deleted
// in pollset_global_shutdown; nullptr outside that window.
98 static GlobalRunLoopContext* gGlobalRunLoopContext =
nullptr;
// Dispatch-queue registration for a read stream: attaches `read_stream` to
// `dispatch_queue` via CFReadStreamSetDispatchQueue. This is the initial value
// of grpc_apple_register_read_stream_impl.
104 static void grpc_apple_register_read_stream_queue(
105 CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) {
106 CFReadStreamSetDispatchQueue(read_stream, dispatch_queue);
// Dispatch-queue registration for a write stream: attaches `write_stream` to
// `dispatch_queue` via CFWriteStreamSetDispatchQueue. This appears to be the
// initial value of grpc_apple_register_write_stream_impl.
112 static void grpc_apple_register_write_stream_queue(
113 CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) {
114 CFWriteStreamSetDispatchQueue(write_stream, dispatch_queue);
// Run-loop registration for a read stream; installed as the read-stream
// implementation by pollset_global_init. `dispatch_queue` is not used by any
// of the visible lines.
120 static void grpc_apple_register_read_stream_run_loop(
121 CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) {
122 GRPC_POLLING_TRACE(
"Register read stream: %p", read_stream);
// NOTE(review): one original line is elided here; presumably it acquires
// gGlobalRunLoopContext->mu before the shared state below is touched --
// confirm against the full file.
// Schedule the stream on the global run loop in the default mode.
124 CFReadStreamScheduleWithRunLoop(read_stream, gGlobalRunLoopContext->run_loop,
125 kCFRunLoopDefaultMode);
// Record that an input source exists and wake GlobalRunLoopFunc, which waits
// on input_source_cv for this flag.
126 gGlobalRunLoopContext->input_source_registered =
true;
127 gGlobalRunLoopContext->input_source_cv.Signal();
// Run-loop registration for a write stream; installed as the write-stream
// implementation by pollset_global_init. `dispatch_queue` is not used by any
// of the visible lines.
133 static void grpc_apple_register_write_stream_run_loop(
134 CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) {
135 GRPC_POLLING_TRACE(
"Register write stream: %p", write_stream);
// NOTE(review): one original line is elided here; presumably it acquires
// gGlobalRunLoopContext->mu before the shared state below is touched --
// confirm against the full file.
// Schedule the stream on the global run loop in the default mode.
137 CFWriteStreamScheduleWithRunLoop(
138 write_stream, gGlobalRunLoopContext->run_loop, kCFRunLoopDefaultMode);
// Record that an input source exists and wake GlobalRunLoopFunc, which waits
// on input_source_cv for this flag.
139 gGlobalRunLoopContext->input_source_registered =
true;
140 gGlobalRunLoopContext->input_source_cv.Signal();
// Active read-stream registration strategy. Starts as the dispatch-queue
// variant; pollset_global_init re-points it at the run-loop variant.
148 static void (*grpc_apple_register_read_stream_impl)(
149 CFReadStreamRef, dispatch_queue_t) = grpc_apple_register_read_stream_queue;
// Active write-stream registration strategy; pollset_global_init re-points it
// at the run-loop variant.
// NOTE(review): the middle line of this declaration is elided in this excerpt;
// it presumably completes the parameter list and initializes the pointer to
// the dispatch-queue variant named on the last line.
150 static void (*grpc_apple_register_write_stream_impl)(CFWriteStreamRef,
152 grpc_apple_register_write_stream_queue;
// Public entry point for registering a read stream. Forwards both arguments
// unchanged to whichever implementation grpc_apple_register_read_stream_impl
// currently points at (dispatch-queue or run-loop variant).
154 void grpc_apple_register_read_stream(CFReadStreamRef read_stream,
155 dispatch_queue_t dispatch_queue) {
156 grpc_apple_register_read_stream_impl(read_stream, dispatch_queue);
// Public entry point for registering a write stream. Forwards both arguments
// unchanged to whichever implementation grpc_apple_register_write_stream_impl
// currently points at (dispatch-queue or run-loop variant).
159 void grpc_apple_register_write_stream(CFWriteStreamRef write_stream,
160 dispatch_queue_t dispatch_queue) {
161 grpc_apple_register_write_stream_impl(write_stream, dispatch_queue);
// Entry function for the global run loop (presumably run on
// gGlobalRunLoopThread -- the thread construction is not visible here).
// `arg` is not used by any of the visible lines.
// NOTE(review): several original lines are elided, including any locking and
// whatever actually runs the CFRunLoop after an input source is registered.
166 static void GlobalRunLoopFunc(
void*
arg) {
// Publish this thread's run loop and signal init_cv; pollset_global_init
// waits on init_cv until run_loop is non-NULL.
168 gGlobalRunLoopContext->run_loop = CFRunLoopGetCurrent();
169 gGlobalRunLoopContext->init_cv.Signal();
// Keep iterating until pollset_global_shutdown sets is_shutdown.
171 while (!gGlobalRunLoopContext->is_shutdown) {
// Block until a registration function reports that a stream was scheduled.
175 while (!gGlobalRunLoopContext->input_source_registered) {
176 gGlobalRunLoopContext->input_source_cv.Wait(&gGlobalRunLoopContext->mu);
// Consume the notification so the next iteration waits for a new one.
178 gGlobalRunLoopContext->input_source_registered =
false;
// Global initialization for this polling engine: allocates the shared run
// loop context, switches stream registration to the run-loop variants, starts
// the global run loop thread, and waits until that thread has published its
// run loop.
188 static void pollset_global_init(
void) {
189 gGlobalRunLoopContext =
new GlobalRunLoopContext;
// From here on, stream registration schedules on the global run loop rather
// than on the caller-supplied dispatch queue.
191 grpc_apple_register_read_stream_impl =
192 grpc_apple_register_read_stream_run_loop;
193 grpc_apple_register_write_stream_impl =
194 grpc_apple_register_write_stream_run_loop;
// NOTE(review): the right-hand side of this assignment (the thread object
// construction) is on an elided line; presumably it targets GlobalRunLoopFunc
// -- confirm.
197 gGlobalRunLoopThread =
199 gGlobalRunLoopThread->
Start();
// Wait for GlobalRunLoopFunc to store its CFRunLoop and signal init_cv.
200 while (gGlobalRunLoopContext->run_loop == NULL)
201 gGlobalRunLoopContext->init_cv.Wait(&gGlobalRunLoopContext->mu);
// Global teardown for this polling engine: flags shutdown, stops the run
// loop, joins the run loop thread, and frees the thread object and context.
// NOTE(review): some original lines are elided here (possibly locking around
// the is_shutdown write) -- confirm against the full file.
204 static void pollset_global_shutdown(
void) {
// Set the flag before stopping the run loop so GlobalRunLoopFunc's outer loop
// sees it and does not iterate again.
207 gGlobalRunLoopContext->is_shutdown =
true;
208 CFRunLoopStop(gGlobalRunLoopContext->run_loop);
// Join before deleting: the thread function dereferences
// gGlobalRunLoopContext.
210 gGlobalRunLoopThread->
Join();
211 delete gGlobalRunLoopThread;
212 delete gGlobalRunLoopContext;
// Body of the pollset "work" operation: blocks the calling worker until it is
// kicked, the pollset is shut down, or (presumably) the deadline passes.
// NOTE(review): the function signature, the trace arguments, locking, and the
// bodies of several branches are on lines not visible in this excerpt.
225 GRPC_POLLING_TRACE(
"pollset work: %p, worker: %p, deadline: %" PRIu64,
// The opaque pollset pointer is the storage of a GrpcApplePollset.
228 GrpcApplePollset* apple_pollset =
229 reinterpret_cast<GrpcApplePollset*
>(pollset);
// Worker state for this call is a local object; only its address is stored in
// the pollset's worker list below.
230 GrpcAppleWorker actual_worker;
// A kick arrived while no worker was present: consume it.
// NOTE(review): the `else` that presumably guards the wait path below is
// elided -- confirm.
235 if (apple_pollset->kicked_without_poller) {
237 apple_pollset->kicked_without_poller =
false;
// Register this worker at the front of the list and keep the iterator so the
// entry can be erased directly afterwards.
240 apple_pollset->workers.push_front(&actual_worker);
241 auto it = apple_pollset->workers.begin();
// Wait until this worker is kicked or the pollset is shut down.
// NOTE(review): the deadline argument and the body of the `if` are elided;
// presumably the loop exits when the deadline is reached -- confirm.
243 while (!actual_worker.kicked && !apple_pollset->is_shutdown) {
244 if (actual_worker.cv.WaitWithDeadline(
// Unregister this worker; `it` still refers to the entry pushed above.
252 apple_pollset->workers.erase(
it);
// Shutdown was requested and no workers remain.
// NOTE(review): the branch body is elided; presumably it runs the stored
// shutdown closure -- confirm.
257 if (apple_pollset->is_shutdown && apple_pollset->workers.empty()) {
// Kicks a single worker.
// NOTE(review): the body is not visible in this excerpt. The work path waits
// on worker->cv and re-checks worker->kicked, so this presumably sets that
// flag and signals that condition variable -- confirm.
268 static void kick_worker(GrpcAppleWorker*
worker) {
// Body of pollset_kick: wakes one worker, every worker, or a specific worker
// depending on `specific_worker`.
// NOTE(review): the function signature, the return statement, and several
// `else` lines / closing braces are on lines not visible in this excerpt.
278 GrpcApplePollset* apple_pollset =
279 reinterpret_cast<GrpcApplePollset*
>(pollset);
281 GRPC_POLLING_TRACE(
"pollset kick: %p, worker:%p", pollset, specific_worker);
// No specific worker requested: kick any one worker.
283 if (specific_worker ==
nullptr) {
// Nobody is currently in the work path; record the kick so the next call to
// the work path sees kicked_without_poller set.
284 if (apple_pollset->workers.empty()) {
285 apple_pollset->kicked_without_poller =
true;
// Otherwise (the `else` line is elided) kick the worker at the front of the
// list; workers push themselves to the front, so this is the most recently
// added one.
287 GrpcAppleWorker* actual_worker = apple_pollset->workers.front();
288 kick_worker(actual_worker);
290 }
else if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
// Broadcast sentinel: kick every registered worker.
291 for (
auto& actual_worker : apple_pollset->workers) {
292 kick_worker(actual_worker);
// Any other value (presumably the final `else` branch) is the address of a
// GrpcAppleWorker; kick exactly that worker.
295 GrpcAppleWorker* actual_worker =
296 reinterpret_cast<GrpcAppleWorker*
>(specific_worker);
297 kick_worker(actual_worker);
// Body of the pollset "init" operation: constructs a GrpcApplePollset in
// place, in the storage `pollset` points at.
// NOTE(review): the function signature and any statements after the
// construction are not visible in this excerpt; the storage is presumably
// sized via pollset_size() -- confirm.
304 GRPC_POLLING_TRACE(
"pollset init: %p", pollset);
// Placement new: no allocation happens here, only construction in the
// caller's storage.
305 GrpcApplePollset* apple_pollset =
new (pollset) GrpcApplePollset();
// Body of the pollset "shutdown" operation: marks the pollset as shut down
// and wakes every worker.
// NOTE(review): the function signature, locking, and the body of the
// `workers.empty()` branch are on lines not visible in this excerpt.
312 GRPC_POLLING_TRACE(
"pollset shutdown: %p", pollset);
314 GrpcApplePollset* apple_pollset =
315 reinterpret_cast<GrpcApplePollset*
>(pollset);
// Set the flag first so that woken workers see is_shutdown and leave their
// wait loop.
316 apple_pollset->is_shutdown =
true;
// The kick's return value is deliberately discarded.
317 (void)
pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
// No workers are present.
// NOTE(review): the branch body is elided; presumably `closure` is run right
// away here -- confirm.
320 if (apple_pollset->workers.empty()) {
// NOTE(review): presumably the `else` path -- keep the closure on the pollset
// so the work path can handle it once the last worker has left. Confirm.
323 apple_pollset->shutdown_closure =
closure;
// Body of the pollset "destroy" operation: runs the GrpcApplePollset
// destructor explicitly, mirroring the placement new in the init path. None
// of the visible lines free the underlying storage.
// NOTE(review): the function signature is not visible in this excerpt.
328 GRPC_POLLING_TRACE(
"pollset destroy: %p", pollset);
329 GrpcApplePollset* apple_pollset =
330 reinterpret_cast<GrpcApplePollset*
>(pollset);
331 apple_pollset->~GrpcApplePollset();
// Returns the number of bytes needed to hold one pollset, i.e.
// sizeof(GrpcApplePollset).
334 size_t pollset_size(
void) {
return sizeof(GrpcApplePollset); }
// NOTE(review): fragment of a brace-enclosed initializer listing function
// names -- presumably the pollset / pollset_set vtables for this engine. The
// declarations and most entries are on lines not visible in this excerpt, so
// the entry order cannot be checked from here.
337 pollset_global_init, pollset_global_shutdown,
356 pollset_set_create, pollset_set_destroy,
357 pollset_set_add_pollset, pollset_set_del_pollset,
358 pollset_set_add_pollset_set, pollset_set_del_pollset_set};