Go to the documentation of this file.
31 #include <sys/types.h>
32 #include <sys/socket.h>
38 #if defined(__APPLE__)
39 # include <sys/event.h>
40 # include <sys/time.h>
41 # include <sys/select.h>
44 typedef struct uv__stream_select_s uv__stream_select_t;
46 struct uv__stream_select_s {
67 # define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR || errno == EPROTOTYPE)
68 # define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
69 (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS || \
70 (errno == EMSGSIZE && send_handle != NULL))
72 # define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR)
73 # define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
74 (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
94 stream->connection_cb = NULL;
95 stream->connect_req = NULL;
96 stream->shutdown_req = NULL;
102 stream->write_queue_size = 0;
104 if (
loop->emfile_fd == -1) {
115 #if defined(__APPLE__)
124 #if defined(__APPLE__)
126 uv__stream_select_t* s;
138 r =
write(s->fake_fd,
"x", 1);
139 while (
r == -1 && errno == EINTR);
148 #if defined(__APPLE__)
149 static void uv__stream_osx_select(
void*
arg) {
151 uv__stream_select_t*
s;
177 FD_SET(fd,
s->sread);
179 FD_SET(fd,
s->swrite);
180 FD_SET(
s->int_fd,
s->sread);
183 r = select(max_fd + 1,
s->sread,
s->swrite, NULL, NULL);
197 if (FD_ISSET(
s->int_fd,
s->sread))
201 if (
r ==
sizeof(
buf))
207 if (errno == EAGAIN || errno == EWOULDBLOCK)
218 if (FD_ISSET(fd,
s->sread))
220 if (FD_ISSET(fd,
s->swrite))
223 assert(events != 0 || FD_ISSET(
s->int_fd,
s->sread));
238 uv__stream_select_t*
s;
250 assert(events == (events & (POLLIN | POLLOUT)));
270 uv__stream_select_t*
s;
283 struct kevent filter[1];
284 struct kevent events[1];
286 uv__stream_select_t*
s;
298 perror(
"(libuv) kqueue()");
302 EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
309 ret = kevent(kq, filter, 1, events, 1, &
timeout);
310 while (
ret == -1 && errno == EINTR);
317 if (
ret == 0 || (events[0].
flags & EV_ERROR) == 0 || events[0].
data != EINVAL)
326 if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
334 swrite_sz = sread_sz;
344 s->sread = (fd_set*) ((
char*)
s +
sizeof(*s));
345 s->sread_sz = sread_sz;
346 s->swrite = (fd_set*) ((
char*)
s->sread + sread_sz);
347 s->swrite_sz = swrite_sz;
351 goto failed_async_init;
358 goto failed_close_sem_init;
362 goto failed_async_sem_init;
374 goto failed_thread_create;
378 failed_thread_create:
385 failed_async_sem_init:
388 failed_close_sem_init:
407 #if defined(__APPLE__)
411 if (!(
stream->io_watcher.fd == -1 ||
stream->io_watcher.fd == fd))
417 if (
stream->type == UV_TCP) {
428 #if defined(__APPLE__)
430 if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable,
sizeof(enable)) &&
437 stream->io_watcher.fd = fd;
462 if (
stream->connect_req) {
464 stream->connect_req->cb(
stream->connect_req, UV_ECANCELED);
465 stream->connect_req = NULL;
471 if (
stream->shutdown_req) {
478 stream->shutdown_req->cb(
stream->shutdown_req, UV_ECANCELED);
479 stream->shutdown_req = NULL;
482 assert(
stream->write_queue_size == 0);
501 if (
loop->emfile_fd == -1)
505 loop->emfile_fd = -1;
511 }
while (
err >= 0 ||
err == UV_EINTR);
515 loop->emfile_fd = emfile_fd;
521 #if defined(UV_HAVE_KQUEUE)
522 # define UV_DEC_BACKLOG(w) w->rcount--;
524 # define UV_DEC_BACKLOG(w)
533 assert(events & POLLIN);
534 assert(
stream->accepted_fd == -1);
543 assert(
stream->accepted_fd == -1);
545 #if defined(UV_HAVE_KQUEUE)
555 if (
err == UV_ECONNABORTED)
558 if (
err == UV_EMFILE ||
err == UV_ENFILE) {
572 if (
stream->accepted_fd != -1) {
578 if (
stream->type == UV_TCP &&
581 struct timespec
timeout = { 0, 1 };
588 #undef UV_DEC_BACKLOG
596 if (
server->accepted_fd == -1)
628 if (
server->queued_fds != NULL) {
631 queued_fds =
server->queued_fds;
634 server->accepted_fd = queued_fds->
fds[0];
637 assert(queued_fds->
offset > 0);
638 if (--queued_fds->
offset == 0) {
640 server->queued_fds = NULL;
643 memmove(queued_fds->
fds,
645 queued_fds->
offset *
sizeof(*queued_fds->
fds));
691 assert(
stream->shutdown_req);
694 stream->shutdown_req = NULL;
722 assert(
req->bufs != NULL);
724 req->nbufs -
req->write_index);
743 assert(n <= stream->write_queue_size);
758 return req->write_index ==
req->nbufs;
774 if (
req->error == 0) {
775 if (
req->bufs !=
req->bufsml)
828 iovcnt =
req->nbufs -
req->write_index;
841 if (
req->send_handle) {
844 struct cmsghdr *cmsg;
847 struct cmsghdr alias;
859 assert(fd_to_send >= 0);
864 msg.msg_iovlen = iovcnt;
868 msg.msg_controllen = CMSG_SPACE(
sizeof(fd_to_send));
870 cmsg = CMSG_FIRSTHDR(&
msg);
871 cmsg->cmsg_level = SOL_SOCKET;
872 cmsg->cmsg_type = SCM_RIGHTS;
873 cmsg->cmsg_len = CMSG_LEN(
sizeof(fd_to_send));
877 void* pv = CMSG_DATA(cmsg);
888 req->send_handle = NULL;
944 if (
req->bufs != NULL) {
946 if (
req->bufs !=
req->bufsml)
959 struct sockaddr_storage ss;
964 memset(&ss, 0,
sizeof(ss));
967 if (getsockname(fd, (
struct sockaddr*)&ss, &sslen))
972 if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &
type, &
len))
975 if (
type == SOCK_STREAM) {
976 #if defined(_AIX) || defined(__DragonFly__)
982 return UV_NAMED_PIPE;
984 switch (ss.ss_family) {
986 return UV_NAMED_PIPE;
993 if (
type == SOCK_DGRAM &&
994 (ss.ss_family == AF_INET || ss.ss_family ==
AF_INET6))
1014 unsigned int queue_size;
1016 queued_fds =
stream->queued_fds;
1017 if (queued_fds == NULL) {
1019 queued_fds =
uv__malloc((queue_size - 1) *
sizeof(*queued_fds->
fds) +
1020 sizeof(*queued_fds));
1021 if (queued_fds == NULL)
1023 queued_fds->
size = queue_size;
1025 stream->queued_fds = queued_fds;
1028 }
else if (queued_fds->
size == queued_fds->
offset) {
1029 queue_size = queued_fds->
size + 8;
1031 (queue_size - 1) *
sizeof(*queued_fds->
fds) +
1032 sizeof(*queued_fds));
1038 if (queued_fds == NULL)
1040 queued_fds->
size = queue_size;
1041 stream->queued_fds = queued_fds;
1045 queued_fds->
fds[queued_fds->
offset++] = fd;
1051 #if defined(__PASE__)
1053 # define UV__CMSG_FD_COUNT 60
1055 # define UV__CMSG_FD_COUNT 64
1057 #define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int))
1061 struct cmsghdr* cmsg;
1063 for (cmsg = CMSG_FIRSTHDR(
msg); cmsg != NULL; cmsg = CMSG_NXTHDR(
msg, cmsg)) {
1072 if (cmsg->cmsg_type != SCM_RIGHTS) {
1073 fprintf(
stderr,
"ignoring non-SCM_RIGHTS ancillary data: %d\n",
1079 pv = CMSG_DATA(cmsg);
1083 start = (
char*) cmsg;
1084 end = (
char*) cmsg + cmsg->cmsg_len;
1092 if (
stream->accepted_fd != -1) {
1111 # pragma clang diagnostic push
1112 # pragma clang diagnostic ignored "-Wgnu-folding-constant"
1113 # pragma clang diagnostic ignored "-Wvla-extension"
1140 assert(
stream->alloc_cb != NULL);
1144 if (
buf.base == NULL ||
buf.len == 0) {
1150 assert(
buf.base != NULL);
1157 while (nread < 0 && errno == EINTR);
1163 msg.msg_name = NULL;
1164 msg.msg_namelen = 0;
1166 msg.msg_controllen =
sizeof(cmsg_space);
1167 msg.msg_control = cmsg_space;
1172 while (nread < 0 && errno == EINTR);
1177 if (errno == EAGAIN || errno == EWOULDBLOCK) {
1184 #if defined(__CYGWIN__) || defined(__MSYS__)
1185 }
else if (errno == ECONNRESET &&
stream->type == UV_NAMED_PIPE) {
1201 }
else if (nread == 0) {
1216 #if defined(__MVS__)
1217 if (is_ipc &&
msg.msg_controllen > 0) {
1225 msg.msg_iov = (
struct iovec*) &blankbuf;
1235 }
while (nread == 0 &&
msg.msg_controllen > 0);
1242 if (nread < buflen) {
1252 # pragma clang diagnostic pop
1255 #undef UV__CMSG_FD_COUNT
1256 #undef UV__CMSG_FD_SIZE
1260 assert(
stream->type == UV_TCP ||
1261 stream->type == UV_TTY ||
1262 stream->type == UV_NAMED_PIPE);
1292 assert(
stream->type == UV_TCP ||
1293 stream->type == UV_NAMED_PIPE ||
1297 if (
stream->connect_req) {
1305 if (events & (POLLIN | POLLERR | POLLHUP))
1317 if ((events & POLLHUP) &&
1328 if (events & (POLLOUT | POLLERR | POLLHUP)) {
1347 socklen_t errorsize =
sizeof(
int);
1349 assert(
stream->type == UV_TCP ||
stream->type == UV_NAMED_PIPE);
1352 if (
stream->delayed_error) {
1358 stream->delayed_error = 0;
1373 stream->connect_req = NULL;
1402 assert((
stream->type == UV_TCP ||
1403 stream->type == UV_NAMED_PIPE ||
1404 stream->type == UV_TTY) &&
1405 "uv_write (unix) does not yet support other types of streams");
1426 #if defined(__CYGWIN__) || defined(__MSYS__)
1439 empty_queue = (
stream->write_queue_size == 0);
1446 req->send_handle = send_handle;
1453 if (
req->bufs == NULL)
1458 req->write_index = 0;
1468 if (
stream->connect_req) {
1471 else if (empty_queue) {
1509 unsigned int nbufs) {
1517 if (
stream->connect_req != NULL ||
stream->write_queue_size != 0)
1528 if (
req.bufs != NULL)
1532 written -= req_size;
1533 stream->write_queue_size -= req_size;
1538 if (
req.bufs !=
req.bufsml)
1548 if (written == 0 && req_size != 0)
1549 return req.error < 0 ?
req.error : UV_EAGAIN;
1558 assert(
stream->type == UV_TCP ||
stream->type == UV_NAMED_PIPE ||
1616 #if defined(__APPLE__)
1618 const uv__stream_select_t*
s;
1620 assert(
handle->type == UV_TCP ||
1621 handle->type == UV_TTY ||
1622 handle->type == UV_NAMED_PIPE);
1628 return handle->io_watcher.fd;
1637 #if defined(__APPLE__)
1639 if (
handle->select != NULL) {
1640 uv__stream_select_t* s;
1663 if (
handle->io_watcher.fd != -1) {
1665 if (
handle->io_watcher.fd > STDERR_FILENO)
1667 handle->io_watcher.fd = -1;
1670 if (
handle->accepted_fd != -1) {
1672 handle->accepted_fd = -1;
1676 if (
handle->queued_fds != NULL) {
1677 queued_fds =
handle->queued_fds;
1678 for (
i = 0;
i < queued_fds->
offset;
i++)
1681 handle->queued_fds = NULL;
#define uv__is_closing(h)
@ UV_HANDLE_TCP_KEEPALIVE
static void uv__read(uv_stream_t *stream)
UV_EXTERN int uv_sem_trywait(uv_sem_t *sem)
#define ARRAY_SIZE(array)
static void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
int uv_shutdown(uv_shutdown_t *req, uv_stream_t *stream, uv_shutdown_cb cb)
return memset(p, 0, total)
void uv__stream_close(uv_stream_t *handle)
static int uv__stream_queue_fd(uv_stream_t *stream, int fd)
#define uv__req_init(loop, req, typ)
@ UV_HANDLE_TCP_SINGLE_ACCEPT
int uv_write(uv_write_t *req, uv_stream_t *handle, const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb)
void * uv__malloc(size_t size)
int uv_read_stop(uv_stream_t *stream)
void uv__stream_flush_write_queue(uv_stream_t *stream, int error)
static void uv__write(uv_stream_t *stream)
void(* uv_read_cb)(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
static void alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
int uv__open_cloexec(const char *path, int flags)
int nanosleep(const struct timespec *req, struct timespec *rem)
UV_EXTERN int uv_thread_join(uv_thread_t *tid)
void uv__io_init(uv__io_t *w, uv__io_cb cb, int fd)
#define QUEUE_DATA(ptr, type, field)
#define container_of(ptr, type, member)
@ UV_HANDLE_BLOCKING_WRITES
#define uv__handle_unref(h)
int uv_tcp_listen(uv_tcp_t *tcp, int backlog, uv_connection_cb cb)
int uv_read_start(uv_stream_t *stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb)
UV_EXTERN void uv_close(uv_handle_t *handle, uv_close_cb close_cb)
UV_REQ_FIELDS uv_connect_cb cb
void(* uv_alloc_cb)(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
static void uv__stream_osx_interrupt_select(uv_stream_t *stream)
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))
static void uv__write_callbacks(uv_stream_t *stream)
#define ACCESS_ONCE(type, var)
int uv_accept(uv_stream_t *server, uv_stream_t *client)
int uv_try_write(uv_stream_t *stream, const uv_buf_t bufs[], unsigned int nbufs)
void uv__stream_destroy(uv_stream_t *stream)
static void uv__drain(uv_stream_t *stream)
int uv__io_active(const uv__io_t *w, unsigned int events)
int uv_pipe_listen(uv_pipe_t *handle, int backlog, uv_connection_cb cb)
void uv__io_start(uv_loop_t *loop, uv__io_t *w, unsigned int events)
int uv__tcp_keepalive(int fd, int on, unsigned int delay)
int uv_write2(uv_write_t *req, uv_stream_t *stream, const uv_buf_t bufs[], unsigned int nbufs, uv_stream_t *send_handle, uv_write_cb cb)
static void uv__stream_connect(uv_stream_t *)
#define UV_DEC_BACKLOG(w)
void(* uv_shutdown_cb)(uv_shutdown_t *req, int status)
#define writev(s, ptr, cnt)
UV_EXTERN int uv_thread_create(uv_thread_t *tid, uv_thread_cb entry, void *arg)
UV_PLATFORM_SEM_T uv_sem_t
void * uv__realloc(void *ptr, size_t size)
static void uv__stream_eof(uv_stream_t *stream, const uv_buf_t *buf)
#define IS_TRANSIENT_WRITE_ERROR(errno, send_handle)
ssize_t uv__recvmsg(int fd, struct msghdr *msg, int flags)
#define uv__handle_init(loop_, h, type_)
UV_EXTERN void uv_sem_post(uv_sem_t *sem)
static int uv__stream_recv_cmsg(uv_stream_t *stream, struct msghdr *msg)
UV_EXTERN int uv_sem_init(uv_sem_t *sem, unsigned int value)
#define RETRY_ON_WRITE_ERROR(errno)
uv_handle_type uv__handle_type(int fd)
int uv__accept(int sockfd)
UV_EXTERN int uv_async_init(uv_loop_t *, uv_async_t *async, uv_async_cb async_cb)
int read(izstream &zs, T *x, Items items)
void uv__io_feed(uv_loop_t *loop, uv__io_t *w)
void(* uv_write_cb)(uv_write_t *req, int status)
UV_EXTERN void uv_sem_destroy(uv_sem_t *sem)
UV_EXTERN int uv_udp_open(uv_udp_t *handle, uv_os_sock_t sock)
void uv__io_close(uv_loop_t *loop, uv__io_t *w)
UniquePtr< SSL_SESSION > ret
void(* uv_connection_cb)(uv_stream_t *server, int status)
#define uv__stream_fd(handle)
static ssize_t uv__writev(int fd, struct iovec *vec, size_t n)
static void uv__write_req_finish(uv_write_t *req)
int uv_is_readable(const uv_stream_t *stream)
static int uv__emfile_trick(uv_loop_t *loop, int accept_fd)
static void uv__stream_io(uv_loop_t *loop, uv__io_t *w, unsigned int events)
void uv__server_io(uv_loop_t *loop, uv__io_t *w, unsigned int events)
UV_EXTERN uv_buf_t uv_buf_init(char *base, unsigned int len)
int uv__tcp_nodelay(int fd, int on)
int uv__stream_open(uv_stream_t *stream, int fd, int flags)
void uv__io_stop(uv_loop_t *loop, uv__io_t *w, unsigned int events)
static int uv__handle_fd(uv_handle_t *handle)
int uv_stream_set_blocking(uv_stream_t *handle, int blocking)
void uv_try_write_cb(uv_write_t *req, int status)
size_t uv__count_bufs(const uv_buf_t bufs[], unsigned int nbufs)
static size_t uv__write_req_size(uv_write_t *req)
UV_EXTERN void uv_sem_wait(uv_sem_t *sem)
#define uv__handle_start(h)
void uv__stream_init(uv_loop_t *loop, uv_stream_t *stream, uv_handle_type type)
int uv_is_writable(const uv_stream_t *stream)
#define uv__handle_stop(h)
static uv_thread_t thread
#define QUEUE_INSERT_TAIL(h, q)
OPENSSL_EXPORT pem_password_cb * cb
#define uv__req_unregister(loop, req)
int uv_listen(uv_stream_t *stream, int backlog, uv_connection_cb cb)
static int uv__write_req_update(uv_stream_t *stream, uv_write_t *req, size_t n)
UV_EXTERN int uv_async_send(uv_async_t *async)
grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:25