pollset.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "pollset.hpp"
5 #if defined ZMQ_IOTHREAD_POLLER_USE_POLLSET
6 
7 #include <stdlib.h>
8 #include <string.h>
9 #include <unistd.h>
10 #include <algorithm>
11 #include <new>
12 
13 #include "macros.hpp"
14 #include "err.hpp"
15 #include "config.hpp"
16 #include "i_poll_events.hpp"
17 
18 zmq::pollset_t::pollset_t (const zmq::thread_ctx_t &ctx_) :
19  ctx (ctx_), stopping (false)
20 {
21  pollset_fd = pollset_create (-1);
22  errno_assert (pollset_fd != -1);
23 }
24 
25 zmq::pollset_t::~pollset_t ()
26 {
27  // Wait till the worker thread exits.
28  worker.stop ();
29 
30  pollset_destroy (pollset_fd);
31  for (retired_t::iterator it = retired.begin (); it != retired.end (); ++it)
32  LIBZMQ_DELETE (*it);
33 }
34 
35 zmq::pollset_t::handle_t zmq::pollset_t::add_fd (fd_t fd_,
36  i_poll_events *events_)
37 {
38  poll_entry_t *pe = new (std::nothrow) poll_entry_t;
39  alloc_assert (pe);
40 
41  pe->fd = fd_;
42  pe->flag_pollin = false;
43  pe->flag_pollout = false;
44  pe->events = events_;
45 
46  struct poll_ctl pc;
47  pc.fd = fd_;
48  pc.cmd = PS_ADD;
49  pc.events = 0;
50 
51  int rc = pollset_ctl (pollset_fd, &pc, 1);
52  errno_assert (rc != -1);
53 
54  // Increase the load metric of the thread.
55  adjust_load (1);
56 
57  if (fd_ >= fd_table.size ()) {
58  fd_table.resize (fd_ + 1, NULL);
59  }
60  fd_table[fd_] = pe;
61  return pe;
62 }
63 
64 void zmq::pollset_t::rm_fd (handle_t handle_)
65 {
66  poll_entry_t *pe = (poll_entry_t *) handle_;
67 
68  struct poll_ctl pc;
69  pc.fd = pe->fd;
70  pc.cmd = PS_DELETE;
71  pc.events = 0;
72  pollset_ctl (pollset_fd, &pc, 1);
73 
74  fd_table[pe->fd] = NULL;
75 
76  pe->fd = retired_fd;
77  retired.push_back (pe);
78 
79  // Decrease the load metric of the thread.
80  adjust_load (-1);
81 }
82 
83 void zmq::pollset_t::set_pollin (handle_t handle_)
84 {
85  poll_entry_t *pe = (poll_entry_t *) handle_;
86  if (likely (!pe->flag_pollin)) {
87  struct poll_ctl pc;
88  pc.fd = pe->fd;
89  pc.cmd = PS_MOD;
90  pc.events = POLLIN;
91 
92  const int rc = pollset_ctl (pollset_fd, &pc, 1);
93  errno_assert (rc != -1);
94 
95  pe->flag_pollin = true;
96  }
97 }
98 
99 void zmq::pollset_t::reset_pollin (handle_t handle_)
100 {
101  poll_entry_t *pe = (poll_entry_t *) handle_;
102  if (unlikely (!pe->flag_pollin)) {
103  return;
104  }
105 
106  struct poll_ctl pc;
107  pc.fd = pe->fd;
108  pc.events = 0;
109 
110  pc.cmd = PS_DELETE;
111  int rc = pollset_ctl (pollset_fd, &pc, 1);
112 
113  if (pe->flag_pollout) {
114  pc.events = POLLOUT;
115  pc.cmd = PS_MOD;
116  rc = pollset_ctl (pollset_fd, &pc, 1);
117  errno_assert (rc != -1);
118  }
119 
120  pe->flag_pollin = false;
121 }
122 
123 void zmq::pollset_t::set_pollout (handle_t handle_)
124 {
125  poll_entry_t *pe = (poll_entry_t *) handle_;
126  if (likely (!pe->flag_pollout)) {
127  struct poll_ctl pc;
128  pc.fd = pe->fd;
129  pc.cmd = PS_MOD;
130  pc.events = POLLOUT;
131 
132  const int rc = pollset_ctl (pollset_fd, &pc, 1);
133  errno_assert (rc != -1);
134 
135  pe->flag_pollout = true;
136  }
137 }
138 
139 void zmq::pollset_t::reset_pollout (handle_t handle_)
140 {
141  poll_entry_t *pe = (poll_entry_t *) handle_;
142  if (unlikely (!pe->flag_pollout)) {
143  return;
144  }
145 
146  struct poll_ctl pc;
147  pc.fd = pe->fd;
148  pc.events = 0;
149 
150  pc.cmd = PS_DELETE;
151  int rc = pollset_ctl (pollset_fd, &pc, 1);
152  errno_assert (rc != -1);
153 
154  if (pe->flag_pollin) {
155  pc.cmd = PS_MOD;
156  pc.events = POLLIN;
157  rc = pollset_ctl (pollset_fd, &pc, 1);
158  errno_assert (rc != -1);
159  }
160  pe->flag_pollout = false;
161 }
162 
163 void zmq::pollset_t::start ()
164 {
165  ctx.start_thread (worker, worker_routine, this);
166 }
167 
168 void zmq::pollset_t::stop ()
169 {
170  stopping = true;
171 }
172 
173 int zmq::pollset_t::max_fds ()
174 {
175  return -1;
176 }
177 
178 void zmq::pollset_t::loop ()
179 {
180  struct pollfd polldata_array[max_io_events];
181 
182  while (!stopping) {
183  // Execute any due timers.
184  int timeout = (int) execute_timers ();
185 
186  // Wait for events.
187  int n = pollset_poll (pollset_fd, polldata_array, max_io_events,
188  timeout ? timeout : -1);
189  if (n == -1) {
190  errno_assert (errno == EINTR);
191  continue;
192  }
193 
194  for (int i = 0; i < n; i++) {
195  poll_entry_t *pe = fd_table[polldata_array[i].fd];
196  if (!pe)
197  continue;
198 
199  if (pe->fd == retired_fd)
200  continue;
201  if (polldata_array[i].revents & (POLLERR | POLLHUP))
202  pe->events->in_event ();
203  if (pe->fd == retired_fd)
204  continue;
205  if (polldata_array[i].revents & POLLOUT)
206  pe->events->out_event ();
207  if (pe->fd == retired_fd)
208  continue;
209  if (polldata_array[i].revents & POLLIN)
210  pe->events->in_event ();
211  }
212 
213  // Destroy retired event sources.
214  for (retired_t::iterator it = retired.begin (); it != retired.end ();
215  ++it)
216  LIBZMQ_DELETE (*it);
217  retired.clear ();
218  }
219 }
220 
221 void zmq::pollset_t::worker_routine (void *arg_)
222 {
223  ((pollset_t *) arg_)->loop ();
224 }
225 
226 #endif
LIBZMQ_DELETE
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
i_poll_events.hpp
NULL
NULL
Definition: test_security_zap.cpp:405
EINTR
#define EINTR
Definition: errno.hpp:7
config.hpp
zmq::retired_fd
@ retired_fd
Definition: fd.hpp:32
pollset.hpp
precompiled.hpp
errno
int errno
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
likely
#define likely(x)
Definition: likely.hpp:10
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
macros.hpp
start
GLuint start
Definition: glcorearb.h:2858
worker
void worker(int num)
Definition: test_multithread.cpp:83
timeout
GLbitfield GLuint64 timeout
Definition: glcorearb.h:3588
n
GLdouble n
Definition: glcorearb.h:4153
i
int i
Definition: gmock-matchers_test.cc:764
zmq::thread_ctx_t
Definition: ctx.hpp:37
err.hpp
fd_t
zmq_fd_t fd_t
Definition: libzmq/tests/testutil.hpp:98
false
#define false
Definition: cJSON.c:70
it
MapIter it
Definition: php/ext/google/protobuf/map.c:205
zmq::max_io_events
@ max_io_events
Definition: config.hpp:32
unlikely
#define unlikely(x)
Definition: likely.hpp:11


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:57