epoll.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #if defined ZMQ_IOTHREAD_POLLER_USE_EPOLL
5 #include "epoll.hpp"
6 
7 #if !defined ZMQ_HAVE_WINDOWS
8 #include <unistd.h>
9 #endif
10 
11 #include <stdlib.h>
12 #include <string.h>
13 #include <signal.h>
14 #include <algorithm>
15 #include <new>
16 
17 #include "macros.hpp"
18 #include "err.hpp"
19 #include "config.hpp"
20 #include "i_poll_events.hpp"
21 
22 #ifdef ZMQ_HAVE_WINDOWS
23 const zmq::epoll_t::epoll_fd_t zmq::epoll_t::epoll_retired_fd =
24  INVALID_HANDLE_VALUE;
25 #endif
26 
27 zmq::epoll_t::epoll_t (const zmq::thread_ctx_t &ctx_) :
28  worker_poller_base_t (ctx_)
29 {
30 #ifdef ZMQ_IOTHREAD_POLLER_USE_EPOLL_CLOEXEC
31  // Setting this option result in sane behaviour when exec() functions
32  // are used. Old sockets are closed and don't block TCP ports, avoid
33  // leaks, etc.
34  _epoll_fd = epoll_create1 (EPOLL_CLOEXEC);
35 #else
36  _epoll_fd = epoll_create (1);
37 #endif
38  errno_assert (_epoll_fd != epoll_retired_fd);
39 }
40 
41 zmq::epoll_t::~epoll_t ()
42 {
43  // Wait till the worker thread exits.
44  stop_worker ();
45 
46 #ifdef ZMQ_HAVE_WINDOWS
47  epoll_close (_epoll_fd);
48 #else
49  close (_epoll_fd);
50 #endif
51  for (retired_t::iterator it = _retired.begin (), end = _retired.end ();
52  it != end; ++it) {
53  LIBZMQ_DELETE (*it);
54  }
55 }
56 
57 zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
58 {
59  check_thread ();
60  poll_entry_t *pe = new (std::nothrow) poll_entry_t;
61  alloc_assert (pe);
62 
63  // The memset is not actually needed. It's here to prevent debugging
64  // tools to complain about using uninitialised memory.
65  memset (pe, 0, sizeof (poll_entry_t));
66 
67  pe->fd = fd_;
68  pe->ev.events = 0;
69  pe->ev.data.ptr = pe;
70  pe->events = events_;
71 
72  const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_ADD, fd_, &pe->ev);
73  errno_assert (rc != -1);
74 
75  // Increase the load metric of the thread.
76  adjust_load (1);
77 
78  return pe;
79 }
80 
81 void zmq::epoll_t::rm_fd (handle_t handle_)
82 {
83  check_thread ();
84  poll_entry_t *pe = static_cast<poll_entry_t *> (handle_);
85  const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev);
86  errno_assert (rc != -1);
87  pe->fd = retired_fd;
88  _retired.push_back (pe);
89 
90  // Decrease the load metric of the thread.
91  adjust_load (-1);
92 }
93 
94 void zmq::epoll_t::set_pollin (handle_t handle_)
95 {
96  check_thread ();
97  poll_entry_t *pe = static_cast<poll_entry_t *> (handle_);
98  pe->ev.events |= EPOLLIN;
99  const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
100  errno_assert (rc != -1);
101 }
102 
103 void zmq::epoll_t::reset_pollin (handle_t handle_)
104 {
105  check_thread ();
106  poll_entry_t *pe = static_cast<poll_entry_t *> (handle_);
107  pe->ev.events &= ~(static_cast<uint32_t> (EPOLLIN));
108  const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
109  errno_assert (rc != -1);
110 }
111 
112 void zmq::epoll_t::set_pollout (handle_t handle_)
113 {
114  check_thread ();
115  poll_entry_t *pe = static_cast<poll_entry_t *> (handle_);
116  pe->ev.events |= EPOLLOUT;
117  const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
118  errno_assert (rc != -1);
119 }
120 
121 void zmq::epoll_t::reset_pollout (handle_t handle_)
122 {
123  check_thread ();
124  poll_entry_t *pe = static_cast<poll_entry_t *> (handle_);
125  pe->ev.events &= ~(static_cast<uint32_t> (EPOLLOUT));
126  const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
127  errno_assert (rc != -1);
128 }
129 
130 void zmq::epoll_t::stop ()
131 {
132  check_thread ();
133 }
134 
135 int zmq::epoll_t::max_fds ()
136 {
137  return -1;
138 }
139 
140 void zmq::epoll_t::loop ()
141 {
142  epoll_event ev_buf[max_io_events];
143 
144  while (true) {
145  // Execute any due timers.
146  const int timeout = static_cast<int> (execute_timers ());
147 
148  if (get_load () == 0) {
149  if (timeout == 0)
150  break;
151 
152  // TODO sleep for timeout
153  continue;
154  }
155 
156  // Wait for events.
157  const int n = epoll_wait (_epoll_fd, &ev_buf[0], max_io_events,
158  timeout ? timeout : -1);
159  if (n == -1) {
160  errno_assert (errno == EINTR);
161  continue;
162  }
163 
164  for (int i = 0; i < n; i++) {
165  const poll_entry_t *const pe =
166  static_cast<const poll_entry_t *> (ev_buf[i].data.ptr);
167 
168  if (NULL == pe)
169  continue;
170  if (NULL == pe->events)
171  continue;
172  if (pe->fd == retired_fd)
173  continue;
174  if (ev_buf[i].events & (EPOLLERR | EPOLLHUP))
175  pe->events->in_event ();
176  if (pe->fd == retired_fd)
177  continue;
178  if (ev_buf[i].events & EPOLLOUT)
179  pe->events->out_event ();
180  if (pe->fd == retired_fd)
181  continue;
182  if (ev_buf[i].events & EPOLLIN)
183  pe->events->in_event ();
184  }
185 
186  // Destroy retired event sources.
187  for (retired_t::iterator it = _retired.begin (), end = _retired.end ();
188  it != end; ++it) {
189  LIBZMQ_DELETE (*it);
190  }
191  _retired.clear ();
192  }
193 }
194 
195 #endif
LIBZMQ_DELETE
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
EPOLL_CTL_DEL
#define EPOLL_CTL_DEL
Definition: wepoll.c:68
i_poll_events.hpp
end
GLuint GLuint end
Definition: glcorearb.h:2858
NULL
NULL
Definition: test_security_zap.cpp:405
EINTR
#define EINTR
Definition: errno.hpp:7
config.hpp
precompiled.hpp
epoll_event::data
epoll_data_t data
Definition: wepoll.c:84
errno
int errno
epoll_create1
WEPOLL_EXPORT HANDLE epoll_create1(int flags)
Definition: wepoll.c:584
epoll_create
WEPOLL_EXPORT HANDLE epoll_create(int size)
Definition: wepoll.c:577
EPOLLOUT
#define EPOLLOUT
Definition: wepoll.c:55
epoll_data::ptr
void * ptr
Definition: wepoll.c:74
retired_fd
@ retired_fd
Definition: libzmq/tests/testutil.hpp:117
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
macros.hpp
EPOLLHUP
#define EPOLLHUP
Definition: wepoll.c:57
epoll_ctl
WEPOLL_EXPORT int epoll_ctl(HANDLE ephnd, int op, SOCKET sock, struct epoll_event *event)
Definition: wepoll.c:616
epoll.hpp
timeout
GLbitfield GLuint64 timeout
Definition: glcorearb.h:3588
EPOLL_CTL_ADD
#define EPOLL_CTL_ADD
Definition: wepoll.c:66
n
GLdouble n
Definition: glcorearb.h:4153
i
int i
Definition: gmock-matchers_test.cc:764
zmq::thread_ctx_t
Definition: ctx.hpp:37
EPOLL_CTL_MOD
#define EPOLL_CTL_MOD
Definition: wepoll.c:67
EPOLLERR
#define EPOLLERR
Definition: wepoll.c:56
epoll_event
Definition: wepoll.c:82
epoll_wait
WEPOLL_EXPORT int epoll_wait(HANDLE ephnd, struct epoll_event *events, int maxevents, int timeout)
Definition: wepoll.c:648
err.hpp
fd_t
zmq_fd_t fd_t
Definition: libzmq/tests/testutil.hpp:98
epoll_close
WEPOLL_EXPORT int epoll_close(HANDLE ephnd)
Definition: wepoll.c:591
EPOLLIN
#define EPOLLIN
Definition: wepoll.c:53
it
MapIter it
Definition: php/ext/google/protobuf/map.c:205
zmq::max_io_events
@ max_io_events
Definition: config.hpp:32


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:51