grpc_ares_ev_driver_windows.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
19 
20 #include "src/core/lib/iomgr/port.h" // IWYU pragma: keep
21 #if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
22 
23 #include <string.h>
24 
25 #include <ares.h>
26 
27 #include "absl/strings/str_format.h"
28 
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
33 #include <grpc/support/time.h>
34 
45 
// TODO(apolcyn): remove this hack after fixing upstream.
// On Windows, the sendv hook registered through ares_set_socket_functions
// takes a "struct iovec", but c-ares only defines that type inside one of its
// non-public headers, so this file declares it itself.
// See https://github.com/c-ares/c-ares/issues/206.
struct iovec {
  void* iov_base;  // first byte of the buffer
  size_t iov_len;  // buffer length in bytes
};
55 
56 namespace grpc_core {
57 
58 /* c-ares reads and takes action on the error codes of the
59  * "virtual socket operations" in this file, via the WSAGetLastError
60  * APIs. If code in this file wants to set a specific WSA error that
61  * c-ares should read, it must do so by calling SetWSAError() on the
62  * WSAErrorContext instance passed to it. A WSAErrorContext must only be
63  * instantiated at the top of the virtual socket function callstack. */
64 class WSAErrorContext {
65  public:
66  explicit WSAErrorContext(){};
67 
68  ~WSAErrorContext() {
69  if (error_ != 0) {
70  WSASetLastError(error_);
71  }
72  }
73 
74  /* Disallow copy and assignment operators */
75  WSAErrorContext(const WSAErrorContext&) = delete;
76  WSAErrorContext& operator=(const WSAErrorContext&) = delete;
77 
78  void SetWSAError(int error) { error_ = error; }
79 
80  private:
81  int error_ = 0;
82 };
83 
84 /* c-ares creates its own sockets and is meant to read them when readable and
85  * write them when writeable. To fit this socket usage model into the grpc
86  * windows poller (which gives notifications when attempted reads and writes are
87  * actually fulfilled rather than possible), this GrpcPolledFdWindows class
88  * takes advantage of the ares_set_socket_functions API and acts as a virtual
89  * socket. It holds its own read and write buffers which are written to and read
90  * from c-ares and are used with the grpc windows poller, and it, e.g.,
91  * manufactures virtual socket error codes when it e.g. needs to tell the c-ares
92  * library to wait for an async read. */
93 class GrpcPolledFdWindows {
94  public:
95  enum WriteState {
96  WRITE_IDLE,
97  WRITE_REQUESTED,
98  WRITE_PENDING,
99  WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY,
100  };
101 
102  GrpcPolledFdWindows(ares_socket_t as, Mutex* mu, int address_family,
103  int socket_type)
104  : mu_(mu),
105  read_buf_(grpc_empty_slice()),
106  write_buf_(grpc_empty_slice()),
107  tcp_write_state_(WRITE_IDLE),
108  name_(absl::StrFormat("c-ares socket: %" PRIdPTR, as)),
109  gotten_into_driver_list_(false),
110  address_family_(address_family),
111  socket_type_(socket_type) {
112  // Closure Initialization
113  GRPC_CLOSURE_INIT(&outer_read_closure_,
114  &GrpcPolledFdWindows::OnIocpReadable, this,
115  grpc_schedule_on_exec_ctx);
116  GRPC_CLOSURE_INIT(&outer_write_closure_,
117  &GrpcPolledFdWindows::OnIocpWriteable, this,
118  grpc_schedule_on_exec_ctx);
119  GRPC_CLOSURE_INIT(&on_tcp_connect_locked_,
120  &GrpcPolledFdWindows::OnTcpConnect, this,
121  grpc_schedule_on_exec_ctx);
122  winsocket_ = grpc_winsocket_create(as, name_.c_str());
123  }
124 
125  ~GrpcPolledFdWindows() {
126  grpc_slice_unref_internal(read_buf_);
127  grpc_slice_unref_internal(write_buf_);
128  GPR_ASSERT(read_closure_ == nullptr);
129  GPR_ASSERT(write_closure_ == nullptr);
130  grpc_winsocket_destroy(winsocket_);
131  }
132 
133  void ScheduleAndNullReadClosure(grpc_error_handle error) {
134  ExecCtx::Run(DEBUG_LOCATION, read_closure_, error);
135  read_closure_ = nullptr;
136  }
137 
138  void ScheduleAndNullWriteClosure(grpc_error_handle error) {
139  ExecCtx::Run(DEBUG_LOCATION, write_closure_, error);
140  write_closure_ = nullptr;
141  }
142 
143  void RegisterForOnReadableLocked(grpc_closure* read_closure) {
144  GPR_ASSERT(read_closure_ == nullptr);
145  read_closure_ = read_closure;
146  GPR_ASSERT(GRPC_SLICE_LENGTH(read_buf_) == 0);
147  grpc_slice_unref_internal(read_buf_);
148  GPR_ASSERT(!read_buf_has_data_);
149  read_buf_ = GRPC_SLICE_MALLOC(4192);
150  if (connect_done_) {
151  ContinueRegisterForOnReadableLocked();
152  } else {
153  GPR_ASSERT(pending_continue_register_for_on_readable_locked_ == false);
154  pending_continue_register_for_on_readable_locked_ = true;
155  }
156  }
157 
158  void ContinueRegisterForOnReadableLocked() {
160  "fd:|%s| ContinueRegisterForOnReadableLocked "
161  "wsa_connect_error_:%d",
162  GetName(), wsa_connect_error_);
163  GPR_ASSERT(connect_done_);
164  if (wsa_connect_error_ != 0) {
165  ScheduleAndNullReadClosure(GRPC_WSA_ERROR(wsa_connect_error_, "connect"));
166  return;
167  }
168  WSABUF buffer;
169  buffer.buf = (char*)GRPC_SLICE_START_PTR(read_buf_);
170  buffer.len = GRPC_SLICE_LENGTH(read_buf_);
171  memset(&winsocket_->read_info.overlapped, 0, sizeof(OVERLAPPED));
172  recv_from_source_addr_len_ = sizeof(recv_from_source_addr_);
173  DWORD flags = 0;
174  if (WSARecvFrom(grpc_winsocket_wrapped_socket(winsocket_), &buffer, 1,
175  nullptr, &flags, (sockaddr*)recv_from_source_addr_,
176  &recv_from_source_addr_len_,
177  &winsocket_->read_info.overlapped, nullptr)) {
178  int wsa_last_error = WSAGetLastError();
179  char* msg = gpr_format_message(wsa_last_error);
181  "fd:|%s| RegisterForOnReadableLocked WSARecvFrom error code:|%d| "
182  "msg:|%s|",
183  GetName(), wsa_last_error, msg);
184  gpr_free(msg);
185  if (wsa_last_error != WSA_IO_PENDING) {
186  ScheduleAndNullReadClosure(
187  GRPC_WSA_ERROR(wsa_last_error, "WSARecvFrom"));
188  return;
189  }
190  }
191  grpc_socket_notify_on_read(winsocket_, &outer_read_closure_);
192  }
193 
194  void RegisterForOnWriteableLocked(grpc_closure* write_closure) {
195  if (socket_type_ == SOCK_DGRAM) {
196  GRPC_CARES_TRACE_LOG("fd:|%s| RegisterForOnWriteableLocked called",
197  GetName());
198  } else {
199  GPR_ASSERT(socket_type_ == SOCK_STREAM);
201  "fd:|%s| RegisterForOnWriteableLocked called tcp_write_state_: %d",
202  GetName(), tcp_write_state_);
203  }
204  GPR_ASSERT(write_closure_ == nullptr);
205  write_closure_ = write_closure;
206  if (connect_done_) {
207  ContinueRegisterForOnWriteableLocked();
208  } else {
209  GPR_ASSERT(pending_continue_register_for_on_writeable_locked_ == false);
210  pending_continue_register_for_on_writeable_locked_ = true;
211  }
212  }
213 
214  void ContinueRegisterForOnWriteableLocked() {
216  "fd:|%s| ContinueRegisterForOnWriteableLocked "
217  "wsa_connect_error_:%d",
218  GetName(), wsa_connect_error_);
219  GPR_ASSERT(connect_done_);
220  if (wsa_connect_error_ != 0) {
221  ScheduleAndNullWriteClosure(
222  GRPC_WSA_ERROR(wsa_connect_error_, "connect"));
223  return;
224  }
225  if (socket_type_ == SOCK_DGRAM) {
226  ScheduleAndNullWriteClosure(GRPC_ERROR_NONE);
227  } else {
228  GPR_ASSERT(socket_type_ == SOCK_STREAM);
229  int wsa_error_code = 0;
230  switch (tcp_write_state_) {
231  case WRITE_IDLE:
232  ScheduleAndNullWriteClosure(GRPC_ERROR_NONE);
233  break;
234  case WRITE_REQUESTED:
235  tcp_write_state_ = WRITE_PENDING;
236  if (SendWriteBuf(nullptr, &winsocket_->write_info.overlapped,
237  &wsa_error_code) != 0) {
238  ScheduleAndNullWriteClosure(
239  GRPC_WSA_ERROR(wsa_error_code, "WSASend (overlapped)"));
240  } else {
241  grpc_socket_notify_on_write(winsocket_, &outer_write_closure_);
242  }
243  break;
244  case WRITE_PENDING:
245  case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
246  abort();
247  }
248  }
249  }
250 
251  bool IsFdStillReadableLocked() { return read_buf_has_data_; }
252 
253  void ShutdownLocked(grpc_error_handle error) {
254  grpc_winsocket_shutdown(winsocket_);
255  }
256 
257  ares_socket_t GetWrappedAresSocketLocked() {
258  return grpc_winsocket_wrapped_socket(winsocket_);
259  }
260 
261  const char* GetName() const { return name_.c_str(); }
262 
263  ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data,
264  ares_socket_t data_len, int flags,
265  struct sockaddr* from, ares_socklen_t* from_len) {
267  "fd:|%s| RecvFrom called read_buf_has_data:%d Current read buf "
268  "length:|%d|",
269  GetName(), read_buf_has_data_, GRPC_SLICE_LENGTH(read_buf_));
270  if (!read_buf_has_data_) {
271  wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
272  return -1;
273  }
275  for (size_t i = 0; i < GRPC_SLICE_LENGTH(read_buf_) && i < data_len; i++) {
276  ((char*)data)[i] = GRPC_SLICE_START_PTR(read_buf_)[i];
277  bytes_read++;
278  }
279  read_buf_ = grpc_slice_sub_no_ref(read_buf_, bytes_read,
280  GRPC_SLICE_LENGTH(read_buf_));
281  if (GRPC_SLICE_LENGTH(read_buf_) == 0) {
282  read_buf_has_data_ = false;
283  }
284  /* c-ares overloads this recv_from virtual socket function to receive
285  * data on both UDP and TCP sockets, and from is nullptr for TCP. */
286  if (from != nullptr) {
287  GPR_ASSERT(*from_len <= recv_from_source_addr_len_);
288  memcpy(from, &recv_from_source_addr_, recv_from_source_addr_len_);
289  *from_len = recv_from_source_addr_len_;
290  }
291  return bytes_read;
292  }
293 
294  grpc_slice FlattenIovec(const struct iovec* iov, int iov_count) {
295  int total = 0;
296  for (int i = 0; i < iov_count; i++) {
297  total += iov[i].iov_len;
298  }
300  size_t cur = 0;
301  for (int i = 0; i < iov_count; i++) {
302  for (int k = 0; k < iov[i].iov_len; k++) {
303  GRPC_SLICE_START_PTR(out)[cur++] = ((char*)iov[i].iov_base)[k];
304  }
305  }
306  return out;
307  }
308 
309  int SendWriteBuf(LPDWORD bytes_sent_ptr, LPWSAOVERLAPPED overlapped,
310  int* wsa_error_code) {
311  WSABUF buf;
312  buf.len = GRPC_SLICE_LENGTH(write_buf_);
313  buf.buf = (char*)GRPC_SLICE_START_PTR(write_buf_);
314  DWORD flags = 0;
315  int out = WSASend(grpc_winsocket_wrapped_socket(winsocket_), &buf, 1,
316  bytes_sent_ptr, flags, overlapped, nullptr);
317  *wsa_error_code = WSAGetLastError();
319  "fd:|%s| SendWriteBuf WSASend buf.len:%d *bytes_sent_ptr:%d "
320  "overlapped:%p "
321  "return:%d *wsa_error_code:%d",
322  GetName(), buf.len, bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0,
323  overlapped, out, *wsa_error_code);
324  return out;
325  }
326 
327  ares_ssize_t SendV(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
328  int iov_count) {
330  "fd:|%s| SendV called connect_done_:%d wsa_connect_error_:%d",
331  GetName(), connect_done_, wsa_connect_error_);
332  if (!connect_done_) {
333  wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
334  return -1;
335  }
336  if (wsa_connect_error_ != 0) {
337  wsa_error_ctx->SetWSAError(wsa_connect_error_);
338  return -1;
339  }
340  switch (socket_type_) {
341  case SOCK_DGRAM:
342  return SendVUDP(wsa_error_ctx, iov, iov_count);
343  case SOCK_STREAM:
344  return SendVTCP(wsa_error_ctx, iov, iov_count);
345  default:
346  abort();
347  }
348  }
349 
350  ares_ssize_t SendVUDP(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
351  int iov_count) {
352  // c-ares doesn't handle retryable errors on writes of UDP sockets.
353  // Therefore, the sendv handler for UDP sockets must only attempt
354  // to write everything inline.
355  GRPC_CARES_TRACE_LOG("fd:|%s| SendVUDP called", GetName());
356  GPR_ASSERT(GRPC_SLICE_LENGTH(write_buf_) == 0);
357  grpc_slice_unref_internal(write_buf_);
358  write_buf_ = FlattenIovec(iov, iov_count);
359  DWORD bytes_sent = 0;
360  int wsa_error_code = 0;
361  if (SendWriteBuf(&bytes_sent, nullptr, &wsa_error_code) != 0) {
362  grpc_slice_unref_internal(write_buf_);
363  write_buf_ = grpc_empty_slice();
364  wsa_error_ctx->SetWSAError(wsa_error_code);
365  char* msg = gpr_format_message(wsa_error_code);
367  "fd:|%s| SendVUDP SendWriteBuf error code:%d msg:|%s|", GetName(),
368  wsa_error_code, msg);
369  gpr_free(msg);
370  return -1;
371  }
372  write_buf_ = grpc_slice_sub_no_ref(write_buf_, bytes_sent,
373  GRPC_SLICE_LENGTH(write_buf_));
374  return bytes_sent;
375  }
376 
377  ares_ssize_t SendVTCP(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
378  int iov_count) {
379  // The "sendv" handler on TCP sockets buffers up write
380  // requests and returns an artificial WSAEWOULDBLOCK. Writing that buffer
381  // out in the background, and making further send progress in general, will
382  // happen as long as c-ares continues to show interest in writeability on
383  // this fd.
384  GRPC_CARES_TRACE_LOG("fd:|%s| SendVTCP called tcp_write_state_:%d",
385  GetName(), tcp_write_state_);
386  switch (tcp_write_state_) {
387  case WRITE_IDLE:
388  tcp_write_state_ = WRITE_REQUESTED;
389  GPR_ASSERT(GRPC_SLICE_LENGTH(write_buf_) == 0);
390  grpc_slice_unref_internal(write_buf_);
391  write_buf_ = FlattenIovec(iov, iov_count);
392  wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
393  return -1;
394  case WRITE_REQUESTED:
395  case WRITE_PENDING:
396  wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
397  return -1;
398  case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
399  // c-ares is retrying a send on data that we previously returned
400  // WSAEWOULDBLOCK for, but then subsequently wrote out in the
401  // background. Right now, we assume that c-ares is retrying the same
402  // send again. If c-ares still needs to send even more data, we'll get
403  // to it eventually.
404  grpc_slice currently_attempted = FlattenIovec(iov, iov_count);
405  GPR_ASSERT(GRPC_SLICE_LENGTH(currently_attempted) >=
406  GRPC_SLICE_LENGTH(write_buf_));
407  ares_ssize_t total_sent = 0;
408  for (size_t i = 0; i < GRPC_SLICE_LENGTH(write_buf_); i++) {
409  GPR_ASSERT(GRPC_SLICE_START_PTR(currently_attempted)[i] ==
410  GRPC_SLICE_START_PTR(write_buf_)[i]);
411  total_sent++;
412  }
413  grpc_slice_unref_internal(currently_attempted);
414  tcp_write_state_ = WRITE_IDLE;
415  return total_sent;
416  }
417  abort();
418  }
419 
420  static void OnTcpConnect(void* arg, grpc_error_handle error) {
421  GrpcPolledFdWindows* grpc_polled_fd =
422  static_cast<GrpcPolledFdWindows*>(arg);
423  MutexLock lock(grpc_polled_fd->mu_);
424  grpc_polled_fd->OnTcpConnectLocked(error);
425  }
426 
427  void OnTcpConnectLocked(grpc_error_handle error) {
429  "fd:%s InnerOnTcpConnectLocked error:|%s| "
430  "pending_register_for_readable:%d"
431  " pending_register_for_writeable:%d",
433  pending_continue_register_for_on_readable_locked_,
434  pending_continue_register_for_on_writeable_locked_);
435  GPR_ASSERT(!connect_done_);
436  connect_done_ = true;
437  GPR_ASSERT(wsa_connect_error_ == 0);
438  if (GRPC_ERROR_IS_NONE(error)) {
439  DWORD transferred_bytes = 0;
440  DWORD flags;
441  BOOL wsa_success =
442  WSAGetOverlappedResult(grpc_winsocket_wrapped_socket(winsocket_),
443  &winsocket_->write_info.overlapped,
444  &transferred_bytes, FALSE, &flags);
445  GPR_ASSERT(transferred_bytes == 0);
446  if (!wsa_success) {
447  wsa_connect_error_ = WSAGetLastError();
448  char* msg = gpr_format_message(wsa_connect_error_);
450  "fd:%s InnerOnTcpConnectLocked WSA overlapped result code:%d "
451  "msg:|%s|",
452  GetName(), wsa_connect_error_, msg);
453  gpr_free(msg);
454  }
455  } else {
456  // Spoof up an error code that will cause any future c-ares operations on
457  // this fd to abort.
458  wsa_connect_error_ = WSA_OPERATION_ABORTED;
459  }
460  if (pending_continue_register_for_on_readable_locked_) {
461  ContinueRegisterForOnReadableLocked();
462  }
463  if (pending_continue_register_for_on_writeable_locked_) {
464  ContinueRegisterForOnWriteableLocked();
465  }
466  }
467 
468  int Connect(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
469  ares_socklen_t target_len) {
470  switch (socket_type_) {
471  case SOCK_DGRAM:
472  return ConnectUDP(wsa_error_ctx, target, target_len);
473  case SOCK_STREAM:
474  return ConnectTCP(wsa_error_ctx, target, target_len);
475  default:
476  abort();
477  }
478  }
479 
480  int ConnectUDP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
481  ares_socklen_t target_len) {
482  GRPC_CARES_TRACE_LOG("fd:%s ConnectUDP", GetName());
483  GPR_ASSERT(!connect_done_);
484  GPR_ASSERT(wsa_connect_error_ == 0);
485  SOCKET s = grpc_winsocket_wrapped_socket(winsocket_);
486  int out =
487  WSAConnect(s, target, target_len, nullptr, nullptr, nullptr, nullptr);
488  wsa_connect_error_ = WSAGetLastError();
489  wsa_error_ctx->SetWSAError(wsa_connect_error_);
490  connect_done_ = true;
491  char* msg = gpr_format_message(wsa_connect_error_);
492  GRPC_CARES_TRACE_LOG("fd:%s WSAConnect error code:|%d| msg:|%s|", GetName(),
493  wsa_connect_error_, msg);
494  gpr_free(msg);
495  // c-ares expects a posix-style connect API
496  return out == 0 ? 0 : -1;
497  }
498 
499  int ConnectTCP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
500  ares_socklen_t target_len) {
501  GRPC_CARES_TRACE_LOG("fd:%s ConnectTCP", GetName());
502  LPFN_CONNECTEX ConnectEx;
503  GUID guid = WSAID_CONNECTEX;
504  DWORD ioctl_num_bytes;
505  SOCKET s = grpc_winsocket_wrapped_socket(winsocket_);
506  if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
507  &ConnectEx, sizeof(ConnectEx), &ioctl_num_bytes, nullptr,
508  nullptr) != 0) {
509  int wsa_last_error = WSAGetLastError();
510  wsa_error_ctx->SetWSAError(wsa_last_error);
511  char* msg = gpr_format_message(wsa_last_error);
513  "fd:%s WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:%d "
514  "msg:|%s|",
515  GetName(), wsa_last_error, msg);
516  gpr_free(msg);
517  connect_done_ = true;
518  wsa_connect_error_ = wsa_last_error;
519  return -1;
520  }
521  grpc_resolved_address wildcard4_addr;
522  grpc_resolved_address wildcard6_addr;
523  grpc_sockaddr_make_wildcards(0, &wildcard4_addr, &wildcard6_addr);
524  grpc_resolved_address* local_address = nullptr;
525  if (address_family_ == AF_INET) {
526  local_address = &wildcard4_addr;
527  } else {
528  local_address = &wildcard6_addr;
529  }
530  if (bind(s, (struct sockaddr*)local_address->addr,
531  (int)local_address->len) != 0) {
532  int wsa_last_error = WSAGetLastError();
533  wsa_error_ctx->SetWSAError(wsa_last_error);
534  char* msg = gpr_format_message(wsa_last_error);
535  GRPC_CARES_TRACE_LOG("fd:%s bind error code:%d msg:|%s|", GetName(),
536  wsa_last_error, msg);
537  gpr_free(msg);
538  connect_done_ = true;
539  wsa_connect_error_ = wsa_last_error;
540  return -1;
541  }
542  int out = 0;
543  if (ConnectEx(s, target, target_len, nullptr, 0, nullptr,
544  &winsocket_->write_info.overlapped) == 0) {
545  out = -1;
546  int wsa_last_error = WSAGetLastError();
547  wsa_error_ctx->SetWSAError(wsa_last_error);
548  char* msg = gpr_format_message(wsa_last_error);
549  GRPC_CARES_TRACE_LOG("fd:%s ConnectEx error code:%d msg:|%s|", GetName(),
550  wsa_last_error, msg);
551  gpr_free(msg);
552  if (wsa_last_error == WSA_IO_PENDING) {
553  // c-ares only understands WSAEINPROGRESS and EWOULDBLOCK error codes on
554  // connect, but an async connect on IOCP socket will give
555  // WSA_IO_PENDING, so we need to convert.
556  wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
557  } else {
558  // By returning a non-retryable error to c-ares at this point,
559  // we're aborting the possibility of any future operations on this fd.
560  connect_done_ = true;
561  wsa_connect_error_ = wsa_last_error;
562  return -1;
563  }
564  }
565  grpc_socket_notify_on_write(winsocket_, &on_tcp_connect_locked_);
566  return out;
567  }
568 
569  static void OnIocpReadable(void* arg, grpc_error_handle error) {
570  GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
571  (void)GRPC_ERROR_REF(error);
572  MutexLock lock(polled_fd->mu_);
573  polled_fd->OnIocpReadableLocked(error);
574  }
575 
576  // TODO(apolcyn): improve this error handling to be less conversative.
577  // An e.g. ECONNRESET error here should result in errors when
578  // c-ares reads from this socket later, but it shouldn't necessarily cancel
579  // the entire resolution attempt. Doing so will allow the "inject broken
580  // nameserver list" test to pass on Windows.
581  void OnIocpReadableLocked(grpc_error_handle error) {
582  if (GRPC_ERROR_IS_NONE(error)) {
583  if (winsocket_->read_info.wsa_error != 0) {
584  /* WSAEMSGSIZE would be due to receiving more data
585  * than our read buffer's fixed capacity. Assume that
586  * the connection is TCP and read the leftovers
587  * in subsequent c-ares reads. */
588  if (winsocket_->read_info.wsa_error != WSAEMSGSIZE) {
589  error = GRPC_WSA_ERROR(winsocket_->read_info.wsa_error,
590  "OnIocpReadableInner");
592  "fd:|%s| OnIocpReadableInner winsocket_->read_info.wsa_error "
593  "code:|%d| msg:|%s|",
594  GetName(), winsocket_->read_info.wsa_error,
596  }
597  }
598  }
599  if (GRPC_ERROR_IS_NONE(error)) {
600  read_buf_ = grpc_slice_sub_no_ref(
601  read_buf_, 0, winsocket_->read_info.bytes_transferred);
602  read_buf_has_data_ = true;
603  } else {
604  grpc_slice_unref_internal(read_buf_);
605  read_buf_ = grpc_empty_slice();
606  }
608  "fd:|%s| OnIocpReadable finishing. read buf length now:|%d|", GetName(),
609  GRPC_SLICE_LENGTH(read_buf_));
610  ScheduleAndNullReadClosure(error);
611  }
612 
613  static void OnIocpWriteable(void* arg, grpc_error_handle error) {
614  GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
615  (void)GRPC_ERROR_REF(error);
616  MutexLock lock(polled_fd->mu_);
617  polled_fd->OnIocpWriteableLocked(error);
618  }
619 
620  void OnIocpWriteableLocked(grpc_error_handle error) {
621  GRPC_CARES_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName());
622  GPR_ASSERT(socket_type_ == SOCK_STREAM);
623  if (GRPC_ERROR_IS_NONE(error)) {
624  if (winsocket_->write_info.wsa_error != 0) {
625  error = GRPC_WSA_ERROR(winsocket_->write_info.wsa_error,
626  "OnIocpWriteableInner");
628  "fd:|%s| OnIocpWriteableInner. winsocket_->write_info.wsa_error "
629  "code:|%d| msg:|%s|",
630  GetName(), winsocket_->write_info.wsa_error,
632  }
633  }
634  GPR_ASSERT(tcp_write_state_ == WRITE_PENDING);
635  if (GRPC_ERROR_IS_NONE(error)) {
636  tcp_write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY;
637  write_buf_ = grpc_slice_sub_no_ref(
638  write_buf_, 0, winsocket_->write_info.bytes_transferred);
639  GRPC_CARES_TRACE_LOG("fd:|%s| OnIocpWriteableInner. bytes transferred:%d",
640  GetName(), winsocket_->write_info.bytes_transferred);
641  } else {
642  grpc_slice_unref_internal(write_buf_);
643  write_buf_ = grpc_empty_slice();
644  }
645  ScheduleAndNullWriteClosure(error);
646  }
647 
648  bool gotten_into_driver_list() const { return gotten_into_driver_list_; }
649  void set_gotten_into_driver_list() { gotten_into_driver_list_ = true; }
650 
651  private:
652  Mutex* mu_;
653  char recv_from_source_addr_[200];
654  ares_socklen_t recv_from_source_addr_len_;
655  grpc_slice read_buf_;
656  bool read_buf_has_data_ = false;
657  grpc_slice write_buf_;
658  grpc_closure* read_closure_ = nullptr;
659  grpc_closure* write_closure_ = nullptr;
660  grpc_closure outer_read_closure_;
661  grpc_closure outer_write_closure_;
662  grpc_winsocket* winsocket_;
663  // tcp_write_state_ is only used on TCP GrpcPolledFds
664  WriteState tcp_write_state_;
665  const std::string name_;
666  bool gotten_into_driver_list_;
667  int address_family_;
668  int socket_type_;
669  grpc_closure on_tcp_connect_locked_;
670  bool connect_done_ = false;
671  int wsa_connect_error_ = 0;
672  // We don't run register_for_{readable,writeable} logic until
673  // a socket is connected. In the interim, we queue readable/writeable
674  // registrations with the following state.
675  bool pending_continue_register_for_on_readable_locked_ = false;
676  bool pending_continue_register_for_on_writeable_locked_ = false;
677 };
678 
679 struct SockToPolledFdEntry {
680  SockToPolledFdEntry(SOCKET s, GrpcPolledFdWindows* fd)
681  : socket(s), polled_fd(fd) {}
682  SOCKET socket;
683  GrpcPolledFdWindows* polled_fd;
684  SockToPolledFdEntry* next = nullptr;
685 };
686 
687 /* A SockToPolledFdMap can make ares_socket_t types (SOCKET's on windows)
688  * to GrpcPolledFdWindow's, and is used to find the appropriate
689  * GrpcPolledFdWindows to handle a virtual socket call when c-ares makes that
690  * socket call on the ares_socket_t type. Instances are owned by and one-to-one
691  * with a GrpcPolledFdWindows factory and event driver */
692 class SockToPolledFdMap {
693  public:
694  explicit SockToPolledFdMap(Mutex* mu) : mu_(mu) {}
695 
696  ~SockToPolledFdMap() { GPR_ASSERT(head_ == nullptr); }
697 
698  void AddNewSocket(SOCKET s, GrpcPolledFdWindows* polled_fd) {
699  SockToPolledFdEntry* new_node = new SockToPolledFdEntry(s, polled_fd);
700  new_node->next = head_;
701  head_ = new_node;
702  }
703 
704  GrpcPolledFdWindows* LookupPolledFd(SOCKET s) {
705  for (SockToPolledFdEntry* node = head_; node != nullptr;
706  node = node->next) {
707  if (node->socket == s) {
708  GPR_ASSERT(node->polled_fd != nullptr);
709  return node->polled_fd;
710  }
711  }
712  abort();
713  }
714 
715  void RemoveEntry(SOCKET s) {
716  GPR_ASSERT(head_ != nullptr);
717  SockToPolledFdEntry** prev = &head_;
718  for (SockToPolledFdEntry* node = head_; node != nullptr;
719  node = node->next) {
720  if (node->socket == s) {
721  *prev = node->next;
722  delete node;
723  return;
724  }
725  prev = &node->next;
726  }
727  abort();
728  }
729 
730  /* These virtual socket functions are called from within the c-ares
731  * library. These methods generally dispatch those socket calls to the
732  * appropriate methods. The virtual "socket" and "close" methods are
733  * special and instead create/add and remove/destroy GrpcPolledFdWindows
734  * objects.
735  */
736  static ares_socket_t Socket(int af, int type, int protocol, void* user_data) {
737  if (type != SOCK_DGRAM && type != SOCK_STREAM) {
738  GRPC_CARES_TRACE_LOG("Socket called with invalid socket type:%d", type);
739  return INVALID_SOCKET;
740  }
741  SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
742  SOCKET s = WSASocket(af, type, protocol, nullptr, 0,
743  grpc_get_default_wsa_socket_flags());
744  if (s == INVALID_SOCKET) {
746  "WSASocket failed with params af:%d type:%d protocol:%d", af, type,
747  protocol);
748  return s;
749  }
750  grpc_tcp_set_non_block(s);
751  GrpcPolledFdWindows* polled_fd =
752  new GrpcPolledFdWindows(s, map->mu_, af, type);
754  "fd:|%s| created with params af:%d type:%d protocol:%d",
755  polled_fd->GetName(), af, type, protocol);
756  map->AddNewSocket(s, polled_fd);
757  return s;
758  }
759 
760  static int Connect(ares_socket_t as, const struct sockaddr* target,
761  ares_socklen_t target_len, void* user_data) {
762  WSAErrorContext wsa_error_ctx;
763  SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
764  GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
765  return polled_fd->Connect(&wsa_error_ctx, target, target_len);
766  }
767 
768  static ares_ssize_t SendV(ares_socket_t as, const struct iovec* iov,
769  int iovec_count, void* user_data) {
770  WSAErrorContext wsa_error_ctx;
771  SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
772  GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
773  return polled_fd->SendV(&wsa_error_ctx, iov, iovec_count);
774  }
775 
776  static ares_ssize_t RecvFrom(ares_socket_t as, void* data, size_t data_len,
777  int flags, struct sockaddr* from,
778  ares_socklen_t* from_len, void* user_data) {
779  WSAErrorContext wsa_error_ctx;
780  SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
781  GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
782  return polled_fd->RecvFrom(&wsa_error_ctx, data, data_len, flags, from,
783  from_len);
784  }
785 
786  static int CloseSocket(SOCKET s, void* user_data) {
787  SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
788  GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(s);
789  map->RemoveEntry(s);
790  // See https://github.com/grpc/grpc/pull/20284, this trace log is
791  // intentionally placed to attempt to trigger a crash in case of a
792  // use after free on polled_fd.
793  GRPC_CARES_TRACE_LOG("CloseSocket called for socket: %s",
794  polled_fd->GetName());
795  // If a gRPC polled fd has not made it in to the driver's list yet, then
796  // the driver has not and will never see this socket.
797  if (!polled_fd->gotten_into_driver_list()) {
798  polled_fd->ShutdownLocked(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
799  "Shut down c-ares fd before without it ever having made it into the "
800  "driver's list"));
801  }
802  delete polled_fd;
803  return 0;
804  }
805 
806  private:
807  Mutex* mu_;
808  SockToPolledFdEntry* head_ = nullptr;
809 };
810 
811 const struct ares_socket_functions custom_ares_sock_funcs = {
812  &SockToPolledFdMap::Socket /* socket */,
813  &SockToPolledFdMap::CloseSocket /* close */,
814  &SockToPolledFdMap::Connect /* connect */,
815  &SockToPolledFdMap::RecvFrom /* recvfrom */,
816  &SockToPolledFdMap::SendV /* sendv */,
817 };
818 
819 /* A thin wrapper over a GrpcPolledFdWindows object but with a shorter
820  lifetime. This object releases it's GrpcPolledFdWindows upon destruction,
821  so that c-ares can close it via usual socket teardown. */
822 class GrpcPolledFdWindowsWrapper : public GrpcPolledFd {
823  public:
824  explicit GrpcPolledFdWindowsWrapper(GrpcPolledFdWindows* wrapped)
825  : wrapped_(wrapped) {}
826 
827  ~GrpcPolledFdWindowsWrapper() {}
828 
829  void RegisterForOnReadableLocked(grpc_closure* read_closure) override {
830  wrapped_->RegisterForOnReadableLocked(read_closure);
831  }
832 
833  void RegisterForOnWriteableLocked(grpc_closure* write_closure) override {
834  wrapped_->RegisterForOnWriteableLocked(write_closure);
835  }
836 
837  bool IsFdStillReadableLocked() override {
838  return wrapped_->IsFdStillReadableLocked();
839  }
840 
841  void ShutdownLocked(grpc_error_handle error) override {
842  wrapped_->ShutdownLocked(error);
843  }
844 
845  ares_socket_t GetWrappedAresSocketLocked() override {
846  return wrapped_->GetWrappedAresSocketLocked();
847  }
848 
849  const char* GetName() const override { return wrapped_->GetName(); }
850 
851  private:
852  GrpcPolledFdWindows* const wrapped_;
853 };
854 
855 class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory {
856  public:
857  explicit GrpcPolledFdFactoryWindows(Mutex* mu) : sock_to_polled_fd_map_(mu) {}
858 
859  GrpcPolledFd* NewGrpcPolledFdLocked(
860  ares_socket_t as, grpc_pollset_set* driver_pollset_set) override {
861  GrpcPolledFdWindows* polled_fd = sock_to_polled_fd_map_.LookupPolledFd(as);
862  // Set a flag so that the virtual socket "close" method knows it
863  // doesn't need to call ShutdownLocked, since now the driver will.
864  polled_fd->set_gotten_into_driver_list();
865  return new GrpcPolledFdWindowsWrapper(polled_fd);
866  }
867 
868  void ConfigureAresChannelLocked(ares_channel channel) override {
869  ares_set_socket_functions(channel, &custom_ares_sock_funcs,
870  &sock_to_polled_fd_map_);
871  }
872 
873  private:
874  SockToPolledFdMap sock_to_polled_fd_map_;
875 };
876 
877 std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(Mutex* mu) {
878  return absl::make_unique<GrpcPolledFdFactoryWindows>(mu);
879 }
880 
881 } // namespace grpc_core
882 
#endif /* GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER) */
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
gen_build_yaml.out
dictionary out
Definition: src/benchmark/gen_build_yaml.py:24
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
sockaddr_utils.h
LPFN_CONNECTEX
BOOL(PASCAL * LPFN_CONNECTEX)(SOCKET s, const struct sockaddr *name, int namelen, PVOID lpSendBuffer, DWORD dwSendDataLength, LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped)
Definition: win.h:140
MutexLock
#define MutexLock(x)
Definition: bloaty/third_party/re2/util/mutex.h:125
Connect
static int Connect(uint16_t port)
Definition: bssl_shim.cc:98
memset
return memset(p, 0, total)
absl::StrFormat
ABSL_MUST_USE_RESULT std::string StrFormat(const FormatSpec< Args... > &format, const Args &... args)
Definition: abseil-cpp/absl/strings/str_format.h:338
false
#define false
Definition: setup_once.h:323
ares.h
grpc_core
Definition: call_metric_recorder.h:31
total
size_t total
Definition: cord_analysis.cc:59
grpc_pollset_set
struct grpc_pollset_set grpc_pollset_set
Definition: iomgr_fwd.h:23
string.h
buf
voidpf void * buf
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
gpr_free
GPRAPI void gpr_free(void *ptr)
Definition: alloc.cc:51
GRPC_SLICE_MALLOC
#define GRPC_SLICE_MALLOC(len)
Definition: include/grpc/slice.h:70
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
error
grpc_error_handle error
Definition: retry_filter.cc:499
GRPC_CARES_TRACE_LOG
#define GRPC_CARES_TRACE_LOG(format,...)
Definition: grpc_ares_wrapper.h:47
grpc_sockaddr_make_wildcards
void grpc_sockaddr_make_wildcards(int port, grpc_resolved_address *wild4_out, grpc_resolved_address *wild6_out)
Definition: sockaddr_utils.cc:166
protocol
Protocol protocol
Definition: client_callback_end2end_test.cc:67
gpr_format_message
GPRAPI char * gpr_format_message(int messageid)
grpc_resolved_address
Definition: resolved_address.h:34
socket_windows.h
absl::FormatConversionChar::s
@ s
time.h
name_
const std::string name_
Definition: priority.cc:233
grpc_slice_sub_no_ref
GPRAPI grpc_slice grpc_slice_sub_no_ref(grpc_slice s, size_t begin, size_t end)
Definition: slice/slice.cc:264
setup.k
k
Definition: third_party/bloaty/third_party/capstone/bindings/python/setup.py:42
map
zval * map
Definition: php/ext/google/protobuf/encode_decode.c:480
grpc_ares_ev_driver.h
iocp_windows.h
BOOL
int BOOL
Definition: undname.c:46
mu_
Mutex mu_
Definition: oob_backend_metric.cc:115
iov
static uv_buf_t iov
Definition: libuv/docs/code/uvcat/main.c:15
memory.h
from
size_t from
Definition: abseil-cpp/absl/container/internal/layout_test.cc:1384
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
GRPC_WSA_ERROR
#define GRPC_WSA_ERROR(err, call_name)
windows only: create an error associated with WSAGetLastError()!=0
Definition: error.h:357
string_util.h
memcpy
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))
ares_set_socket_functions
CARES_EXTERN void ares_set_socket_functions(ares_channel channel, const struct ares_socket_functions *funcs, void *user_data)
Definition: ares_init.c:2606
channel
wrapped_grpc_channel * channel
Definition: src/php/ext/grpc/call.h:33
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
gen_stats_data.c_str
def c_str(s, encoding='ascii')
Definition: gen_stats_data.py:38
mu
Mutex mu
Definition: server_config_selector_filter.cc:74
grpc_ares_wrapper.h
bytes_read
static size_t bytes_read
Definition: test-ipc-heavy-traffic-deadlock-bug.c:47
GRPC_SLICE_START_PTR
#define GRPC_SLICE_START_PTR(slice)
Definition: include/grpc/impl/codegen/slice.h:101
arg
Definition: cmdline.cc:40
grpc_empty_slice
GPRAPI grpc_slice grpc_empty_slice(void)
Definition: slice/slice.cc:42
grpc_slice
Definition: include/grpc/impl/codegen/slice.h:65
grpc_resolved_address::len
socklen_t len
Definition: resolved_address.h:36
data
char data[kBufferLength]
Definition: abseil-cpp/absl/strings/internal/str_format/float_conversion.cc:1006
buffer
char buffer[1024]
Definition: libuv/docs/code/idle-compute/main.c:8
slice_internal.h
msg
std::string msg
Definition: client_interceptors_end2end_test.cc:372
GRPC_ERROR_CREATE_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
Definition: error.h:291
GRPC_SLICE_LENGTH
#define GRPC_SLICE_LENGTH(slice)
Definition: include/grpc/impl/codegen/slice.h:104
tcp_windows.h
memory_diff.cur
def cur
Definition: memory_diff.py:83
ares_channeldata
Definition: ares_private.h:266
iovec
Definition: gsec.h:33
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
wrapped
grpc_call * wrapped
Definition: src/php/ext/grpc/call.h:32
iovec::iov_len
size_t iov_len
Definition: gsec.h:35
absl::flags_internal
Definition: abseil-cpp/absl/flags/commandlineflag.h:40
framework.rpc.grpc_channelz.Socket
Socket
Definition: grpc_channelz.py:46
port.h
grpc_error_std_string
std::string grpc_error_std_string(grpc_error_handle error)
Definition: error.cc:944
alloc.h
google::protobuf.internal::Mutex
WrappedMutex Mutex
Definition: bloaty/third_party/protobuf/src/google/protobuf/stubs/mutex.h:113
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
ares_socket_t
int ares_socket_t
Definition: ares.h:229
log_windows.h
sockaddr_windows.h
arg
struct arg arg
new_node
static test_node * new_node(size_t i, size_t *ctr)
Definition: mpscq_test.cc:40
google::protobuf::python::message_descriptor::GetName
static PyObject * GetName(PyBaseDescriptor *self, void *closure)
Definition: bloaty/third_party/protobuf/python/google/protobuf/pyext/descriptor.cc:471
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
WSAID_CONNECTEX
#define WSAID_CONNECTEX
Definition: win.h:113
test_server.socket
socket
Definition: test_server.py:65
absl
Definition: abseil-cpp/absl/algorithm/algorithm.h:31
flags
uint32_t flags
Definition: retry_filter.cc:632
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
error_
grpc_error_handle error_
Definition: message_decompress_filter.cc:112
grpc_error
Definition: error_internal.h:42
iovec::iov_base
void * iov_base
Definition: gsec.h:34
grpc_core::NewGrpcPolledFdFactory
std::unique_ptr< GrpcPolledFdFactory > NewGrpcPolledFdFactory(Mutex *mu)
ares_ssize_t
CARES_TYPEOF_ARES_SSIZE_T ares_ssize_t
Definition: ares_build.h:210
grpc_closure
Definition: closure.h:56
grpc_resolved_address::addr
char addr[GRPC_MAX_SOCKADDR_SIZE]
Definition: resolved_address.h:35
bytes_sent
static size_t bytes_sent
Definition: test-tcp-writealot.c:44
setup.target
target
Definition: third_party/bloaty/third_party/protobuf/python/setup.py:179
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
grpc_slice_unref_internal
void grpc_slice_unref_internal(const grpc_slice &slice)
Definition: slice_refcount.h:39
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
port_platform.h
ares_socket_functions
Definition: ares.h:401


grpc
Author(s):
autogenerated on Fri May 16 2025 02:58:44