active_poller.cpp
Go to the documentation of this file.
1 #include <zmq_addon.hpp>
2 
3 #include "testutil.hpp"
4 
5 #if defined(ZMQ_CPP11) && !defined(ZMQ_CPP11_PARTIAL) && defined(ZMQ_BUILD_DRAFT_API)
6 
#include <array>
#include <cstring>
#include <memory>
#include <type_traits>
10 
11 #if !defined(_WIN32)
12  #include <unistd.h>
13 #endif // !_WIN32
14 
15 TEST_CASE("create destroy", "[active_poller]")
16 {
17  zmq::active_poller_t active_poller;
18  CHECK(active_poller.empty());
19 }
20 
22  "active_poller_t should not be copy-constructible");
24  "active_poller_t should not be copy-assignable");
25 
26 static const zmq::active_poller_t::handler_type no_op_handler =
27  [](zmq::event_flags) {};
28 
29 TEST_CASE("move construct empty", "[active_poller]")
30 {
31  zmq::active_poller_t a;
32  CHECK(a.empty());
33  zmq::active_poller_t b = std::move(a);
34  CHECK(b.empty());
35  CHECK(0u == a.size());
36  CHECK(0u == b.size());
37 }
38 
39 TEST_CASE("move assign empty", "[active_poller]")
40 {
41  zmq::active_poller_t a;
42  CHECK(a.empty());
43  zmq::active_poller_t b;
44  CHECK(b.empty());
45  b = std::move(a);
46  CHECK(0u == a.size());
47  CHECK(0u == b.size());
48  CHECK(a.empty());
49  CHECK(b.empty());
50 }
51 
52 TEST_CASE("move construct non empty", "[active_poller]")
53 {
54  zmq::context_t context;
55  zmq::socket_t socket{context, zmq::socket_type::router};
56 
57  zmq::active_poller_t a;
58  a.add(socket, zmq::event_flags::pollin, [](zmq::event_flags) {});
59  CHECK_FALSE(a.empty());
60  CHECK(1u == a.size());
61  zmq::active_poller_t b = std::move(a);
62  CHECK(a.empty());
63  CHECK(0u == a.size());
64  CHECK_FALSE(b.empty());
65  CHECK(1u == b.size());
66 }
67 
68 TEST_CASE("move assign non empty", "[active_poller]")
69 {
70  zmq::context_t context;
71  zmq::socket_t socket{context, zmq::socket_type::router};
72 
73  zmq::active_poller_t a;
74  a.add(socket, zmq::event_flags::pollin, no_op_handler);
75  CHECK_FALSE(a.empty());
76  CHECK(1u == a.size());
77  zmq::active_poller_t b;
78  b = std::move(a);
79  CHECK(a.empty());
80  CHECK(0u == a.size());
81  CHECK_FALSE(b.empty());
82  CHECK(1u == b.size());
83 }
84 
85 TEST_CASE("add handler", "[active_poller]")
86 {
87  zmq::context_t context;
88  zmq::socket_t socket{context, zmq::socket_type::router};
89  zmq::active_poller_t active_poller;
90  CHECK_NOTHROW(
91  active_poller.add(socket, zmq::event_flags::pollin, no_op_handler));
92 }
93 
94 TEST_CASE("add fd handler", "[active_poller]")
95 {
96  int fd = 1;
97  zmq::active_poller_t active_poller;
98  CHECK_NOTHROW(
99  active_poller.add(fd, zmq::event_flags::pollin, no_op_handler));
100 }
101 
102 TEST_CASE("remove fd handler", "[active_poller]")
103 {
104  int fd = 1;
105  zmq::active_poller_t active_poller;
106  CHECK_NOTHROW(
107  active_poller.add(fd, zmq::event_flags::pollin, no_op_handler));
108  CHECK_NOTHROW(
109  active_poller.remove(fd));
110  CHECK_THROWS_ZMQ_ERROR(EINVAL, active_poller.remove(100));
111 }
112 
113 #if !defined(_WIN32)
114 // On Windows, these functions can only be used with WinSock sockets.
115 
116 TEST_CASE("mixed socket and fd handlers", "[active_poller]")
117 {
118  int pipefd[2];
119  ::pipe(pipefd);
120 
121  zmq::context_t context;
122  constexpr char inprocSocketAddress[] = "inproc://mixed-handlers";
123  zmq::socket_t socket_rcv{context, zmq::socket_type::pair};
124  zmq::socket_t socket_snd{context, zmq::socket_type::pair};
125  socket_rcv.bind(inprocSocketAddress);
126  socket_snd.connect(inprocSocketAddress);
127 
128  unsigned eventsFd = 0;
129  unsigned eventsSocket = 0;
130 
131  constexpr char messageText[] = "message";
132  constexpr size_t messageSize = sizeof(messageText);
133 
134  zmq::active_poller_t active_poller;
135  CHECK_NOTHROW(
136  active_poller.add(pipefd[0], zmq::event_flags::pollin, [&](zmq::event_flags flags) {
137  if (flags == zmq::event_flags::pollin)
138  {
139  char buffer[256];
140  CHECK(messageSize == ::read(pipefd[0], buffer, messageSize));
141  CHECK(0 == std::strcmp(buffer, messageText));
142  ++eventsFd;
143  }
144  }));
145  CHECK_NOTHROW(
146  active_poller.add(socket_rcv, zmq::event_flags::pollin, [&](zmq::event_flags flags) {
147  if (flags == zmq::event_flags::pollin)
148  {
149  zmq::message_t msg;
150  CHECK(socket_rcv.recv(msg, zmq::recv_flags::dontwait).has_value());
151  CHECK(messageSize == msg.size());
152  CHECK(0 == std::strcmp(messageText, msg.data<const char>()));
153  ++eventsSocket;
154  }
155  }));
156 
157  // send/rcv socket pair
158  zmq::message_t msg{messageText, messageSize};
159  socket_snd.send(msg, zmq::send_flags::dontwait);
160  CHECK(1 == active_poller.wait(std::chrono::milliseconds{100}));
161  CHECK(0 == eventsFd);
162  CHECK(1 == eventsSocket);
163 
164  // send/rcv pipe
165  ::write(pipefd[1], messageText, messageSize);
166  CHECK(1 == active_poller.wait(std::chrono::milliseconds{100}));
167  CHECK(1 == eventsFd);
168  CHECK(1 == eventsSocket);
169 }
170 
171 #endif // !_WIN32
172 
173 TEST_CASE("add null handler fails", "[active_poller]")
174 {
175  zmq::context_t context;
176  zmq::socket_t socket{context, zmq::socket_type::router};
177  zmq::active_poller_t active_poller;
178  zmq::active_poller_t::handler_type handler;
179  CHECK_THROWS_AS(active_poller.add(socket, zmq::event_flags::pollin, handler),
180  std::invalid_argument);
181 }
182 
183 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0)
184 // this behaviour was added by https://github.com/zeromq/libzmq/pull/3100
185 TEST_CASE("add handler invalid events type", "[active_poller]")
186 {
187  zmq::context_t context;
188  zmq::socket_t socket{context, zmq::socket_type::router};
189  zmq::active_poller_t active_poller;
190  short invalid_events_type = 2 << 10;
191  CHECK_THROWS_AS(
192  active_poller.add(socket, static_cast<zmq::event_flags>(invalid_events_type),
193  no_op_handler),
194  zmq::error_t);
195  CHECK(active_poller.empty());
196  CHECK(0u == active_poller.size());
197 }
198 #endif
199 
200 TEST_CASE("add handler twice throws", "[active_poller]")
201 {
202  common_server_client_setup s;
203 
204  CHECK(s.client.send(zmq::message_t{}, zmq::send_flags::none));
205 
206  zmq::active_poller_t active_poller;
207  bool message_received = false;
208  active_poller.add(
209  s.server, zmq::event_flags::pollin,
210  [&message_received](zmq::event_flags) { message_received = true; });
212  EINVAL, active_poller.add(s.server, zmq::event_flags::pollin, no_op_handler));
213  CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1}));
214  CHECK(message_received); // handler unmodified
215 }
216 
217 TEST_CASE("wait with no handlers throws", "[active_poller]")
218 {
219  zmq::active_poller_t active_poller;
221  active_poller.wait(std::chrono::milliseconds{10}));
222 }
223 
224 TEST_CASE("remove unregistered throws", "[active_poller]")
225 {
226  zmq::context_t context;
227  zmq::socket_t socket{context, zmq::socket_type::router};
228  zmq::active_poller_t active_poller;
229  CHECK_THROWS_ZMQ_ERROR(EINVAL, active_poller.remove(socket));
230 }
231 
232 TEST_CASE("remove registered empty", "[active_poller]")
233 {
234  zmq::context_t context;
235  zmq::socket_t socket{context, zmq::socket_type::router};
236  zmq::active_poller_t active_poller;
237  active_poller.add(socket, zmq::event_flags::pollin, no_op_handler);
238  CHECK_NOTHROW(active_poller.remove(socket));
239 }
240 
241 TEST_CASE("remove registered non empty", "[active_poller]")
242 {
243  zmq::context_t context;
244  zmq::socket_t socket{context, zmq::socket_type::router};
245  zmq::active_poller_t active_poller;
246  active_poller.add(socket, zmq::event_flags::pollin, no_op_handler);
247  CHECK_NOTHROW(active_poller.remove(socket));
248 }
249 
250 namespace
251 {
252 struct server_client_setup : common_server_client_setup
253 {
254  zmq::active_poller_t::handler_type handler = [&](zmq::event_flags e) {
255  events = e;
256  };
257 
258  zmq::event_flags events = zmq::event_flags::none;
259 };
260 
261 const std::string hi_str = "Hi";
262 
263 }
264 
265 TEST_CASE("poll basic", "[active_poller]")
266 {
267  server_client_setup s;
268 
269  CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
270 
271  zmq::active_poller_t active_poller;
272  bool message_received = false;
273  zmq::active_poller_t::handler_type handler =
274  [&message_received](zmq::event_flags events) {
275  CHECK(zmq::event_flags::none != (events & zmq::event_flags::pollin));
276  message_received = true;
277  };
278  CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin, handler));
279  CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1}));
280  CHECK(message_received);
281 }
282 
284 TEST_CASE("client server", "[active_poller]")
285 {
286  const std::string send_msg = hi_str;
287 
288  // Setup server and client
289  server_client_setup s;
290 
291  // Setup active_poller
292  zmq::active_poller_t active_poller;
293  zmq::event_flags events;
294  zmq::active_poller_t::handler_type handler = [&](zmq::event_flags e) {
295  if (zmq::event_flags::none != (e & zmq::event_flags::pollin)) {
296  zmq::message_t zmq_msg;
297  CHECK_NOTHROW(s.server.recv(zmq_msg)); // get message
298  std::string recv_msg(zmq_msg.data<char>(), zmq_msg.size());
299  CHECK(send_msg == recv_msg);
300  } else if (zmq::event_flags::none != (e & ~zmq::event_flags::pollout)) {
301  INFO("Unexpected event type " << static_cast<short>(events));
302  REQUIRE(false);
303  }
304  events = e;
305  };
306 
307  CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin, handler));
308 
309  // client sends message
310  CHECK_NOTHROW(s.client.send(zmq::message_t{send_msg}, zmq::send_flags::none));
311 
312  CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1}));
313  CHECK(events == zmq::event_flags::pollin);
314 
315  // Re-add server socket with pollout flag
316  CHECK_NOTHROW(active_poller.remove(s.server));
317  CHECK_NOTHROW(active_poller.add(
318  s.server, zmq::event_flags::pollin | zmq::event_flags::pollout, handler));
319  CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1}));
320  CHECK(events == zmq::event_flags::pollout);
321 }
322 
323 TEST_CASE("add invalid socket throws", "[active_poller]")
324 {
325  zmq::context_t context;
326  zmq::active_poller_t active_poller;
327  zmq::socket_t a{context, zmq::socket_type::router};
328  zmq::socket_t b{std::move(a)};
329  CHECK_THROWS_AS(active_poller.add(a, zmq::event_flags::pollin, no_op_handler),
330  zmq::error_t);
331 }
332 
333 TEST_CASE("remove invalid socket throws", "[active_poller]")
334 {
335  zmq::context_t context;
336  zmq::socket_t socket{context, zmq::socket_type::router};
337  zmq::active_poller_t active_poller;
338  CHECK_NOTHROW(
339  active_poller.add(socket, zmq::event_flags::pollin, no_op_handler));
340  CHECK(1u == active_poller.size());
341  std::vector<zmq::socket_t> sockets;
342  sockets.emplace_back(std::move(socket));
343  CHECK_THROWS_AS(active_poller.remove(socket), zmq::error_t);
344  CHECK(1u == active_poller.size());
345 }
346 
347 TEST_CASE("wait on added empty handler", "[active_poller]")
348 {
349  server_client_setup s;
350  CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
351  zmq::active_poller_t active_poller;
352  CHECK_NOTHROW(
353  active_poller.add(s.server, zmq::event_flags::pollin, no_op_handler));
354  CHECK_NOTHROW(active_poller.wait(std::chrono::milliseconds{-1}));
355 }
356 
357 TEST_CASE("modify empty throws", "[active_poller]")
358 {
359  zmq::context_t context;
360  zmq::socket_t socket{context, zmq::socket_type::push};
361  zmq::active_poller_t active_poller;
362  CHECK_THROWS_AS(active_poller.modify(socket, zmq::event_flags::pollin),
363  zmq::error_t);
364 }
365 
366 TEST_CASE("modify invalid socket throws", "[active_poller]")
367 {
368  zmq::context_t context;
370  zmq::socket_t b{std::move(a)};
371  zmq::active_poller_t active_poller;
372  CHECK_THROWS_AS(active_poller.modify(a, zmq::event_flags::pollin),
373  zmq::error_t);
374 }
375 
376 TEST_CASE("modify not added throws", "[active_poller]")
377 {
378  zmq::context_t context;
381  zmq::active_poller_t active_poller;
382  CHECK_NOTHROW(active_poller.add(a, zmq::event_flags::pollin, no_op_handler));
383  CHECK_THROWS_AS(active_poller.modify(b, zmq::event_flags::pollin),
384  zmq::error_t);
385 }
386 
387 TEST_CASE("modify simple", "[active_poller]")
388 {
389  zmq::context_t context;
391  zmq::active_poller_t active_poller;
392  CHECK_NOTHROW(active_poller.add(a, zmq::event_flags::pollin, no_op_handler));
393  CHECK_NOTHROW(
394  active_poller.modify(a, zmq::event_flags::pollin | zmq::event_flags::pollout));
395 }
396 
397 TEST_CASE("poll client server", "[active_poller]")
398 {
399  // Setup server and client
400  server_client_setup s;
401 
402  // Setup active_poller
403  zmq::active_poller_t active_poller;
404  CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin, s.handler));
405 
406  // client sends message
407  CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
408 
409  // wait for message and verify events
410  CHECK_NOTHROW(active_poller.wait(std::chrono::milliseconds{500}));
411  CHECK(s.events == zmq::event_flags::pollin);
412 
413  // Modify server socket with pollout flag
414  CHECK_NOTHROW(active_poller.modify(s.server, zmq::event_flags::pollin
415  | zmq::event_flags::pollout));
416  CHECK(1 == active_poller.wait(std::chrono::milliseconds{500}));
417  CHECK(s.events == (zmq::event_flags::pollin | zmq::event_flags::pollout));
418 }
419 
420 TEST_CASE("wait one return", "[active_poller]")
421 {
422  // Setup server and client
423  server_client_setup s;
424 
425  int count = 0;
426 
427  // Setup active_poller
428  zmq::active_poller_t active_poller;
429  CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin,
430  [&count](zmq::event_flags) { ++count; }));
431 
432  // client sends message
433  CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
434 
435  // wait for message and verify events
436  CHECK(1 == active_poller.wait(std::chrono::milliseconds{500}));
437  CHECK(1u == count);
438 }
439 
440 TEST_CASE("wait on move constructed active_poller", "[active_poller]")
441 {
442  server_client_setup s;
443  CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
444  zmq::active_poller_t a;
445  CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin, no_op_handler));
446  zmq::active_poller_t b{std::move(a)};
447  CHECK(1u == b.size());
448  CHECK(0u == a.size());
449  CHECK_THROWS_ZMQ_ERROR(EFAULT, a.wait(std::chrono::milliseconds{10}));
450  CHECK(b.wait(std::chrono::milliseconds{-1}));
451 }
452 
453 TEST_CASE("wait on move assigned active_poller", "[active_poller]")
454 {
455  server_client_setup s;
456  CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
457  zmq::active_poller_t a;
458  CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin, no_op_handler));
459  zmq::active_poller_t b;
460  b = {std::move(a)};
461  CHECK(1u == b.size());
462  CHECK(0u == a.size());
463  CHECK_THROWS_ZMQ_ERROR(EFAULT, a.wait(std::chrono::milliseconds{10}));
464  CHECK(b.wait(std::chrono::milliseconds{-1}));
465 }
466 
467 TEST_CASE("received on move constructed active_poller", "[active_poller]")
468 {
469  // Setup server and client
470  server_client_setup s;
471  int count = 0;
472  // Setup active_poller a
473  zmq::active_poller_t a;
474  CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin,
475  [&count](zmq::event_flags) { ++count; }));
476  // client sends message
477  CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
478  // wait for message and verify it is received
479  CHECK(1 == a.wait(std::chrono::milliseconds{500}));
480  CHECK(1u == count);
481  // Move construct active_poller b
482  zmq::active_poller_t b{std::move(a)};
483  // client sends message again
484  CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
485  // wait for message and verify it is received
486  CHECK(1 == b.wait(std::chrono::milliseconds{500}));
487  CHECK(2u == count);
488 }
489 
490 
491 TEST_CASE("remove from handler", "[active_poller]")
492 {
493  constexpr size_t ITER_NO = 10;
494 
495  // Setup servers and clients
496  std::vector<server_client_setup> setup_list;
497  for (size_t i = 0; i < ITER_NO; ++i)
498  setup_list.emplace_back(server_client_setup{});
499 
500  // Setup active_poller
501  zmq::active_poller_t active_poller;
502  int count = 0;
503  for (size_t i = 0; i < ITER_NO; ++i) {
504  CHECK_NOTHROW(active_poller.add(
505  setup_list[i].server, zmq::event_flags::pollin,
506  [&, i](zmq::event_flags events) {
507  CHECK(events == zmq::event_flags::pollin);
508  active_poller.remove(setup_list[ITER_NO - i - 1].server);
509  CHECK((ITER_NO - i - 1) == active_poller.size());
510  }));
511  ++count;
512  }
513  CHECK(ITER_NO == active_poller.size());
514  // Clients send messages
515  for (auto &s : setup_list) {
516  CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
517  }
518 
519  // Wait for all servers to receive a message
520  for (auto &s : setup_list) {
521  zmq::pollitem_t items[] = {{s.server, 0, ZMQ_POLLIN, 0}};
522  zmq::poll(&items[0], 1);
523  }
524 
525  // Fire all handlers in one wait
526  CHECK(ITER_NO == active_poller.wait(std::chrono::milliseconds{-1}));
527  CHECK(ITER_NO == count);
528 }
529 
530 #endif
TEST_CASE
TEST_CASE("context construct default and destroy", "[context]")
Definition: context.cpp:9
INFO
const int INFO
Definition: log_severity.h:59
zmq::poll
int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_=-1)
Definition: zmq.hpp:319
zmq::message_t
Definition: zmq.hpp:409
EINVAL
#define EINVAL
Definition: errno.hpp:25
s
XmlRpcServer s
zmq::socket_t
Definition: zmq.hpp:2188
zmq::message_t::data
void * data() ZMQ_NOTHROW
Definition: zmq.hpp:594
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
zmq_pollitem_t
Definition: zmq.h:487
testutil.hpp
flags
GLbitfield flags
Definition: glcorearb.h:3585
b
GLboolean GLboolean GLboolean b
Definition: glcorearb.h:3228
ZMQ_POLLIN
#define ZMQ_POLLIN
Definition: zmq.h:482
zmq::context_t
Definition: zmq.hpp:799
CHECK_THROWS_ZMQ_ERROR
#define CHECK_THROWS_ZMQ_ERROR(ecode, expr)
Definition: cppzmq/tests/testutil.hpp:37
CHECK
#define CHECK(x)
Definition: php/ext/google/protobuf/upb.c:8393
i
int i
Definition: gmock-matchers_test.cc:764
push
static void push(tarjan *t, const upb_refcounted *r)
Definition: ruby/ext/google/protobuf_c/upb.c:5890
handler
void * handler
Definition: test_security_curve.cpp:27
value
GLsizei const GLfloat * value
Definition: glcorearb.h:3093
EFAULT
#define EFAULT
Definition: errno.hpp:17
count
GLint GLsizei count
Definition: glcorearb.h:2830
zmq::message_t::size
size_t size() const ZMQ_NOTHROW
Definition: zmq.hpp:601
a
GLboolean GLboolean GLboolean GLboolean a
Definition: glcorearb.h:3228
zmq::error_t
Definition: zmq.hpp:290
zmq_addon.hpp


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:47