21 #if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
27 #include "absl/strings/str_format.h"
64 class WSAErrorContext {
66 explicit WSAErrorContext(){};
75 WSAErrorContext(
const WSAErrorContext&) =
delete;
76 WSAErrorContext& operator=(
const WSAErrorContext&) =
delete;
93 class GrpcPolledFdWindows {
99 WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY,
107 tcp_write_state_(WRITE_IDLE),
109 gotten_into_driver_list_(
false),
110 address_family_(address_family),
111 socket_type_(socket_type) {
114 &GrpcPolledFdWindows::OnIocpReadable,
this,
115 grpc_schedule_on_exec_ctx);
117 &GrpcPolledFdWindows::OnIocpWriteable,
this,
118 grpc_schedule_on_exec_ctx);
120 &GrpcPolledFdWindows::OnTcpConnect,
this,
121 grpc_schedule_on_exec_ctx);
122 winsocket_ = grpc_winsocket_create(as,
name_.c_str());
125 ~GrpcPolledFdWindows() {
130 grpc_winsocket_destroy(winsocket_);
135 read_closure_ =
nullptr;
140 write_closure_ =
nullptr;
143 void RegisterForOnReadableLocked(
grpc_closure* read_closure) {
145 read_closure_ = read_closure;
151 ContinueRegisterForOnReadableLocked();
153 GPR_ASSERT(pending_continue_register_for_on_readable_locked_ ==
false);
154 pending_continue_register_for_on_readable_locked_ =
true;
158 void ContinueRegisterForOnReadableLocked() {
160 "fd:|%s| ContinueRegisterForOnReadableLocked "
161 "wsa_connect_error_:%d",
162 GetName(), wsa_connect_error_);
164 if (wsa_connect_error_ != 0) {
165 ScheduleAndNullReadClosure(
GRPC_WSA_ERROR(wsa_connect_error_,
"connect"));
171 memset(&winsocket_->read_info.overlapped, 0,
sizeof(OVERLAPPED));
172 recv_from_source_addr_len_ =
sizeof(recv_from_source_addr_);
174 if (WSARecvFrom(grpc_winsocket_wrapped_socket(winsocket_), &
buffer, 1,
175 nullptr, &
flags, (sockaddr*)recv_from_source_addr_,
176 &recv_from_source_addr_len_,
177 &winsocket_->read_info.overlapped,
nullptr)) {
178 int wsa_last_error = WSAGetLastError();
181 "fd:|%s| RegisterForOnReadableLocked WSARecvFrom error code:|%d| "
185 if (wsa_last_error != WSA_IO_PENDING) {
186 ScheduleAndNullReadClosure(
191 grpc_socket_notify_on_read(winsocket_, &outer_read_closure_);
194 void RegisterForOnWriteableLocked(
grpc_closure* write_closure) {
195 if (socket_type_ == SOCK_DGRAM) {
201 "fd:|%s| RegisterForOnWriteableLocked called tcp_write_state_: %d",
205 write_closure_ = write_closure;
207 ContinueRegisterForOnWriteableLocked();
209 GPR_ASSERT(pending_continue_register_for_on_writeable_locked_ ==
false);
210 pending_continue_register_for_on_writeable_locked_ =
true;
214 void ContinueRegisterForOnWriteableLocked() {
216 "fd:|%s| ContinueRegisterForOnWriteableLocked "
217 "wsa_connect_error_:%d",
218 GetName(), wsa_connect_error_);
220 if (wsa_connect_error_ != 0) {
221 ScheduleAndNullWriteClosure(
225 if (socket_type_ == SOCK_DGRAM) {
229 int wsa_error_code = 0;
230 switch (tcp_write_state_) {
234 case WRITE_REQUESTED:
235 tcp_write_state_ = WRITE_PENDING;
236 if (SendWriteBuf(
nullptr, &winsocket_->write_info.overlapped,
237 &wsa_error_code) != 0) {
238 ScheduleAndNullWriteClosure(
241 grpc_socket_notify_on_write(winsocket_, &outer_write_closure_);
245 case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
251 bool IsFdStillReadableLocked() {
return read_buf_has_data_; }
254 grpc_winsocket_shutdown(winsocket_);
258 return grpc_winsocket_wrapped_socket(winsocket_);
265 struct sockaddr*
from, ares_socklen_t* from_len) {
267 "fd:|%s| RecvFrom called read_buf_has_data:%d Current read buf "
270 if (!read_buf_has_data_) {
271 wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
282 read_buf_has_data_ =
false;
286 if (
from !=
nullptr) {
287 GPR_ASSERT(*from_len <= recv_from_source_addr_len_);
288 memcpy(
from, &recv_from_source_addr_, recv_from_source_addr_len_);
289 *from_len = recv_from_source_addr_len_;
296 for (
int i = 0;
i < iov_count;
i++) {
301 for (
int i = 0;
i < iov_count;
i++) {
302 for (
int k = 0;
k <
iov[
i].iov_len;
k++) {
309 int SendWriteBuf(LPDWORD bytes_sent_ptr, LPWSAOVERLAPPED overlapped,
310 int* wsa_error_code) {
315 int out = WSASend(grpc_winsocket_wrapped_socket(winsocket_), &
buf, 1,
316 bytes_sent_ptr,
flags, overlapped,
nullptr);
317 *wsa_error_code = WSAGetLastError();
319 "fd:|%s| SendWriteBuf WSASend buf.len:%d *bytes_sent_ptr:%d "
321 "return:%d *wsa_error_code:%d",
322 GetName(),
buf.len, bytes_sent_ptr !=
nullptr ? *bytes_sent_ptr : 0,
323 overlapped,
out, *wsa_error_code);
330 "fd:|%s| SendV called connect_done_:%d wsa_connect_error_:%d",
331 GetName(), connect_done_, wsa_connect_error_);
332 if (!connect_done_) {
333 wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
336 if (wsa_connect_error_ != 0) {
337 wsa_error_ctx->SetWSAError(wsa_connect_error_);
340 switch (socket_type_) {
342 return SendVUDP(wsa_error_ctx,
iov, iov_count);
344 return SendVTCP(wsa_error_ctx,
iov, iov_count);
358 write_buf_ = FlattenIovec(
iov, iov_count);
360 int wsa_error_code = 0;
361 if (SendWriteBuf(&
bytes_sent,
nullptr, &wsa_error_code) != 0) {
364 wsa_error_ctx->SetWSAError(wsa_error_code);
367 "fd:|%s| SendVUDP SendWriteBuf error code:%d msg:|%s|",
GetName(),
368 wsa_error_code,
msg);
386 switch (tcp_write_state_) {
388 tcp_write_state_ = WRITE_REQUESTED;
391 write_buf_ = FlattenIovec(
iov, iov_count);
392 wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
394 case WRITE_REQUESTED:
396 wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
398 case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
404 grpc_slice currently_attempted = FlattenIovec(
iov, iov_count);
414 tcp_write_state_ = WRITE_IDLE;
421 GrpcPolledFdWindows* grpc_polled_fd =
422 static_cast<GrpcPolledFdWindows*
>(
arg);
424 grpc_polled_fd->OnTcpConnectLocked(
error);
429 "fd:%s InnerOnTcpConnectLocked error:|%s| "
430 "pending_register_for_readable:%d"
431 " pending_register_for_writeable:%d",
433 pending_continue_register_for_on_readable_locked_,
434 pending_continue_register_for_on_writeable_locked_);
436 connect_done_ =
true;
439 DWORD transferred_bytes = 0;
442 WSAGetOverlappedResult(grpc_winsocket_wrapped_socket(winsocket_),
443 &winsocket_->write_info.overlapped,
444 &transferred_bytes, FALSE, &
flags);
447 wsa_connect_error_ = WSAGetLastError();
450 "fd:%s InnerOnTcpConnectLocked WSA overlapped result code:%d "
458 wsa_connect_error_ = WSA_OPERATION_ABORTED;
460 if (pending_continue_register_for_on_readable_locked_) {
461 ContinueRegisterForOnReadableLocked();
463 if (pending_continue_register_for_on_writeable_locked_) {
464 ContinueRegisterForOnWriteableLocked();
468 int Connect(WSAErrorContext* wsa_error_ctx,
const struct sockaddr*
target,
469 ares_socklen_t target_len) {
470 switch (socket_type_) {
472 return ConnectUDP(wsa_error_ctx,
target, target_len);
474 return ConnectTCP(wsa_error_ctx,
target, target_len);
480 int ConnectUDP(WSAErrorContext* wsa_error_ctx,
const struct sockaddr*
target,
481 ares_socklen_t target_len) {
485 SOCKET
s = grpc_winsocket_wrapped_socket(winsocket_);
487 WSAConnect(s,
target, target_len,
nullptr,
nullptr,
nullptr,
nullptr);
488 wsa_connect_error_ = WSAGetLastError();
489 wsa_error_ctx->SetWSAError(wsa_connect_error_);
490 connect_done_ =
true;
493 wsa_connect_error_,
msg);
496 return out == 0 ? 0 : -1;
499 int ConnectTCP(WSAErrorContext* wsa_error_ctx,
const struct sockaddr*
target,
500 ares_socklen_t target_len) {
504 DWORD ioctl_num_bytes;
505 SOCKET
s = grpc_winsocket_wrapped_socket(winsocket_);
506 if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid,
sizeof(guid),
507 &ConnectEx,
sizeof(ConnectEx), &ioctl_num_bytes,
nullptr,
509 int wsa_last_error = WSAGetLastError();
510 wsa_error_ctx->SetWSAError(wsa_last_error);
513 "fd:%s WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:%d "
517 connect_done_ =
true;
518 wsa_connect_error_ = wsa_last_error;
525 if (address_family_ == AF_INET) {
526 local_address = &wildcard4_addr;
528 local_address = &wildcard6_addr;
530 if (bind(s, (
struct sockaddr*)local_address->
addr,
531 (
int)local_address->
len) != 0) {
532 int wsa_last_error = WSAGetLastError();
533 wsa_error_ctx->SetWSAError(wsa_last_error);
536 wsa_last_error,
msg);
538 connect_done_ =
true;
539 wsa_connect_error_ = wsa_last_error;
543 if (ConnectEx(s,
target, target_len,
nullptr, 0,
nullptr,
544 &winsocket_->write_info.overlapped) == 0) {
546 int wsa_last_error = WSAGetLastError();
547 wsa_error_ctx->SetWSAError(wsa_last_error);
550 wsa_last_error,
msg);
552 if (wsa_last_error == WSA_IO_PENDING) {
556 wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
560 connect_done_ =
true;
561 wsa_connect_error_ = wsa_last_error;
565 grpc_socket_notify_on_write(winsocket_, &on_tcp_connect_locked_);
570 GrpcPolledFdWindows* polled_fd =
static_cast<GrpcPolledFdWindows*
>(
arg);
573 polled_fd->OnIocpReadableLocked(
error);
583 if (winsocket_->read_info.wsa_error != 0) {
588 if (winsocket_->read_info.wsa_error != WSAEMSGSIZE) {
590 "OnIocpReadableInner");
592 "fd:|%s| OnIocpReadableInner winsocket_->read_info.wsa_error "
593 "code:|%d| msg:|%s|",
594 GetName(), winsocket_->read_info.wsa_error,
601 read_buf_, 0, winsocket_->read_info.bytes_transferred);
602 read_buf_has_data_ =
true;
608 "fd:|%s| OnIocpReadable finishing. read buf length now:|%d|",
GetName(),
610 ScheduleAndNullReadClosure(
error);
614 GrpcPolledFdWindows* polled_fd =
static_cast<GrpcPolledFdWindows*
>(
arg);
617 polled_fd->OnIocpWriteableLocked(
error);
624 if (winsocket_->write_info.wsa_error != 0) {
626 "OnIocpWriteableInner");
628 "fd:|%s| OnIocpWriteableInner. winsocket_->write_info.wsa_error "
629 "code:|%d| msg:|%s|",
630 GetName(), winsocket_->write_info.wsa_error,
634 GPR_ASSERT(tcp_write_state_ == WRITE_PENDING);
636 tcp_write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY;
638 write_buf_, 0, winsocket_->write_info.bytes_transferred);
640 GetName(), winsocket_->write_info.bytes_transferred);
645 ScheduleAndNullWriteClosure(
error);
648 bool gotten_into_driver_list()
const {
return gotten_into_driver_list_; }
649 void set_gotten_into_driver_list() { gotten_into_driver_list_ =
true; }
653 char recv_from_source_addr_[200];
654 ares_socklen_t recv_from_source_addr_len_;
656 bool read_buf_has_data_ =
false;
662 grpc_winsocket* winsocket_;
664 WriteState tcp_write_state_;
666 bool gotten_into_driver_list_;
670 bool connect_done_ =
false;
671 int wsa_connect_error_ = 0;
675 bool pending_continue_register_for_on_readable_locked_ =
false;
676 bool pending_continue_register_for_on_writeable_locked_ =
false;
679 struct SockToPolledFdEntry {
680 SockToPolledFdEntry(SOCKET s, GrpcPolledFdWindows* fd)
683 GrpcPolledFdWindows* polled_fd;
684 SockToPolledFdEntry*
next =
nullptr;
692 class SockToPolledFdMap {
696 ~SockToPolledFdMap() {
GPR_ASSERT(head_ ==
nullptr); }
698 void AddNewSocket(SOCKET s, GrpcPolledFdWindows* polled_fd) {
699 SockToPolledFdEntry*
new_node =
new SockToPolledFdEntry(s, polled_fd);
704 GrpcPolledFdWindows* LookupPolledFd(SOCKET s) {
705 for (SockToPolledFdEntry* node = head_; node !=
nullptr;
707 if (node->socket == s) {
709 return node->polled_fd;
715 void RemoveEntry(SOCKET s) {
717 SockToPolledFdEntry** prev = &head_;
718 for (SockToPolledFdEntry* node = head_; node !=
nullptr;
720 if (node->socket == s) {
737 if (
type != SOCK_DGRAM &&
type != SOCK_STREAM) {
739 return INVALID_SOCKET;
741 SockToPolledFdMap*
map =
static_cast<SockToPolledFdMap*
>(user_data);
743 grpc_get_default_wsa_socket_flags());
744 if (s == INVALID_SOCKET) {
746 "WSASocket failed with params af:%d type:%d protocol:%d", af,
type,
750 grpc_tcp_set_non_block(s);
751 GrpcPolledFdWindows* polled_fd =
752 new GrpcPolledFdWindows(s,
map->mu_, af,
type);
754 "fd:|%s| created with params af:%d type:%d protocol:%d",
756 map->AddNewSocket(s, polled_fd);
761 ares_socklen_t target_len,
void* user_data) {
762 WSAErrorContext wsa_error_ctx;
763 SockToPolledFdMap*
map =
static_cast<SockToPolledFdMap*
>(user_data);
764 GrpcPolledFdWindows* polled_fd =
map->LookupPolledFd(as);
765 return polled_fd->Connect(&wsa_error_ctx,
target, target_len);
769 int iovec_count,
void* user_data) {
770 WSAErrorContext wsa_error_ctx;
771 SockToPolledFdMap*
map =
static_cast<SockToPolledFdMap*
>(user_data);
772 GrpcPolledFdWindows* polled_fd =
map->LookupPolledFd(as);
773 return polled_fd->SendV(&wsa_error_ctx,
iov, iovec_count);
778 ares_socklen_t* from_len,
void* user_data) {
779 WSAErrorContext wsa_error_ctx;
780 SockToPolledFdMap*
map =
static_cast<SockToPolledFdMap*
>(user_data);
781 GrpcPolledFdWindows* polled_fd =
map->LookupPolledFd(as);
782 return polled_fd->RecvFrom(&wsa_error_ctx,
data, data_len,
flags,
from,
786 static int CloseSocket(SOCKET s,
void* user_data) {
787 SockToPolledFdMap*
map =
static_cast<SockToPolledFdMap*
>(user_data);
788 GrpcPolledFdWindows* polled_fd =
map->LookupPolledFd(s);
794 polled_fd->GetName());
797 if (!polled_fd->gotten_into_driver_list()) {
799 "Shut down c-ares fd before without it ever having made it into the "
808 SockToPolledFdEntry* head_ =
nullptr;
813 &SockToPolledFdMap::CloseSocket ,
815 &SockToPolledFdMap::RecvFrom ,
816 &SockToPolledFdMap::SendV ,
822 class GrpcPolledFdWindowsWrapper :
public GrpcPolledFd {
824 explicit GrpcPolledFdWindowsWrapper(GrpcPolledFdWindows*
wrapped)
827 ~GrpcPolledFdWindowsWrapper() {}
829 void RegisterForOnReadableLocked(
grpc_closure* read_closure)
override {
830 wrapped_->RegisterForOnReadableLocked(read_closure);
833 void RegisterForOnWriteableLocked(
grpc_closure* write_closure)
override {
834 wrapped_->RegisterForOnWriteableLocked(write_closure);
837 bool IsFdStillReadableLocked()
override {
838 return wrapped_->IsFdStillReadableLocked();
842 wrapped_->ShutdownLocked(
error);
846 return wrapped_->GetWrappedAresSocketLocked();
849 const char*
GetName()
const override {
return wrapped_->GetName(); }
852 GrpcPolledFdWindows*
const wrapped_;
855 class GrpcPolledFdFactoryWindows :
public GrpcPolledFdFactory {
857 explicit GrpcPolledFdFactoryWindows(
Mutex*
mu) : sock_to_polled_fd_map_(
mu) {}
859 GrpcPolledFd* NewGrpcPolledFdLocked(
861 GrpcPolledFdWindows* polled_fd = sock_to_polled_fd_map_.LookupPolledFd(as);
864 polled_fd->set_gotten_into_driver_list();
865 return new GrpcPolledFdWindowsWrapper(polled_fd);
870 &sock_to_polled_fd_map_);
874 SockToPolledFdMap sock_to_polled_fd_map_;
878 return absl::make_unique<GrpcPolledFdFactoryWindows>(
mu);