test_monitor.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "testutil.hpp"
5 
6 #include "testutil_unity.hpp"
7 
8 #include <stdlib.h>
9 #include <string.h>
10 
12 
14 {
16 
17  // Socket monitoring only works over inproc://
19  EPROTONOSUPPORT, zmq_socket_monitor (client, "tcp://127.0.0.1:*", 0));
20 
21 #ifdef ZMQ_EVENT_PIPES_STATS
22  // Stats command needs to be called on a valid socket with monitoring
23  // enabled
26 #endif
27 
29 }
30 
32 {
34 
35  // We'll monitor these two sockets
38 
39  // Monitor all events on client and server sockets
41  zmq_socket_monitor (client, "inproc://monitor-client", ZMQ_EVENT_ALL));
43  zmq_socket_monitor (server, "inproc://monitor-server", ZMQ_EVENT_ALL));
44 
45  // Create two sockets for collecting monitor events
46  void *client_mon = test_context_socket (ZMQ_PAIR);
48 
49  // Connect these to the inproc endpoints so they'll get events
51  zmq_connect (client_mon, "inproc://monitor-client"));
53  zmq_connect (server_mon, "inproc://monitor-server"));
54 
55  // Now do a basic ping test
57 
59  bounce (server, client);
60 
61  // Close client and server
62  // TODO why does this use zero_linger?
65 
66  // Now collect and check events from both sockets
67  int event = get_monitor_event (client_mon, NULL, NULL);
69  event = get_monitor_event (client_mon, NULL, NULL);
72  event = get_monitor_event (client_mon, NULL, NULL);
76  } else
78 
79  // This is the flow of server events
84  // Sometimes the server sees the client closing before it gets closed.
88  }
91  }
92  // TODO: When not waiting until the monitor stopped, the I/O thread runs
93  // into some deadlock. This must be fixed, but until it is fixed, we wait
94  // here in order to have more reliable test execution.
95  while (event != ZMQ_EVENT_MONITOR_STOPPED) {
97  }
98 
99  // Close down the sockets
100  // TODO why does this use zero_linger?
103 }
104 
105 #if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \
106  || (defined ZMQ_CURRENT_EVENT_VERSION \
107  && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2)
109 {
111 
112  // Socket monitoring only works with ZMQ_PAIR, ZMQ_PUB and ZMQ_PUSH.
115  client, "inproc://invalid-socket-type", 0, 2, ZMQ_CLIENT));
116 
118 }
119 
121  const char *expected_prefix_,
122  int type_)
123 {
124  char server_endpoint[MAX_SOCKET_STRING];
125  char client_mon_endpoint[MAX_SOCKET_STRING];
126  char server_mon_endpoint[MAX_SOCKET_STRING];
127 
128  // Create a unique endpoint for each call so we don't have
129  // to wait for the sockets to unbind.
130  snprintf (client_mon_endpoint, MAX_SOCKET_STRING, "inproc://client%s%d",
131  expected_prefix_, type_);
132  snprintf (server_mon_endpoint, MAX_SOCKET_STRING, "inproc://server%s%d",
133  expected_prefix_, type_);
134 
135  // We'll monitor these two sockets
138 
139  // Monitor all events on client and server sockets
141  client, client_mon_endpoint, ZMQ_EVENT_ALL_V2, 2, type_));
143  server, server_mon_endpoint, ZMQ_EVENT_ALL_V2, 2, type_));
144 
145  // Choose the appropriate consumer socket type.
146  int mon_type = ZMQ_PAIR;
147  switch (type_) {
148  case ZMQ_PAIR:
149  mon_type = ZMQ_PAIR;
150  break;
151  case ZMQ_PUSH:
152  mon_type = ZMQ_PULL;
153  break;
154  case ZMQ_PUB:
155  mon_type = ZMQ_SUB;
156  break;
157  }
158 
159  // Create two sockets for collecting monitor events
160  void *client_mon = test_context_socket (mon_type);
161  void *server_mon = test_context_socket (mon_type);
162 
163  // Additionally subscribe to all events if a PUB socket is used.
164  if (type_ == ZMQ_PUB) {
166  zmq_setsockopt (client_mon, ZMQ_SUBSCRIBE, "", 0));
169  }
170 
171  // Connect these to the inproc endpoints so they'll get events
172  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client_mon, client_mon_endpoint));
173  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (server_mon, server_mon_endpoint));
174 
175  // Now do a basic ping test
176  bind_function_ (server, server_endpoint, sizeof server_endpoint);
177 
178  int ipv6;
179  size_t ipv6_size = sizeof (ipv6);
181  zmq_getsockopt (server, ZMQ_IPV6, &ipv6, &ipv6_size));
183  zmq_setsockopt (client, ZMQ_IPV6, &ipv6, sizeof (int)));
184  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, server_endpoint));
185  bounce (server, client);
186 
187  // Close client and server
188  // TODO why does this use zero_linger?
191 
192  char *client_local_address = NULL;
193  char *client_remote_address = NULL;
194 
195  // Now collect and check events from both sockets
196  int64_t event = get_monitor_event_v2 (
197  client_mon, NULL, &client_local_address, &client_remote_address);
199  free (client_local_address);
200  free (client_remote_address);
201  event = get_monitor_event_v2 (client_mon, NULL, &client_local_address,
202  &client_remote_address);
203  }
205  TEST_ASSERT_EQUAL_STRING (server_endpoint, client_remote_address);
206  TEST_ASSERT_EQUAL_STRING_LEN (expected_prefix_, client_local_address,
207  strlen (expected_prefix_));
209  0, strcmp (client_local_address, client_remote_address));
210 
212  client_local_address, client_remote_address);
213  event = get_monitor_event_v2 (client_mon, NULL, NULL, NULL);
214  if (event == ZMQ_EVENT_DISCONNECTED) {
216  client_local_address, client_remote_address);
218  } else
220 
221  // This is the flow of server events
223  client_remote_address, "");
225  client_remote_address, client_local_address);
227  client_remote_address, client_local_address);
229  // Sometimes the server sees the client closing before it gets closed.
230  if (event != ZMQ_EVENT_DISCONNECTED) {
233  }
234  if (event != ZMQ_EVENT_DISCONNECTED) {
236  }
237  // TODO: When not waiting until the monitor stopped, the I/O thread runs
238  // into some deadlock. This must be fixed, but until it is fixed, we wait
239  // here in order to have more reliable test execution.
240  while (event != ZMQ_EVENT_MONITOR_STOPPED) {
242  }
243  free (client_local_address);
244  free (client_remote_address);
245 
246  // Close down the sockets
247  // TODO why does this use zero_linger?
250 }
251 
253 {
254  static const char prefix[] = "tcp://127.0.0.1:";
258 }
259 
261 {
262  static const char prefix[] = "tcp://[::1]:";
266 }
267 
269 {
270  static const char prefix[] = "ipc://";
274 }
275 
277 {
278  static const char prefix[] = "tipc://";
282 }
283 
284 #ifdef ZMQ_EVENT_PIPES_STATS
286  const char *expected_prefix_)
287 {
288  char server_endpoint[MAX_SOCKET_STRING];
289  const int pulls_count = 4;
290  void *pulls[pulls_count];
291 
292  // We'll monitor these two sockets
294 
296  push, "inproc://monitor-push", ZMQ_EVENT_PIPES_STATS, 2, ZMQ_PAIR));
297 
298  // Should fail if there are no pipes to monitor
300 
301  void *push_mon = test_context_socket (ZMQ_PAIR);
302 
303  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (push_mon, "inproc://monitor-push"));
304 
305  // Set lower HWM - queues will be filled so we should see it in the stats
306  int send_hwm = 500;
308  zmq_setsockopt (push, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm)));
309  // Set very low TCP buffers so that messages cannot be stored in-flight
310  const int tcp_buffer_size = 4096;
312  push, ZMQ_SNDBUF, &tcp_buffer_size, sizeof (tcp_buffer_size)));
313  bind_function_ (push, server_endpoint, sizeof (server_endpoint));
314 
315  int ipv6;
316  size_t ipv6_size = sizeof (ipv6);
318  zmq_getsockopt (push, ZMQ_IPV6, &ipv6, &ipv6_size));
319  for (int i = 0; i < pulls_count; ++i) {
320  pulls[i] = test_context_socket (ZMQ_PULL);
322  zmq_setsockopt (pulls[i], ZMQ_IPV6, &ipv6, sizeof (int)));
323  int timeout_ms = 10;
325  pulls[i], ZMQ_RCVTIMEO, &timeout_ms, sizeof (timeout_ms)));
327  zmq_setsockopt (pulls[i], ZMQ_RCVHWM, &send_hwm, sizeof (send_hwm)));
329  pulls[i], ZMQ_RCVBUF, &tcp_buffer_size, sizeof (tcp_buffer_size)));
330  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pulls[i], server_endpoint));
331  }
332 
333  // Send until we block
334  int send_count = 0;
335  // Saturate the TCP buffers too
336  char data[tcp_buffer_size * 2];
337  memset (data, 0, sizeof (data));
338  // Saturate all pipes - send + receive - on all connections
339  while (send_count < send_hwm * 2 * pulls_count) {
340  TEST_ASSERT_EQUAL_INT (sizeof (data),
341  zmq_send (push, data, sizeof (data), 0));
342  ++send_count;
343  }
344 
345  // Drain one of the pulls - doesn't matter how many messages, at least one
346  send_count = send_count / 4;
347  do {
348  zmq_recv (pulls[0], data, sizeof (data), 0);
349  --send_count;
350  } while (send_count > 0);
351 
352  // To kick the application thread, do a dummy getsockopt - users here
353  // should use the monitor and the other sockets in a poll.
354  unsigned long int dummy;
355  size_t dummy_size = sizeof (dummy);
357  // Note that the pipe stats on the sender will not get updated until the
358  // receiver has processed at least lwm ((hwm + 1) / 2) messages AND until
359  // the application thread has ran through the mailbox, as the update is
360  // delivered via a message (send_activate_write)
361  zmq_getsockopt (push, ZMQ_EVENTS, &dummy, &dummy_size);
362 
363  // Ask for stats and check that they match
365 
367  zmq_getsockopt (push, ZMQ_EVENTS, &dummy, &dummy_size);
368 
369  for (int i = 0; i < pulls_count; ++i) {
370  char *push_local_address = NULL;
371  char *push_remote_address = NULL;
372  uint64_t *queue_stat = NULL;
373  int64_t event = get_monitor_event_v2 (
374  push_mon, &queue_stat, &push_local_address, &push_remote_address);
375  TEST_ASSERT_EQUAL_STRING (server_endpoint, push_local_address);
376  TEST_ASSERT_EQUAL_STRING_LEN (expected_prefix_, push_remote_address,
377  strlen (expected_prefix_));
379  TEST_ASSERT_NOT_NULL (queue_stat);
380  TEST_ASSERT_EQUAL_INT (i == 0 ? 0 : send_hwm, queue_stat[0]);
381  TEST_ASSERT_EQUAL_INT (0, queue_stat[1]);
382  free (push_local_address);
383  free (push_remote_address);
384  free (queue_stat);
385  }
386 
387  // Close client and server
390  for (int i = 0; i < pulls_count; ++i)
392 }
393 
395 {
396  static const char prefix[] = "tcp://127.0.0.1:";
398 }
399 
401 {
402  static const char prefix[] = "tcp://[::1]:";
404 }
405 
407 {
408  static const char prefix[] = "ipc://";
410 }
411 #endif // ZMQ_EVENT_PIPES_STATS
412 #endif
413 
414 int main ()
415 {
417 
418  UNITY_BEGIN ();
421 
422 #if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \
423  || (defined ZMQ_CURRENT_EVENT_VERSION \
424  && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2)
430 #ifdef ZMQ_EVENT_PIPES_STATS
434 #endif
435 #endif
436 
437  return UNITY_END ();
438 }
ZMQ_RCVBUF
#define ZMQ_RCVBUF
Definition: zmq.h:283
bounce
static void bounce(void *socket_)
Definition: test_req_relaxed.cpp:50
bind_loopback_tipc
void bind_loopback_tipc(void *socket_, char *my_endpoint_, size_t len_)
Definition: testutil_unity.cpp:265
TEST_ASSERT_EQUAL_STRING
#define TEST_ASSERT_EQUAL_STRING(expected, actual)
Definition: unity.h:235
test_monitor_versioned_stats
void test_monitor_versioned_stats(bind_function_t bind_function_, const char *expected_prefix_)
Definition: test_monitor.cpp:285
test_monitor_invalid_protocol_fails
SETUP_TEARDOWN_TESTCONTEXT void test_monitor_invalid_protocol_fails()
Definition: test_monitor.cpp:13
TEST_ASSERT_EQUAL_STRING_LEN
#define TEST_ASSERT_EQUAL_STRING_LEN(expected, actual, len)
Definition: unity.h:236
ZMQ_EVENT_ALL_V2
#define ZMQ_EVENT_ALL_V2
Definition: zmq_draft.h:156
NULL
NULL
Definition: test_security_zap.cpp:405
UNITY_END
return UNITY_END()
ZMQ_PUB
#define ZMQ_PUB
Definition: zmq.h:259
ZMQ_EVENTS
#define ZMQ_EVENTS
Definition: zmq.h:286
EINVAL
#define EINVAL
Definition: errno.hpp:25
msleep
void msleep(int milliseconds_)
Definition: testutil.cpp:227
zmq_socket_monitor
ZMQ_EXPORT int zmq_socket_monitor(void *s_, const char *addr_, int events_)
Definition: zmq.cpp:278
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
ZMQ_CLIENT
#define ZMQ_CLIENT
Definition: zmq_draft.h:15
ZMQ_EVENT_CLOSED
#define ZMQ_EVENT_CLOSED
Definition: zmq.h:408
RUN_TEST
#define RUN_TEST(func)
Definition: unity_internals.h:615
test_monitor_versioned_basic_tcp_ipv4
void test_monitor_versioned_basic_tcp_ipv4()
Definition: test_monitor.cpp:252
test_monitor_versioned_invalid_socket_type
void test_monitor_versioned_invalid_socket_type()
Definition: test_monitor.cpp:108
test_monitor_versioned_stats_ipc
void test_monitor_versioned_stats_ipc()
Definition: test_monitor.cpp:406
SETUP_TEARDOWN_TESTCONTEXT
#define SETUP_TEARDOWN_TESTCONTEXT
Definition: testutil_unity.hpp:172
test_monitor_versioned_stats_tcp_ipv4
void test_monitor_versioned_stats_tcp_ipv4()
Definition: test_monitor.cpp:394
ZMQ_SUBSCRIBE
#define ZMQ_SUBSCRIBE
Definition: zmq.h:278
bind_loopback_ipv4
void bind_loopback_ipv4(void *socket_, char *my_endpoint_, size_t len_)
Definition: testutil_unity.cpp:246
client
void client(int num)
Definition: test_multithread.cpp:134
ZMQ_EVENT_PIPES_STATS
#define ZMQ_EVENT_PIPES_STATS
Definition: zmq_draft.h:150
dummy
ReturnVal dummy
Definition: register_benchmark_test.cc:68
ZMQ_SUB
#define ZMQ_SUB
Definition: zmq.h:260
test_context_socket_close_zero_linger
void * test_context_socket_close_zero_linger(void *socket_)
Definition: testutil_unity.cpp:215
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
TEST_ASSERT_NOT_EQUAL
#define TEST_ASSERT_NOT_EQUAL(expected, actual)
Definition: unity.h:134
ZMQ_EVENT_LISTENING
#define ZMQ_EVENT_LISTENING
Definition: zmq.h:404
test_monitor_basic
void test_monitor_basic()
Definition: test_monitor.cpp:31
testutil_unity.hpp
ZMQ_EVENT_HANDSHAKE_SUCCEEDED
#define ZMQ_EVENT_HANDSHAKE_SUCCEEDED
Definition: zmq.h:417
snprintf
int snprintf(char *str, size_t size, const char *format,...)
Definition: port.cc:64
ZMQ_RCVHWM
#define ZMQ_RCVHWM
Definition: zmq.h:294
ZMQ_DEALER
#define ZMQ_DEALER
Definition: zmq.h:263
bind_loopback_ipc
void bind_loopback_ipc(void *socket_, char *my_endpoint_, size_t len_)
Definition: testutil_unity.cpp:256
zmq_setsockopt
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
ZMQ_EVENT_CONNECTED
#define ZMQ_EVENT_CONNECTED
Definition: zmq.h:401
event
struct _cl_event * event
Definition: glcorearb.h:4163
prefix
static const char prefix[]
Definition: test_pair_ipc.cpp:26
ZMQ_PUSH
#define ZMQ_PUSH
Definition: zmq.h:266
testutil.hpp
get_monitor_event_v2
int64_t get_monitor_event_v2(void *monitor_, uint64_t **value_, char **local_address_, char **remote_address_)
Definition: testutil_monitoring.cpp:280
bind_function_t
void(* bind_function_t)(void *socket_, char *my_endpoint_, size_t len_)
Definition: testutil_unity.hpp:236
test_monitor_versioned_basic_tipc
void test_monitor_versioned_basic_tipc()
Definition: test_monitor.cpp:276
my_endpoint
char my_endpoint[MAX_SOCKET_STRING]
Definition: test_security_curve.cpp:31
MAX_SOCKET_STRING
#define MAX_SOCKET_STRING
Definition: libzmq/tests/testutil.hpp:35
zmq_socket_monitor_versioned
int zmq_socket_monitor_versioned(void *s_, const char *addr_, uint64_t events_, int event_version_, int type_)
Definition: zmq.cpp:269
expect_monitor_event_v2
void expect_monitor_event_v2(void *monitor_, int64_t expected_event_, const char *expected_local_address_, const char *expected_remote_address_)
Definition: testutil_monitoring.cpp:289
ENOTSOCK
#define ENOTSOCK
Definition: zmq.h:128
test_context_socket
void * test_context_socket(int type_)
Definition: testutil_unity.cpp:200
SETTLE_TIME
#define SETTLE_TIME
Definition: libzmq/tests/testutil.hpp:31
server
void * server
Definition: test_security_curve.cpp:29
TEST_ASSERT_EQUAL
#define TEST_ASSERT_EQUAL(expected, actual)
Definition: unity.h:133
bind_loopback_ipv6
void bind_loopback_ipv6(void *socket_, char *my_endpoint_, size_t len_)
Definition: testutil_unity.cpp:251
TEST_ASSERT_EQUAL_INT
#define TEST_ASSERT_EQUAL_INT(expected, actual)
Definition: unity.h:128
test_monitor_versioned_basic
void test_monitor_versioned_basic(bind_function_t bind_function_, const char *expected_prefix_, int type_)
Definition: test_monitor.cpp:120
EPROTONOSUPPORT
#define EPROTONOSUPPORT
Definition: zmq.h:107
i
int i
Definition: gmock-matchers_test.cc:764
zmq_socket_monitor_pipes_stats
int zmq_socket_monitor_pipes_stats(void *s_)
Definition: zmq.cpp:1809
test_monitor_versioned_basic_tcp_ipv6
void test_monitor_versioned_basic_tcp_ipv6()
Definition: test_monitor.cpp:260
expect_monitor_event
void expect_monitor_event(void *monitor_, int expected_event_)
Definition: testutil_monitoring.cpp:96
ZMQ_SNDHWM
#define ZMQ_SNDHWM
Definition: zmq.h:293
ZMQ_EVENT_CONNECT_DELAYED
#define ZMQ_EVENT_CONNECT_DELAYED
Definition: zmq.h:402
test_monitor_versioned_stats_tcp_ipv6
void test_monitor_versioned_stats_tcp_ipv6()
Definition: test_monitor.cpp:400
server_mon
void * server_mon
Definition: test_security_curve.cpp:30
ZMQ_PAIR
#define ZMQ_PAIR
Definition: zmq.h:258
ZMQ_RCVTIMEO
#define ZMQ_RCVTIMEO
Definition: zmq.h:296
zmq_recv
ZMQ_EXPORT int zmq_recv(void *s_, void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:487
ZMQ_EVENT_ACCEPTED
#define ZMQ_EVENT_ACCEPTED
Definition: zmq.h:406
ZMQ_EVENT_CONNECT_RETRIED
#define ZMQ_EVENT_CONNECT_RETRIED
Definition: zmq.h:403
ZMQ_EVENT_ALL
#define ZMQ_EVENT_ALL
Definition: zmq.h:412
ZMQ_SNDBUF
#define ZMQ_SNDBUF
Definition: zmq.h:282
ZMQ_EVENT_DISCONNECTED
#define ZMQ_EVENT_DISCONNECTED
Definition: zmq.h:410
testutil_monitoring.hpp
get_monitor_event
static SETUP_TEARDOWN_TESTCONTEXT int get_monitor_event(void *monitor_)
Definition: test_heartbeats.cpp:34
ZMQ_IPV6
#define ZMQ_IPV6
Definition: zmq.h:307
setup_test_environment
void setup_test_environment(int timeout_seconds_)
Definition: testutil.cpp:201
main
int main()
Definition: test_monitor.cpp:414
UNITY_BEGIN
UNITY_BEGIN()
push
static void push(tarjan *t, const upb_refcounted *r)
Definition: ruby/ext/google/protobuf_c/upb.c:5890
TEST_ASSERT_FAILURE_ERRNO
#define TEST_ASSERT_FAILURE_ERRNO(error_code, expr)
Definition: testutil_unity.hpp:95
data
GLint GLenum GLsizei GLsizei GLsizei GLint GLsizei const GLvoid * data
Definition: glcorearb.h:2879
zmq_send
ZMQ_EXPORT int zmq_send(void *s_, const void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:377
ZMQ_PULL
#define ZMQ_PULL
Definition: zmq.h:265
TEST_ASSERT_NOT_NULL
#define TEST_ASSERT_NOT_NULL(pointer)
Definition: unity.h:125
TEST_ASSERT_SUCCESS_ERRNO
#define TEST_ASSERT_SUCCESS_ERRNO(expr)
Definition: proxy_thr.cpp:47
zmq_getsockopt
ZMQ_EXPORT int zmq_getsockopt(void *s_, int option_, void *optval_, size_t *optvallen_)
Definition: zmq.cpp:261
ZMQ_EVENT_MONITOR_STOPPED
#define ZMQ_EVENT_MONITOR_STOPPED
Definition: zmq.h:411
test_monitor_versioned_basic_ipc
void test_monitor_versioned_basic_ipc()
Definition: test_monitor.cpp:268


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59