test_xpub_manual.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "testutil.hpp"
4 #include "testutil_unity.hpp"
5 
7 
8 void test_basic ()
9 {
10  // Create a publisher
11  void *pub = test_context_socket (ZMQ_XPUB);
12  int manual = 1;
14  zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, 4));
15  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
16 
17  // Create a subscriber
18  void *sub = test_context_socket (ZMQ_XSUB);
19  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
20 
21  // Subscribe for A
22  const char subscription[] = {1, 'A', 0};
23  send_string_expect_success (sub, subscription, 0);
24 
25  // Receive subscriptions from subscriber
26  recv_string_expect_success (pub, subscription, 0);
27 
28  // Subscribe socket for B instead
30 
31  // Sending A message and B Message
32  send_string_expect_success (pub, "A", 0);
33  send_string_expect_success (pub, "B", 0);
34 
36 
37  // Clean up.
40 }
41 
43 {
44  // Create a publisher
45  void *pub = test_context_socket (ZMQ_XPUB);
46  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
47 
48  // set pub socket options
49  int manual = 1;
51  zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, sizeof (manual)));
52 
53  // Create a subscriber
54  void *sub = test_context_socket (ZMQ_XSUB);
55  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
56 
57  // Subscribe for A
58  const uint8_t subscription1[] = {1, 'A'};
59  send_array_expect_success (sub, subscription1, 0);
60 
61  // Subscribe for B
62  const uint8_t subscription2[] = {1, 'B'};
63  send_array_expect_success (sub, subscription2, 0);
64 
65  char buffer[3];
66 
67  // Receive subscription "A" from subscriber
68  recv_array_expect_success (pub, subscription1, 0);
69 
70  // Subscribe socket for XA instead
72 
73  // Receive subscription "B" from subscriber
74  recv_array_expect_success (pub, subscription2, 0);
75 
76  // Subscribe socket for XB instead
78 
79  // Unsubscribe from A
80  const uint8_t unsubscription1[2] = {0, 'A'};
81  send_array_expect_success (sub, unsubscription1, 0);
82 
83  // Receive unsubscription "A" from subscriber
84  recv_array_expect_success (pub, unsubscription1, 0);
85 
86  // Unsubscribe socket from XA instead
88 
89  // Sending messages XA, XB
90  send_string_expect_success (pub, "XA", 0);
91  send_string_expect_success (pub, "XB", 0);
92 
93  // Subscriber should receive XB only
95 
96  // Close subscriber
98 
99  // Receive unsubscription "B"
100  const char unsubscription2[2] = {0, 'B'};
102  sizeof unsubscription2,
103  TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (pub, buffer, sizeof buffer, 0)));
104  TEST_ASSERT_EQUAL_INT8_ARRAY (unsubscription2, buffer,
105  sizeof unsubscription2);
106 
107  // Unsubscribe socket from XB instead
109 
110  // Clean up.
112 }
113 
115 {
116  const uint8_t topic_buff[] = {"1"};
117  const uint8_t payload_buff[] = {"X"};
118 
119  char my_endpoint_backend[MAX_SOCKET_STRING];
120  char my_endpoint_frontend[MAX_SOCKET_STRING];
121 
122  int manual = 1;
123 
124  // proxy frontend
125  void *xsub_proxy = test_context_socket (ZMQ_XSUB);
126  bind_loopback_ipv4 (xsub_proxy, my_endpoint_frontend,
127  sizeof my_endpoint_frontend);
128 
129  // proxy backend
130  void *xpub_proxy = test_context_socket (ZMQ_XPUB);
132  zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4));
133  bind_loopback_ipv4 (xpub_proxy, my_endpoint_backend,
134  sizeof my_endpoint_backend);
135 
136  // publisher
137  void *pub = test_context_socket (ZMQ_PUB);
138  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, my_endpoint_frontend));
139 
140  // first subscriber subscribes
141  void *sub1 = test_context_socket (ZMQ_SUB);
142  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, my_endpoint_backend));
144  zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic_buff, 1));
145 
146  // wait
148 
149  // proxy reroutes and confirms subscriptions
150  const uint8_t subscription[2] = {1, *topic_buff};
151  recv_array_expect_success (xpub_proxy, subscription, ZMQ_DONTWAIT);
153  zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic_buff, 1));
154  send_array_expect_success (xsub_proxy, subscription, 0);
155 
156  // second subscriber subscribes
157  void *sub2 = test_context_socket (ZMQ_SUB);
158  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, my_endpoint_backend));
160  zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic_buff, 1));
161 
162  // wait
164 
165  // proxy reroutes
166  recv_array_expect_success (xpub_proxy, subscription, ZMQ_DONTWAIT);
168  zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic_buff, 1));
169  send_array_expect_success (xsub_proxy, subscription, 0);
170 
171  // wait
173 
174  // let publisher send a msg
175  send_array_expect_success (pub, topic_buff, ZMQ_SNDMORE);
176  send_array_expect_success (pub, payload_buff, 0);
177 
178  // wait
180 
181  // proxy reroutes data messages to subscribers
182  recv_array_expect_success (xsub_proxy, topic_buff, ZMQ_DONTWAIT);
183  recv_array_expect_success (xsub_proxy, payload_buff, ZMQ_DONTWAIT);
184  send_array_expect_success (xpub_proxy, topic_buff, ZMQ_SNDMORE);
185  send_array_expect_success (xpub_proxy, payload_buff, 0);
186 
187  // wait
189 
190  // each subscriber should now get a message
191  recv_array_expect_success (sub2, topic_buff, ZMQ_DONTWAIT);
192  recv_array_expect_success (sub2, payload_buff, ZMQ_DONTWAIT);
193 
194  recv_array_expect_success (sub1, topic_buff, ZMQ_DONTWAIT);
195  recv_array_expect_success (sub1, payload_buff, ZMQ_DONTWAIT);
196 
197  // Disconnect both subscribers
200 
201  // wait
203 
204  // unsubscribe messages are passed from proxy to publisher
205  const uint8_t unsubscription[] = {0, *topic_buff};
206  recv_array_expect_success (xpub_proxy, unsubscription, 0);
208  zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic_buff, 1));
209  send_array_expect_success (xsub_proxy, unsubscription, 0);
210 
211  // should receive another unsubscribe msg
212  recv_array_expect_success (xpub_proxy, unsubscription, 0);
214  zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic_buff, 1));
215  send_array_expect_success (xsub_proxy, unsubscription, 0);
216 
217  // wait
219 
220  // let publisher send a msg
221  send_array_expect_success (pub, topic_buff, ZMQ_SNDMORE);
222  send_array_expect_success (pub, payload_buff, 0);
223 
224  // wait
226 
227  // nothing should come to the proxy
228  char buffer[1];
230  EAGAIN, zmq_recv (xsub_proxy, buffer, sizeof buffer, ZMQ_DONTWAIT));
231 
233  test_context_socket_close (xpub_proxy);
234  test_context_socket_close (xsub_proxy);
235 }
236 
238 {
239  const char *topic1 = "1";
240  const char *topic2 = "2";
241  const char *payload = "X";
242 
243  char my_endpoint_backend[MAX_SOCKET_STRING];
244  char my_endpoint_frontend[MAX_SOCKET_STRING];
245 
246  int manual = 1;
247 
248  // proxy frontend
249  void *xsub_proxy = test_context_socket (ZMQ_XSUB);
250  bind_loopback_ipv4 (xsub_proxy, my_endpoint_frontend,
251  sizeof my_endpoint_frontend);
252 
253  // proxy backend
254  void *xpub_proxy = test_context_socket (ZMQ_XPUB);
256  zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4));
257  bind_loopback_ipv4 (xpub_proxy, my_endpoint_backend,
258  sizeof my_endpoint_backend);
259 
260  // publisher
261  void *pub = test_context_socket (ZMQ_PUB);
262  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, my_endpoint_frontend));
263 
264  // Here's the problem: because subscribers subscribe in quick succession,
265  // the proxy is unable to confirm the first subscription before receiving
266  // the second. This causes the first subscription to get lost.
267 
268  // first subscriber
269  void *sub1 = test_context_socket (ZMQ_SUB);
270  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, my_endpoint_backend));
272 
273  // wait
275 
276  // proxy now reroutes and confirms subscriptions
277  const uint8_t subscription1[] = {1, static_cast<uint8_t> (topic1[0])};
278  recv_array_expect_success (xpub_proxy, subscription1, ZMQ_DONTWAIT);
280  zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic1, 1));
281  send_array_expect_success (xsub_proxy, subscription1, 0);
282 
283  // second subscriber
284  void *sub2 = test_context_socket (ZMQ_SUB);
285  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, my_endpoint_backend));
287 
288  // wait
290 
291  const uint8_t subscription2[] = {1, static_cast<uint8_t> (topic2[0])};
292  recv_array_expect_success (xpub_proxy, subscription2, ZMQ_DONTWAIT);
294  zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic2, 1));
295  send_array_expect_success (xsub_proxy, subscription2, 0);
296 
297  // wait
299 
300  // let publisher send 2 msgs, each with its own topic_buff
302  send_string_expect_success (pub, payload, 0);
304  send_string_expect_success (pub, payload, 0);
305 
306  // wait
308 
309  // proxy reroutes data messages to subscribers
310  recv_string_expect_success (xsub_proxy, topic1, ZMQ_DONTWAIT);
311  recv_string_expect_success (xsub_proxy, payload, ZMQ_DONTWAIT);
312  send_string_expect_success (xpub_proxy, topic1, ZMQ_SNDMORE);
313  send_string_expect_success (xpub_proxy, payload, 0);
314 
315  recv_string_expect_success (xsub_proxy, topic2, ZMQ_DONTWAIT);
316  recv_string_expect_success (xsub_proxy, payload, ZMQ_DONTWAIT);
317  send_string_expect_success (xpub_proxy, topic2, ZMQ_SNDMORE);
318  send_string_expect_success (xpub_proxy, payload, 0);
319 
320  // wait
322 
323  // each subscriber should now get a message
325  recv_string_expect_success (sub2, payload, ZMQ_DONTWAIT);
326 
328  recv_string_expect_success (sub1, payload, ZMQ_DONTWAIT);
329 
330  // Clean up
334  test_context_socket_close (xpub_proxy);
335  test_context_socket_close (xsub_proxy);
336 }
337 
339 {
341 
342  // Create a publisher
343  void *pub = test_context_socket (ZMQ_XPUB);
344  int manual = 1;
346  zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, 4));
348 
349  // Create a subscriber
350  void *sub = test_context_socket (ZMQ_XSUB);
352 
353  // Subscribe for A
354  const uint8_t subscription1[2] = {1, 'A'};
355  send_array_expect_success (sub, subscription1, 0);
356 
357 
358  // Receive subscriptions from subscriber
359  recv_array_expect_success (pub, subscription1, 0);
361 
362  // send 2 messages
363  send_string_expect_success (pub, "XA", 0);
364  send_string_expect_success (pub, "XB", 0);
365 
366  // receive the single message
367  recv_string_expect_success (sub, "XA", 0);
368 
369  // should be nothing left in the queue
370  char buffer[2];
372  EAGAIN, zmq_recv (sub, buffer, sizeof buffer, ZMQ_DONTWAIT));
373 
374  // close the socket
376 
377  // closing the socket will result in an unsubscribe event
378  const uint8_t unsubscription[2] = {0, 'A'};
379  recv_array_expect_success (pub, unsubscription, 0);
380 
381  // this doesn't really do anything
382  // there is no last_pipe set it will just fail silently
384 
385  // reconnect
388 
389  // send a subscription for B
390  const uint8_t subscription2[2] = {1, 'B'};
391  send_array_expect_success (sub, subscription2, 0);
392 
393  // receive the subscription, overwrite it to XB
394  recv_array_expect_success (pub, subscription2, 0);
396 
397  // send 2 messages
398  send_string_expect_success (pub, "XA", 0);
399  send_string_expect_success (pub, "XB", 0);
400 
401  // receive the single message
402  recv_string_expect_success (sub, "XB", 0);
403 
404  // should be nothing left in the queue
406  EAGAIN, zmq_recv (sub, buffer, sizeof buffer, ZMQ_DONTWAIT));
407 
408  // Clean up.
411 }
412 
414 {
415  // Create a publisher
416  void *pub = test_context_socket (ZMQ_XPUB);
417  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
418 
419  // Create a subscriber
420  void *sub = test_context_socket (ZMQ_XSUB);
421  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
422 
423  // Send some data that is neither sub nor unsub
424  const char subscription[] = {2, 'A', 0};
425  send_string_expect_success (sub, subscription, 0);
426 
427  // Receive subscriptions from subscriber
428  recv_string_expect_success (pub, subscription, 0);
429 
430  // Clean up.
433 }
434 
435 #ifdef ZMQ_ONLY_FIRST_SUBSCRIBE
437 {
438  const int only_first_subscribe = 1;
439 
440  // Create a publisher
441  void *pub = test_context_socket (ZMQ_XPUB);
442  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
444  &only_first_subscribe,
445  sizeof (only_first_subscribe)));
446 
447  // Create a subscriber
448  void *sub = test_context_socket (ZMQ_XSUB);
449  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
451  &only_first_subscribe,
452  sizeof (only_first_subscribe)));
453 
454  // Send some data that is neither sub nor unsub
455  const uint8_t msg_common[] = {'A', 'B', 'C'};
456  // Message starts with 0 but should still treated as user
457  const uint8_t msg_0a[] = {0, 'B', 'C'};
458  const uint8_t msg_0b[] = {0, 'C', 'D'};
459  // Message starts with 1 but should still treated as user
460  const uint8_t msg_1a[] = {1, 'B', 'C'};
461  const uint8_t msg_1b[] = {1, 'C', 'D'};
462 
463  // Test second message starting with 0
464  send_array_expect_success (sub, msg_common, ZMQ_SNDMORE);
465  send_array_expect_success (sub, msg_0a, 0);
466 
467  // Receive messages from subscriber
468  recv_array_expect_success (pub, msg_common, 0);
469  recv_array_expect_success (pub, msg_0a, 0);
470 
471  // Test second message starting with 1
472  send_array_expect_success (sub, msg_common, ZMQ_SNDMORE);
473  send_array_expect_success (sub, msg_1a, 0);
474 
475  // Receive messages from subscriber
476  recv_array_expect_success (pub, msg_common, 0);
477  recv_array_expect_success (pub, msg_1a, 0);
478 
479  // Test first message starting with 1
480  send_array_expect_success (sub, msg_1a, ZMQ_SNDMORE);
481  send_array_expect_success (sub, msg_1b, 0);
482  recv_array_expect_success (pub, msg_1a, 0);
483  recv_array_expect_success (pub, msg_1b, 0);
484 
485  send_array_expect_success (sub, msg_0a, ZMQ_SNDMORE);
486  send_array_expect_success (sub, msg_0b, 0);
487  recv_array_expect_success (pub, msg_0a, 0);
488  recv_array_expect_success (pub, msg_0b, 0);
489 
490  // Clean up.
493 }
494 #endif
495 
496 int main ()
497 {
499 
500  UNITY_BEGIN ();
507 #ifdef ZMQ_ONLY_FIRST_SUBSCRIBE
509 #endif
510 
511  return UNITY_END ();
512 }
recv_array_expect_success
void recv_array_expect_success(void *socket_, const uint8_t(&array_)[SIZE], int flags_)
Definition: testutil_unity.hpp:148
ZMQ_XPUB_MANUAL
#define ZMQ_XPUB_MANUAL
Definition: zmq.h:332
test_user_message_multi
void test_user_message_multi()
Definition: test_xpub_manual.cpp:436
UNITY_END
return UNITY_END()
ZMQ_XPUB
#define ZMQ_XPUB
Definition: zmq.h:267
test_unsubscribe_manual
void test_unsubscribe_manual()
Definition: test_xpub_manual.cpp:42
ZMQ_PUB
#define ZMQ_PUB
Definition: zmq.h:259
msleep
void msleep(int milliseconds_)
Definition: testutil.cpp:227
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
RUN_TEST
#define RUN_TEST(func)
Definition: unity_internals.h:615
SETUP_TEARDOWN_TESTCONTEXT
#define SETUP_TEARDOWN_TESTCONTEXT
Definition: testutil_unity.hpp:172
ZMQ_SUBSCRIBE
#define ZMQ_SUBSCRIBE
Definition: zmq.h:278
bind_loopback_ipv4
void bind_loopback_ipv4(void *socket_, char *my_endpoint_, size_t len_)
Definition: testutil_unity.cpp:246
ZMQ_XSUB
#define ZMQ_XSUB
Definition: zmq.h:268
ZMQ_SUB
#define ZMQ_SUB
Definition: zmq.h:260
test_unsubscribe_cleanup
void test_unsubscribe_cleanup()
Definition: test_xpub_manual.cpp:338
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
TEST_ASSERT_EQUAL_INT8_ARRAY
#define TEST_ASSERT_EQUAL_INT8_ARRAY(expected, actual, num_elements)
Definition: unity.h:241
testutil_unity.hpp
zmq_setsockopt
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
testutil.hpp
send_array_expect_success
void send_array_expect_success(void *socket_, const uint8_t(&array_)[SIZE], int flags_)
Definition: testutil_unity.hpp:132
my_endpoint
char my_endpoint[MAX_SOCKET_STRING]
Definition: test_security_curve.cpp:31
MAX_SOCKET_STRING
#define MAX_SOCKET_STRING
Definition: libzmq/tests/testutil.hpp:35
zmq_bind
ZMQ_EXPORT int zmq_bind(void *s_, const char *addr_)
Definition: zmq.cpp:299
buffer
Definition: buffer_processor.h:43
test_context_socket
void * test_context_socket(int type_)
Definition: testutil_unity.cpp:200
ZMQ_DONTWAIT
#define ZMQ_DONTWAIT
Definition: zmq.h:358
SETTLE_TIME
#define SETTLE_TIME
Definition: libzmq/tests/testutil.hpp:31
TEST_ASSERT_EQUAL_INT
#define TEST_ASSERT_EQUAL_INT(expected, actual)
Definition: unity.h:128
send_string_expect_success
void send_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:94
ZMQ_ONLY_FIRST_SUBSCRIBE
#define ZMQ_ONLY_FIRST_SUBSCRIBE
Definition: zmq_draft.h:40
test_basic
SETUP_TEARDOWN_TESTCONTEXT void test_basic()
Definition: test_xpub_manual.cpp:8
zmq_recv
ZMQ_EXPORT int zmq_recv(void *s_, void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:487
ZMQ_UNSUBSCRIBE
#define ZMQ_UNSUBSCRIBE
Definition: zmq.h:279
recv_string_expect_success
void recv_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:101
setup_test_environment
void setup_test_environment(int timeout_seconds_)
Definition: testutil.cpp:201
test_user_message
void test_user_message()
Definition: test_xpub_manual.cpp:413
UNITY_BEGIN
UNITY_BEGIN()
ZMQ_SNDMORE
#define ZMQ_SNDMORE
Definition: zmq.h:359
TEST_ASSERT_FAILURE_ERRNO
#define TEST_ASSERT_FAILURE_ERRNO(error_code, expr)
Definition: testutil_unity.hpp:95
main
int main()
Definition: test_xpub_manual.cpp:496
test_xpub_proxy_unsubscribe_on_disconnect
void test_xpub_proxy_unsubscribe_on_disconnect()
Definition: test_xpub_manual.cpp:114
test_context_socket_close
void * test_context_socket_close(void *socket_)
Definition: testutil_unity.cpp:208
TEST_ASSERT_SUCCESS_ERRNO
#define TEST_ASSERT_SUCCESS_ERRNO(expr)
Definition: proxy_thr.cpp:47
test_missing_subscriptions
void test_missing_subscriptions()
Definition: test_xpub_manual.cpp:237


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59