pollset_windows.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 #ifdef GRPC_WINSOCK_SOCKET
24 
25 #include <grpc/support/log.h>
26 
27 #include "src/core/lib/gprpp/thd.h"
32 
33 #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
34 
36 
37 gpr_mu grpc_polling_mu;
38 static grpc_pollset_worker* g_active_poller;
39 static grpc_pollset_worker g_global_root_worker;
40 
41 static void pollset_global_init(void) {
42  gpr_mu_init(&grpc_polling_mu);
43  g_active_poller = NULL;
44  g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
45  g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev =
46  &g_global_root_worker;
47 }
48 
49 static void pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); }
50 
51 static void remove_worker(grpc_pollset_worker* worker,
52  grpc_pollset_worker_link_type type) {
53  worker->links[type].prev->links[type].next = worker->links[type].next;
54  worker->links[type].next->links[type].prev = worker->links[type].prev;
55  worker->links[type].next = worker->links[type].prev = worker;
56 }
57 
58 static int has_workers(grpc_pollset_worker* root,
59  grpc_pollset_worker_link_type type) {
60  return root->links[type].next != root;
61 }
62 
63 static grpc_pollset_worker* pop_front_worker(
64  grpc_pollset_worker* root, grpc_pollset_worker_link_type type) {
65  if (has_workers(root, type)) {
66  grpc_pollset_worker* w = root->links[type].next;
67  remove_worker(w, type);
68  return w;
69  } else {
70  return NULL;
71  }
72 }
73 
74 static void push_front_worker(grpc_pollset_worker* root,
75  grpc_pollset_worker_link_type type,
77  worker->links[type].prev = root;
78  worker->links[type].next = worker->links[type].prev->links[type].next;
79  worker->links[type].prev->links[type].next =
80  worker->links[type].next->links[type].prev = worker;
81 }
82 
83 static size_t pollset_size(void) { return sizeof(grpc_pollset); }
84 
/* There isn't really any such thing as a pollset under Windows, due to the
   nature of I/O completion ports. We still provide a minimal set of features
   for the sake of the rest of grpc, but grpc_pollset_work won't actually do
   any polling and will return as quickly as possible. */
