iocp_windows.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 #ifdef GRPC_WINSOCK_SOCKET
24 
25 #include <winsock2.h>
26 
27 #include <limits>
28 
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
32 
34 #include "src/core/lib/gprpp/thd.h"
39 
40 static ULONG g_iocp_kick_token;
41 static OVERLAPPED g_iocp_custom_overlap;
42 
43 static gpr_atm g_custom_events = 0;
44 
45 static HANDLE g_iocp;
46 
47 static DWORD deadline_to_millis_timeout(grpc_core::Timestamp deadline) {
48  if (deadline == grpc_core::Timestamp::InfFuture()) {
49  return INFINITE;
50  }
52  if (deadline < now) return 0;
53  grpc_core::Duration timeout = deadline - now;
54  if (timeout.millis() > std::numeric_limits<DWORD>::max()) return INFINITE;
55  return static_cast<DWORD>(timeout.millis());
56 }
57 
58 grpc_iocp_work_status grpc_iocp_work(grpc_core::Timestamp deadline) {
59  BOOL success;
60  DWORD bytes = 0;
61  DWORD flags = 0;
62  ULONG_PTR completion_key;
63  LPOVERLAPPED overlapped;
64  grpc_winsocket* socket;
65  grpc_winsocket_callback_info* info;
67  success =
68  GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, &overlapped,
69  deadline_to_millis_timeout(deadline));
71  if (success == 0 && overlapped == NULL) {
72  return GRPC_IOCP_WORK_TIMEOUT;
73  }
74  GPR_ASSERT(completion_key && overlapped);
75  if (overlapped == &g_iocp_custom_overlap) {
76  gpr_atm_full_fetch_add(&g_custom_events, -1);
77  if (completion_key == (ULONG_PTR)&g_iocp_kick_token) {
78  /* We were awoken from a kick. */
79  return GRPC_IOCP_WORK_KICK;
80  }
81  gpr_log(GPR_ERROR, "Unknown custom completion key.");
82  abort();
83  }
84 
85  socket = (grpc_winsocket*)completion_key;
86  if (overlapped == &socket->write_info.overlapped) {
87  info = &socket->write_info;
88  } else if (overlapped == &socket->read_info.overlapped) {
89  info = &socket->read_info;
90  } else {
91  abort();
92  }
93  if (socket->shutdown_called) {
94  info->bytes_transferred = 0;
95  info->wsa_error = WSA_OPERATION_ABORTED;
96  } else {
97  success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
98  FALSE, &flags);
99  info->bytes_transferred = bytes;
100  info->wsa_error = success ? 0 : WSAGetLastError();
101  }
102  GPR_ASSERT(overlapped == &info->overlapped);
103  grpc_socket_become_ready(socket, info);
104  return GRPC_IOCP_WORK_WORK;
105 }
106 
107 void grpc_iocp_init(void) {
108  g_iocp =
109  CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0);
110  GPR_ASSERT(g_iocp);
111 }
112 
113 void grpc_iocp_kick(void) {
114  BOOL success;
115 
116  gpr_atm_full_fetch_add(&g_custom_events, 1);
117  success = PostQueuedCompletionStatus(g_iocp, 0, (ULONG_PTR)&g_iocp_kick_token,
118  &g_iocp_custom_overlap);
119  GPR_ASSERT(success);
120 }
121 
122 void grpc_iocp_flush(void) {
124  grpc_iocp_work_status work_status;
125 
126  do {
127  work_status = grpc_iocp_work(grpc_core::Timestamp::InfPast());
128  } while (work_status == GRPC_IOCP_WORK_KICK ||
129  grpc_core::ExecCtx::Get()->Flush());
130 }
131 
132 void grpc_iocp_shutdown(void) {
134  while (gpr_atm_acq_load(&g_custom_events)) {
135  grpc_iocp_work(grpc_core::Timestamp::InfFuture());
137  }
138 
139  GPR_ASSERT(CloseHandle(g_iocp));
140 }
141 
142 void grpc_iocp_add_socket(grpc_winsocket* socket) {
143  HANDLE ret;
144  if (socket->added_to_iocp) return;
145  ret = CreateIoCompletionPort((HANDLE)socket->socket, g_iocp,
146  (uintptr_t)socket, 0);
147  if (!ret) {
148  char* utf8_message = gpr_format_message(WSAGetLastError());
149  gpr_log(GPR_ERROR, "Unable to add socket to iocp: %s", utf8_message);
150  gpr_free(utf8_message);
151  __debugbreak();
152  abort();
153  }
154  socket->added_to_iocp = 1;
155  GPR_ASSERT(ret == g_iocp);
156 }
157 
158 #endif /* GRPC_WINSOCK_SOCKET */
now
static double now(void)
Definition: test/core/fling/client.cc:130
gpr_atm_full_fetch_add
#define gpr_atm_full_fetch_add(p, delta)
Definition: impl/codegen/atm_gcc_atomic.h:62
log.h
iomgr_internal.h
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
gpr_free
GPRAPI void gpr_free(void *ptr)
Definition: alloc.cc:51
gpr_format_message
GPRAPI char * gpr_format_message(int messageid)
socket_windows.h
grpc_core::Timestamp::InfPast
static constexpr Timestamp InfPast()
Definition: src/core/lib/gprpp/time.h:83
iocp_windows.h
stats.h
BOOL
int BOOL
Definition: undname.c:46
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
grpc_core::ExecCtx::Flush
bool Flush()
Definition: exec_ctx.cc:69
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
gpr_atm_acq_load
#define gpr_atm_acq_load(p)
Definition: impl/codegen/atm_gcc_atomic.h:52
GRPC_STATS_INC_SYSCALL_POLL
#define GRPC_STATS_INC_SYSCALL_POLL()
Definition: stats_data.h:188
uintptr_t
_W64 unsigned int uintptr_t
Definition: stdint-msvc2008.h:119
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
grpc_core::ExecCtx
Definition: exec_ctx.h:97
gpr_atm
intptr_t gpr_atm
Definition: impl/codegen/atm_gcc_atomic.h:32
FALSE
const BOOL FALSE
Definition: undname.c:47
bytes
uint8 bytes[10]
Definition: bloaty/third_party/protobuf/src/google/protobuf/io/coded_stream_unittest.cc:153
absl::flags_internal
Definition: abseil-cpp/absl/flags/commandlineflag.h:40
port.h
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
ret
UniquePtr< SSL_SESSION > ret
Definition: ssl_x509.cc:1029
alloc.h
thd.h
INVALID_HANDLE_VALUE
#define INVALID_HANDLE_VALUE
Definition: bloaty/third_party/zlib/contrib/minizip/iowin32.c:21
log_windows.h
timer.h
test_server.socket
socket
Definition: test_server.py:65
grpc_core::Timestamp::InfFuture
static constexpr Timestamp InfFuture()
Definition: src/core/lib/gprpp/time.h:79
grpc_core::ExecCtx::Now
Timestamp Now()
Definition: exec_ctx.cc:90
grpc_core::Duration
Definition: src/core/lib/gprpp/time.h:122
timeout
uv_timer_t timeout
Definition: libuv/docs/code/uvwget/main.c:9
grpc_core::ExecCtx::Get
static ExecCtx * Get()
Definition: exec_ctx.h:205
grpc_core::ExecCtx::InvalidateNow
void InvalidateNow()
Definition: exec_ctx.h:188
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:08