test_spec_pushpull.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "testutil.hpp"
4 #include "testutil_unity.hpp"
5 
6 #include <stdlib.h>
7 #include <string.h>
8 
10 
12 
13 // PUSH: SHALL route outgoing messages to connected peers using a
14 // round-robin strategy.
15 void test_push_round_robin_out (const char *bind_address_)
16 {
18 
19  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (push, bind_address_));
20  size_t len = MAX_SOCKET_STRING;
23 
24  const size_t services = 5;
25  void *pulls[services];
26  for (size_t peer = 0; peer < services; ++peer) {
27  pulls[peer] = test_context_socket (ZMQ_PULL);
28 
30  }
31 
32  // Wait for connections.
34 
35  // Send 2N messages
36  for (size_t peer = 0; peer < services; ++peer)
37  s_send_seq (push, "ABC", SEQ_END);
38  for (size_t peer = 0; peer < services; ++peer)
39  s_send_seq (push, "DEF", SEQ_END);
40 
41  // Expect every PULL got one of each
42  for (size_t peer = 0; peer < services; ++peer) {
43  s_recv_seq (pulls[peer], "ABC", SEQ_END);
44  s_recv_seq (pulls[peer], "DEF", SEQ_END);
45  }
46 
48 
49  for (size_t peer = 0; peer < services; ++peer)
51 }
52 
53 // PULL: SHALL receive incoming messages from its peers using a fair-queuing
54 // strategy.
55 void test_pull_fair_queue_in (const char *bind_address_)
56 {
57  void *pull = test_context_socket (ZMQ_PULL);
58 
59  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pull, bind_address_));
60  size_t len = MAX_SOCKET_STRING;
63 
64  const unsigned char services = 5;
65  void *pushs[services];
66  for (unsigned char peer = 0; peer < services; ++peer) {
67  pushs[peer] = test_context_socket (ZMQ_PUSH);
68 
70  }
71 
72  // Wait for connections.
74 
75  int first_half = 0;
76  int second_half = 0;
77 
78  // Send 2N messages
79  for (unsigned char peer = 0; peer < services; ++peer) {
80  char *str = strdup ("A");
81 
82  str[0] += peer;
83  s_send_seq (pushs[peer], str, SEQ_END);
84  first_half += str[0];
85 
86  str[0] += services;
87  s_send_seq (pushs[peer], str, SEQ_END);
88  second_half += str[0];
89 
90  free (str);
91  }
92 
93  // Wait for data.
95 
96  zmq_msg_t msg;
98 
99  // Expect to pull one from each first
100  for (size_t peer = 0; peer < services; ++peer) {
102  2, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, pull, 0)));
103  const char *str = static_cast<const char *> (zmq_msg_data (&msg));
104  first_half -= str[0];
105  }
106  TEST_ASSERT_EQUAL_INT (0, first_half);
107 
108  // And then get the second batch
109  for (size_t peer = 0; peer < services; ++peer) {
111  2, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, pull, 0)));
112  const char *str = static_cast<const char *> (zmq_msg_data (&msg));
113  second_half -= str[0];
114  }
115  TEST_ASSERT_EQUAL_INT (0, second_half);
116 
118 
120 
121  for (size_t peer = 0; peer < services; ++peer)
123 }
124 
125 // PUSH: SHALL block on sending, or return a suitable error, when it has no
126 // available peers.
127 void test_push_block_on_send_no_peers (const char *bind_address_)
128 {
129  LIBZMQ_UNUSED (bind_address_);
130  void *sc = test_context_socket (ZMQ_PUSH);
131 
132  int timeout = 250;
134  zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof (timeout)));
135 
138 
140 }
141 
142 // PUSH and PULL: SHALL create this queue when a peer connects to it. If
143 // this peer disconnects, the socket SHALL destroy its queue and SHALL
144 // discard any messages it contains.
145 void test_destroy_queue_on_disconnect (const char *bind_address_)
146 {
147  void *a = test_context_socket (ZMQ_PUSH);
148 
149  int hwm = 1;
151  zmq_setsockopt (a, ZMQ_SNDHWM, &hwm, sizeof (hwm)));
152 
153  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (a, bind_address_));
154  size_t len = MAX_SOCKET_STRING;
157 
158  void *b = test_context_socket (ZMQ_PULL);
159 
161  zmq_setsockopt (b, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
162 
164 
165  // Send two messages, one should be stuck in A's outgoing queue, the other
166  // arrives at B.
167  s_send_seq (a, "ABC", SEQ_END);
168  s_send_seq (a, "DEF", SEQ_END);
169 
170  // Both queues should now be full, indicated by A blocking on send.
172 
174 
175  // Disconnect may take time and need command processing.
176  zmq_pollitem_t poller[2] = {{a, 0, 0, 0}, {b, 0, 0, 0}};
178  0, TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100)));
180  0, TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100)));
181 
182  zmq_msg_t msg;
184 
185  // Can't receive old data on B.
187 
188  // Sending fails.
190 
191  // Reconnect B
193 
194  // Still can't receive old data on B.
196 
197  // two messages should be sendable before the queues are filled up.
198  s_send_seq (a, "ABC", SEQ_END);
199  s_send_seq (a, "DEF", SEQ_END);
200 
202 
204 
207 }
208 
209 // PUSH and PULL: SHALL either receive or drop multipart messages atomically.
210 void test_push_multipart_atomic_drop (const char *bind_address_,
211  const bool block_)
212 {
213  int linger = 0;
214  int hwm = 1;
215 
218  zmq_setsockopt (push, ZMQ_LINGER, &linger, sizeof (linger)));
220  zmq_setsockopt (push, ZMQ_SNDHWM, &hwm, sizeof (hwm)));
221  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (push, bind_address_));
222  size_t addr_len = MAX_SOCKET_STRING;
225 
226  void *pull = test_context_socket (ZMQ_PULL);
228  zmq_setsockopt (pull, ZMQ_LINGER, &linger, sizeof (linger)));
230  zmq_setsockopt (pull, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
232 
233  // Wait for connections.
235 
236  int rc;
237  zmq_msg_t msg_data;
238  // A large message is needed to overrun the TCP buffers
239  const size_t len = 16 * 1024 * 1024;
240  size_t zmq_events_size = sizeof (int);
241  int zmq_events;
242 
243  // Normal case - exercise the queues
247  memset (zmq_msg_data (&msg_data), 'a', len);
248  TEST_ASSERT_EQUAL_INT (len, zmq_msg_send (&msg_data, push, 0));
249 
250  recv_string_expect_success (pull, "0", 0);
251  recv_string_expect_success (pull, "0", 0);
252  zmq_msg_init (&msg_data);
253  TEST_ASSERT_EQUAL_INT (len, zmq_msg_recv (&msg_data, pull, 0));
254  zmq_msg_close (&msg_data);
255 
256  // Fill the HWMs of sender and receiver, one message each
258 
262  memset (zmq_msg_data (&msg_data), 'b', len);
263  TEST_ASSERT_EQUAL_INT (len, zmq_msg_send (&msg_data, push, 0));
264 
265  // Disconnect and simulate a poll (doesn't work on Windows) to
266  // let the commands run and let the pipes start to be deallocated
268 
269  zmq_getsockopt (push, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
270  zmq_getsockopt (pull, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
272  zmq_getsockopt (push, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
273  zmq_getsockopt (pull, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
274 
275  // Reconnect and immediately push a large message into the pipe,
276  // if the problem is reproduced the pipe is in the process of being
277  // terminated but still exists (state term_ack_sent) and had already
278  // accepted the frame, so with the first frames already gone and
279  // unreachable only the last is left, and is stuck in the lb.
281 
285  memset (zmq_msg_data (&msg_data), 'c', len);
286  if (block_) {
288  zmq_msg_send (&msg_data, push, ZMQ_SNDMORE));
289  } else {
290  rc = zmq_msg_send (&msg_data, push, ZMQ_SNDMORE | ZMQ_DONTWAIT);
291  // inproc won't fail, much faster to connect/disconnect pipes than TCP
292  if (rc == -1) {
293  // at this point the new pipe is there and it works
297  zmq_msg_send (&msg_data, push, ZMQ_SNDMORE));
298  }
299  }
300  send_string_expect_success (push, "3b", 0);
301 
302  zmq_getsockopt (push, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
303  zmq_getsockopt (pull, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
305  zmq_getsockopt (push, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
306  zmq_getsockopt (pull, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
307 
311  memset (zmq_msg_data (&msg_data), 'd', len);
312  TEST_ASSERT_EQUAL_INT (len, zmq_msg_send (&msg_data, push, 0));
313 
314  // On very slow machines the message will not be lost, as it will
315  // be sent when the new pipe is already in place, so avoid failing
316  // and simply carry on as it would be very noisy otherwise.
317  // Receive both to avoid leaking metadata.
318  // If only the "5" message is received, the problem is reproduced, and
319  // without the fix the first message received would be the last large
320  // frame of "3".
321  char buffer[2];
322  rc =
323  TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (pull, buffer, sizeof (buffer), 0));
324  TEST_ASSERT_EQUAL_INT (1, rc);
325  TEST_ASSERT_TRUE (buffer[0] == '3' || buffer[0] == '5');
326  if (buffer[0] == '3') {
327  recv_string_expect_success (pull, "3", 0);
328  zmq_msg_init (&msg_data);
329  TEST_ASSERT_EQUAL_INT (len, zmq_msg_recv (&msg_data, pull, 0));
330  zmq_msg_close (&msg_data);
331  recv_string_expect_success (pull, "3b", 0);
332  recv_string_expect_success (pull, "5", 0);
333  }
334  recv_string_expect_success (pull, "5", 0);
335  zmq_msg_init (&msg_data);
336  TEST_ASSERT_EQUAL_INT (len, zmq_msg_recv (&msg_data, pull, 0));
337  zmq_msg_close (&msg_data);
338 
341 }
342 
343 #define def_test_spec_pushpull(name, bind_address_) \
344  void test_spec_pushpull_##name##_push_round_robin_out () \
345  { \
346  test_push_round_robin_out (bind_address_); \
347  } \
348  void test_spec_pushpull_##name##_pull_fair_queue_in () \
349  { \
350  test_pull_fair_queue_in (bind_address_); \
351  } \
352  void test_spec_pushpull_##name##_push_block_on_send_no_peers () \
353  { \
354  test_push_block_on_send_no_peers (bind_address_); \
355  } \
356  void test_spec_pushpull_##name##_destroy_queue_on_disconnect () \
357  { \
358  test_destroy_queue_on_disconnect (bind_address_); \
359  } \
360  void test_spec_pushpull_##name##_push_multipart_atomic_drop_block () \
361  { \
362  test_push_multipart_atomic_drop (bind_address_, true); \
363  } \
364  void test_spec_pushpull_##name##_push_multipart_atomic_drop_non_block () \
365  { \
366  test_push_multipart_atomic_drop (bind_address_, false); \
367  }
368 
369 def_test_spec_pushpull (inproc, "inproc://a")
370 
371  def_test_spec_pushpull (tcp, "tcp://127.0.0.1:*")
372 
373  int main ()
374 {
376 
377  UNITY_BEGIN ();
378  RUN_TEST (test_spec_pushpull_inproc_push_round_robin_out);
379  RUN_TEST (test_spec_pushpull_tcp_push_round_robin_out);
380  RUN_TEST (test_spec_pushpull_inproc_pull_fair_queue_in);
381  RUN_TEST (test_spec_pushpull_tcp_pull_fair_queue_in);
382  RUN_TEST (test_spec_pushpull_inproc_push_block_on_send_no_peers);
383  RUN_TEST (test_spec_pushpull_tcp_push_block_on_send_no_peers);
384  // TODO Tests disabled until libzmq does this properly
385  //RUN_TEST (test_spec_pushpull_inproc_destroy_queue_on_disconnect);
386  //RUN_TEST (test_spec_pushpull_tcp_destroy_queue_on_disconnect);
387  RUN_TEST (test_spec_pushpull_inproc_push_multipart_atomic_drop_block);
388  RUN_TEST (test_spec_pushpull_inproc_push_multipart_atomic_drop_non_block);
389  RUN_TEST (test_spec_pushpull_tcp_push_multipart_atomic_drop_block);
390  RUN_TEST (test_spec_pushpull_tcp_push_multipart_atomic_drop_non_block);
391  return UNITY_END ();
392 }
test_push_multipart_atomic_drop
void test_push_multipart_atomic_drop(const char *bind_address_, const bool block_)
Definition: test_spec_pushpull.cpp:210
connect_address
SETUP_TEARDOWN_TESTCONTEXT char connect_address[MAX_SOCKET_STRING]
Definition: test_spec_pushpull.cpp:11
UNITY_END
return UNITY_END()
ZMQ_EVENTS
#define ZMQ_EVENTS
Definition: zmq.h:286
msleep
void msleep(int milliseconds_)
Definition: testutil.cpp:227
zmq::protocol_name::tcp
static const char tcp[]
Definition: address.hpp:38
TEST_ASSERT_TRUE
#define TEST_ASSERT_TRUE(condition)
Definition: unity.h:121
test_push_round_robin_out
void test_push_round_robin_out(const char *bind_address_)
Definition: test_spec_pushpull.cpp:15
zmq_msg_send
ZMQ_EXPORT int zmq_msg_send(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:609
zmq_poll
ZMQ_EXPORT int zmq_poll(zmq_pollitem_t *items_, int nitems_, long timeout_)
Definition: zmq.cpp:827
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
ZMQ_LAST_ENDPOINT
#define ZMQ_LAST_ENDPOINT
Definition: zmq.h:298
SETUP_TEARDOWN_TESTCONTEXT
#define SETUP_TEARDOWN_TESTCONTEXT
Definition: testutil_unity.hpp:172
zmq_pollitem_t
Definition: zmq.h:487
test_pull_fair_queue_in
void test_pull_fair_queue_in(const char *bind_address_)
Definition: test_spec_pushpull.cpp:55
services
const size_t services
Definition: test_req_relaxed.cpp:8
test_context_socket_close_zero_linger
void * test_context_socket_close_zero_linger(void *socket_)
Definition: testutil_unity.cpp:215
zmq_msg_data
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg_)
Definition: zmq.cpp:642
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
b
GLboolean GLboolean GLboolean b
Definition: glcorearb.h:3228
testutil_unity.hpp
ZMQ_RCVHWM
#define ZMQ_RCVHWM
Definition: zmq.h:294
zmq_setsockopt
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
ZMQ_SNDTIMEO
#define ZMQ_SNDTIMEO
Definition: zmq.h:297
zmq_disconnect
ZMQ_EXPORT int zmq_disconnect(void *s_, const char *addr_)
Definition: zmq.cpp:345
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
ZMQ_PUSH
#define ZMQ_PUSH
Definition: zmq.h:266
testutil.hpp
update_failure_list.str
str
Definition: update_failure_list.py:41
zmq_msg_t
Definition: zmq.h:218
MAX_SOCKET_STRING
#define MAX_SOCKET_STRING
Definition: libzmq/tests/testutil.hpp:35
zmq_bind
ZMQ_EXPORT int zmq_bind(void *s_, const char *addr_)
Definition: zmq.cpp:299
zmq_msg_recv
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:617
timeout
GLbitfield GLuint64 timeout
Definition: glcorearb.h:3588
sc
void * sc
Definition: test_channel.cpp:9
buffer
Definition: buffer_processor.h:43
test_context_socket
void * test_context_socket(int type_)
Definition: testutil_unity.cpp:200
ZMQ_DONTWAIT
#define ZMQ_DONTWAIT
Definition: zmq.h:358
SETTLE_TIME
#define SETTLE_TIME
Definition: libzmq/tests/testutil.hpp:31
zmq::protocol_name::inproc
static const char inproc[]
Definition: address.hpp:37
TEST_ASSERT_EQUAL_INT
#define TEST_ASSERT_EQUAL_INT(expected, actual)
Definition: unity.h:128
zmq_msg_init
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg_)
Definition: zmq.cpp:587
ZMQ_SNDHWM
#define ZMQ_SNDHWM
Definition: zmq.h:293
ZMQ_LINGER
#define ZMQ_LINGER
Definition: zmq.h:288
send_string_expect_success
void send_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:94
SEQ_END
const char * SEQ_END
Definition: testutil.cpp:47
RUN_TEST
RUN_TEST(test_spec_pushpull_inproc_push_round_robin_out)
len
int len
Definition: php/ext/google/protobuf/map.c:206
s_send_seq
void s_send_seq(void *socket_,...)
Definition: testutil.cpp:135
zmq_recv
ZMQ_EXPORT int zmq_recv(void *s_, void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:487
main
int main(int argc, char **argv)
Definition: cppzmq/demo/main.cpp:3
test_destroy_queue_on_disconnect
void test_destroy_queue_on_disconnect(const char *bind_address_)
Definition: test_spec_pushpull.cpp:145
recv_string_expect_success
void recv_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:101
setup_test_environment
void setup_test_environment(int timeout_seconds_)
Definition: testutil.cpp:201
zmq_msg_init_size
ZMQ_EXPORT int zmq_msg_init_size(zmq_msg_t *msg_, size_t size_)
Definition: zmq.cpp:592
UNITY_BEGIN
UNITY_BEGIN()
s_recv_seq
void s_recv_seq(void *socket_,...)
Definition: testutil.cpp:158
ZMQ_SNDMORE
#define ZMQ_SNDMORE
Definition: zmq.h:359
def_test_spec_pushpull
#define def_test_spec_pushpull(name, bind_address_)
Definition: test_spec_pushpull.cpp:343
push
static void push(tarjan *t, const upb_refcounted *r)
Definition: ruby/ext/google/protobuf_c/upb.c:5890
TEST_ASSERT_FAILURE_ERRNO
#define TEST_ASSERT_FAILURE_ERRNO(error_code, expr)
Definition: testutil_unity.hpp:95
test_push_block_on_send_no_peers
void test_push_block_on_send_no_peers(const char *bind_address_)
Definition: test_spec_pushpull.cpp:127
zmq_send
ZMQ_EXPORT int zmq_send(void *s_, const void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:377
ZMQ_PULL
#define ZMQ_PULL
Definition: zmq.h:265
test_context_socket_close
void * test_context_socket_close(void *socket_)
Definition: testutil_unity.cpp:208
a
GLboolean GLboolean GLboolean GLboolean a
Definition: glcorearb.h:3228
zmq_msg_close
ZMQ_EXPORT int zmq_msg_close(zmq_msg_t *msg_)
Definition: zmq.cpp:625
TEST_ASSERT_SUCCESS_ERRNO
#define TEST_ASSERT_SUCCESS_ERRNO(expr)
Definition: proxy_thr.cpp:47
zmq_getsockopt
ZMQ_EXPORT int zmq_getsockopt(void *s_, int option_, void *optval_, size_t *optvallen_)
Definition: zmq.cpp:261


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59