89 
90 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
91  *mu = &grpc_polling_mu;
92  pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
93  pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
94  &pollset->root_worker;
95 }
96 
97 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
98  pollset->shutting_down = 1;
99  grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
100  if (!pollset->is_iocp_worker) {
102  } else {
103  pollset->on_shutdown = closure;
104  }
105 }
106 
107 static void pollset_destroy(grpc_pollset* pollset) {}
108 
110  grpc_pollset_worker** worker_hdl,
111  grpc_core::Timestamp deadline) {
113  if (worker_hdl) *worker_hdl = &worker;
114 
115  int added_worker = 0;
116  worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
117  worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
118  worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
119  worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL;
120  worker.kicked = 0;
121  worker.pollset = pollset;
122  gpr_cv_init(&worker.cv);
123  if (!pollset->kicked_without_pollers && !pollset->shutting_down) {
124  if (g_active_poller == NULL) {
125  grpc_pollset_worker* next_worker;
126  /* become poller */
127  pollset->is_iocp_worker = 1;
128  g_active_poller = &worker;
129  gpr_mu_unlock(&grpc_polling_mu);
130  grpc_iocp_work(deadline);
132  gpr_mu_lock(&grpc_polling_mu);
133  pollset->is_iocp_worker = 0;
134  g_active_poller = NULL;
135  /* try to get a worker from this pollsets worker list */
136  next_worker = pop_front_worker(&pollset->root_worker,
137  GRPC_POLLSET_WORKER_LINK_POLLSET);
138  if (next_worker == NULL) {
139  /* try to get a worker from the global list */
140  next_worker = pop_front_worker(&g_global_root_worker,
141  GRPC_POLLSET_WORKER_LINK_GLOBAL);
142  }
143  if (next_worker != NULL) {
144  next_worker->kicked = 1;
145  gpr_cv_signal(&next_worker->cv);
146  }
147 
148  if (pollset->shutting_down && pollset->on_shutdown != NULL) {
149  grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->on_shutdown,
151  pollset->on_shutdown = NULL;
152  }
153  goto done;
154  }
155  push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL,
156  &worker);
157  push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET,
158  &worker);
159  added_worker = 1;
160  while (!worker.kicked) {
161  if (gpr_cv_wait(&worker.cv, &grpc_polling_mu,
162  deadline.as_timespec(GPR_CLOCK_REALTIME))) {
164  break;
165  }
167  }
168  } else {
169  pollset->kicked_without_pollers = 0;
170  }
171 done:
172  if (!grpc_closure_list_empty(*grpc_core::ExecCtx::Get()->closure_list())) {
173  gpr_mu_unlock(&grpc_polling_mu);
175  gpr_mu_lock(&grpc_polling_mu);
176  }
177  if (added_worker) {
178  remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
179  remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
180  }
181  gpr_cv_destroy(&worker.cv);
182  if (worker_hdl) *worker_hdl = NULL;
183  return GRPC_ERROR_NONE;
184 }
185 
187  grpc_pollset_worker* specific_worker) {
188  bool should_kick_global = false;
189  if (specific_worker != NULL) {
190  if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
191  should_kick_global = true;
192  for (specific_worker =
193  p->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next;
194  specific_worker != &p->root_worker;
195  specific_worker =
196  specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) {
197  specific_worker->kicked = 1;
198  should_kick_global = false;
199  gpr_cv_signal(&specific_worker->cv);
200  }
201  p->kicked_without_pollers = 1;
202  if (p->is_iocp_worker) {
203  grpc_iocp_kick();
204  should_kick_global = false;
205  }
206  } else {
207  if (p->is_iocp_worker && g_active_poller == specific_worker) {
208  grpc_iocp_kick();
209  } else {
210  specific_worker->kicked = 1;
211  gpr_cv_signal(&specific_worker->cv);
212  }
213  }
214  } else {
215  specific_worker =
216  pop_front_worker(&p->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
217  if (specific_worker != NULL) {
218  grpc_pollset_kick(p, specific_worker);
219  } else if (p->is_iocp_worker) {
220  grpc_iocp_kick();
221  } else {
222  p->kicked_without_pollers = 1;
223  should_kick_global = true;
224  }
225  }
226  if (should_kick_global && g_active_poller == NULL) {
227  grpc_pollset_worker* next_global_worker = pop_front_worker(
228  &g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
229  if (next_global_worker != NULL) {
230  next_global_worker->kicked = 1;
231  gpr_cv_signal(&next_global_worker->cv);
232  }
233  }
234  return GRPC_ERROR_NONE;
235 }
236 
237 grpc_pollset_vtable grpc_windows_pollset_vtable = {
238  pollset_global_init, pollset_global_shutdown,
241  pollset_kick, pollset_size};
242 
243 #endif /* GRPC_WINSOCK_SOCKET */
grpc_pollset_worker
struct grpc_pollset_worker grpc_pollset_worker
Definition: pollset.h:39
gpr_cv_signal
GPRAPI void gpr_cv_signal(gpr_cv *cv)
grpc_pollset_vtable
Definition: pollset.h:41
gpr_mu_unlock
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
pollset.h
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
grpc::testing::pollset_work
static grpc_error_handle pollset_work(grpc_pollset *ps, grpc_pollset_worker **, grpc_core::Timestamp deadline)
Definition: bm_cq_multiple_threads.cc:73
log.h
grpc_trace_fd_refcount
grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount
pollset_windows.h
iomgr_internal.h
worker
static void worker(void *arg)
Definition: threadpool.c:57
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
xds_manager.p
p
Definition: xds_manager.py:60
iocp_windows.h
gpr_mu_destroy
GPRAPI void gpr_mu_destroy(gpr_mu *mu)
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
gpr_cv_destroy
GPRAPI void gpr_cv_destroy(gpr_cv *cv)
root
RefCountedPtr< grpc_tls_certificate_provider > root
Definition: xds_server_config_fetcher.cc:223
grpc_core::ExecCtx::Flush
bool Flush()
Definition: exec_ctx.cc:69
mu
Mutex mu
Definition: server_config_selector_filter.cc:74
gpr_mu_init
GPRAPI void gpr_mu_init(gpr_mu *mu)
worker
Definition: worker.py:1
closure
grpc_closure closure
Definition: src/core/lib/surface/server.cc:466
done
struct tab * done
Definition: bloaty/third_party/zlib/examples/enough.c:176
gpr_cv_wait
GPRAPI int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline)
gpr_mu_lock
GPRAPI void gpr_mu_lock(gpr_mu *mu)
grpc_pollset
struct grpc_pollset grpc_pollset
Definition: pollset.h:38
grpc_pollset_kick
grpc_error_handle grpc_pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker)
Definition: pollset.cc:51
grpc_core::TraceFlag
Definition: debug/trace.h:63
gpr_mu
pthread_mutex_t gpr_mu
Definition: impl/codegen/sync_posix.h:47
port.h
grpc::testing::pollset_destroy
static void pollset_destroy(grpc_pollset *ps)
Definition: bm_cq_multiple_threads.cc:59
thd.h
closure
Definition: proxy.cc:59
grpc::testing::pollset_shutdown
static void pollset_shutdown(grpc_pollset *, grpc_closure *closure)
Definition: bm_cq_multiple_threads.cc:50
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
grpc_error
Definition: error_internal.h:42
grpc::testing::pollset_kick
static grpc_error_handle pollset_kick(grpc_pollset *, grpc_pollset_worker *)
Definition: bm_cq_multiple_threads.cc:61
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
grpc_closure_list_empty
bool grpc_closure_list_empty(grpc_closure_list closure_list)
Definition: closure.h:243
grpc_pollset
Definition: bm_cq_multiple_threads.cc:37
grpc_closure
Definition: closure.h:56
grpc_core::ExecCtx::Get
static ExecCtx * Get()
Definition: exec_ctx.h:205
grpc_core::Timestamp::as_timespec
gpr_timespec as_timespec(gpr_clock_type type) const
Definition: src/core/lib/gprpp/time.cc:157
gpr_cv_init
GPRAPI void gpr_cv_init(gpr_cv *cv)
grpc_core::ExecCtx::InvalidateNow
void InvalidateNow()
Definition: exec_ctx.h:188
port_platform.h
grpc::testing::pollset_init
static void pollset_init(grpc_pollset *ps, gpr_mu **mu)
Definition: bm_cq_multiple_threads.cc:54


grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:44