test_xpub_manual_last_value.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "testutil.hpp"
4 #include "testutil_unity.hpp"
5 
7 
8 void test_basic ()
9 {
10  // Create a publisher
11  void *pub = test_context_socket (ZMQ_XPUB);
12  int manual = 1;
14  zmq_setsockopt (pub, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
15  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
16 
17  // Create a subscriber
18  void *sub = test_context_socket (ZMQ_XSUB);
19  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
20 
21  // Subscribe for A
22  const char subscription[] = {1, 'A', 0};
23  send_string_expect_success (sub, subscription, 0);
24 
25  // Receive subscriptions from subscriber
26  recv_string_expect_success (pub, subscription, 0);
27 
28  // Subscribe socket for B instead
30 
31  // Sending A message and B Message
32  send_string_expect_success (pub, "A", 0);
33  send_string_expect_success (pub, "B", 0);
34 
36 
37  // Clean up.
40 }
41 
43 {
44  // Create a publisher
45  void *pub = test_context_socket (ZMQ_XPUB);
46  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
47 
48  // set pub socket options
49  int manual = 1;
51  &manual, sizeof (manual)));
52 
53  // Create a subscriber
54  void *sub = test_context_socket (ZMQ_XSUB);
55  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
56 
57  // Subscribe for A
58  const uint8_t subscription1[] = {1, 'A'};
59  send_array_expect_success (sub, subscription1, 0);
60 
61  // Subscribe for B
62  const uint8_t subscription2[] = {1, 'B'};
63  send_array_expect_success (sub, subscription2, 0);
64 
65  char buffer[3];
66 
67  // Receive subscription "A" from subscriber
68  recv_array_expect_success (pub, subscription1, 0);
69 
70  // Subscribe socket for XA instead
72 
73  // Receive subscription "B" from subscriber
74  recv_array_expect_success (pub, subscription2, 0);
75 
76  // Subscribe socket for XB instead
78 
79  // Unsubscribe from A
80  const uint8_t unsubscription1[2] = {0, 'A'};
81  send_array_expect_success (sub, unsubscription1, 0);
82 
83  // Receive unsubscription "A" from subscriber
84  recv_array_expect_success (pub, unsubscription1, 0);
85 
86  // Unsubscribe socket from XA instead
88 
89  // Sending messages XA, XB
90  send_string_expect_success (pub, "XA", 0);
91  send_string_expect_success (pub, "XB", 0);
92 
93  // Subscriber should receive XB only
95 
96  // Close subscriber
98 
99  // Receive unsubscription "B"
100  const char unsubscription2[2] = {0, 'B'};
102  sizeof unsubscription2,
103  TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (pub, buffer, sizeof buffer, 0)));
104  TEST_ASSERT_EQUAL_INT8_ARRAY (unsubscription2, buffer,
105  sizeof unsubscription2);
106 
107  // Unsubscribe socket from XB instead
109 
110  // Clean up.
112 }
113 
115 {
116  const uint8_t topic_buff[] = {"1"};
117  const uint8_t payload_buff[] = {"X"};
118 
119  char my_endpoint_backend[MAX_SOCKET_STRING];
120  char my_endpoint_frontend[MAX_SOCKET_STRING];
121 
122  int manual = 1;
123 
124  // proxy frontend
125  void *xsub_proxy = test_context_socket (ZMQ_XSUB);
126  bind_loopback_ipv4 (xsub_proxy, my_endpoint_frontend,
127  sizeof my_endpoint_frontend);
128 
129  // proxy backend
130  void *xpub_proxy = test_context_socket (ZMQ_XPUB);
132  zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
133  bind_loopback_ipv4 (xpub_proxy, my_endpoint_backend,
134  sizeof my_endpoint_backend);
135 
136  // publisher
137  void *pub = test_context_socket (ZMQ_PUB);
138  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, my_endpoint_frontend));
139 
140  // first subscriber subscribes
141  void *sub1 = test_context_socket (ZMQ_SUB);
142  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, my_endpoint_backend));
144  zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic_buff, 1));
145 
146  // wait
148 
149  // proxy reroutes and confirms subscriptions
150  const uint8_t subscription[2] = {1, *topic_buff};
151  recv_array_expect_success (xpub_proxy, subscription, ZMQ_DONTWAIT);
153  zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic_buff, 1));
154  send_array_expect_success (xsub_proxy, subscription, 0);
155 
156  // second subscriber subscribes
157  void *sub2 = test_context_socket (ZMQ_SUB);
158  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, my_endpoint_backend));
160  zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic_buff, 1));
161 
162  // wait
164 
165  // proxy reroutes
166  recv_array_expect_success (xpub_proxy, subscription, ZMQ_DONTWAIT);
168  zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic_buff, 1));
169  send_array_expect_success (xsub_proxy, subscription, 0);
170 
171  // wait
173 
174  // let publisher send a msg
175  send_array_expect_success (pub, topic_buff, ZMQ_SNDMORE);
176  send_array_expect_success (pub, payload_buff, 0);
177 
178  // wait
180 
181  // proxy reroutes data messages to subscribers
182  recv_array_expect_success (xsub_proxy, topic_buff, ZMQ_DONTWAIT);
183  recv_array_expect_success (xsub_proxy, payload_buff, ZMQ_DONTWAIT);
184 
185  // send 2 messages
186  send_array_expect_success (xpub_proxy, topic_buff, ZMQ_SNDMORE);
187  send_array_expect_success (xpub_proxy, payload_buff, 0);
188  send_array_expect_success (xpub_proxy, topic_buff, ZMQ_SNDMORE);
189  send_array_expect_success (xpub_proxy, payload_buff, 0);
190 
191  // wait
193 
194  // sub2 will get 2 messages because the last subscription is sub2.
195  recv_array_expect_success (sub2, topic_buff, ZMQ_DONTWAIT);
196  recv_array_expect_success (sub2, payload_buff, ZMQ_DONTWAIT);
197  recv_array_expect_success (sub2, topic_buff, ZMQ_DONTWAIT);
198  recv_array_expect_success (sub2, payload_buff, ZMQ_DONTWAIT);
199 
200  recv_array_expect_success (sub1, topic_buff, ZMQ_DONTWAIT);
201  recv_array_expect_success (sub1, payload_buff, ZMQ_DONTWAIT);
202 
203  // Disconnect both subscribers
206 
207  // wait
209 
210  // unsubscribe messages are passed from proxy to publisher
211  const uint8_t unsubscription[] = {0, *topic_buff};
212  recv_array_expect_success (xpub_proxy, unsubscription, 0);
214  zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic_buff, 1));
215  send_array_expect_success (xsub_proxy, unsubscription, 0);
216 
217  // should receive another unsubscribe msg
218  recv_array_expect_success (xpub_proxy, unsubscription, 0);
220  zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic_buff, 1));
221  send_array_expect_success (xsub_proxy, unsubscription, 0);
222 
223  // wait
225 
226  // let publisher send a msg
227  send_array_expect_success (pub, topic_buff, ZMQ_SNDMORE);
228  send_array_expect_success (pub, payload_buff, 0);
229 
230  // wait
232 
233  // nothing should come to the proxy
234  char buffer[1];
236  EAGAIN, zmq_recv (xsub_proxy, buffer, sizeof buffer, ZMQ_DONTWAIT));
237 
239  test_context_socket_close (xpub_proxy);
240  test_context_socket_close (xsub_proxy);
241 }
242 
244 {
245  const char *topic1 = "1";
246  const char *topic2 = "2";
247  const char *payload = "X";
248 
249  char my_endpoint_backend[MAX_SOCKET_STRING];
250  char my_endpoint_frontend[MAX_SOCKET_STRING];
251 
252  int manual = 1;
253 
254  // proxy frontend
255  void *xsub_proxy = test_context_socket (ZMQ_XSUB);
256  bind_loopback_ipv4 (xsub_proxy, my_endpoint_frontend,
257  sizeof my_endpoint_frontend);
258 
259  // proxy backend
260  void *xpub_proxy = test_context_socket (ZMQ_XPUB);
262  zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
263  bind_loopback_ipv4 (xpub_proxy, my_endpoint_backend,
264  sizeof my_endpoint_backend);
265 
266  // publisher
267  void *pub = test_context_socket (ZMQ_PUB);
268  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, my_endpoint_frontend));
269 
270  // Here's the problem: because subscribers subscribe in quick succession,
271  // the proxy is unable to confirm the first subscription before receiving
272  // the second. This causes the first subscription to get lost.
273 
274  // first subscriber
275  void *sub1 = test_context_socket (ZMQ_SUB);
276  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, my_endpoint_backend));
278 
279  // wait
281 
282  // proxy now reroutes and confirms subscriptions
283  const uint8_t subscription1[] = {1, static_cast<uint8_t> (topic1[0])};
284  recv_array_expect_success (xpub_proxy, subscription1, ZMQ_DONTWAIT);
286  zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic1, 1));
287  send_array_expect_success (xsub_proxy, subscription1, 0);
288 
289  // second subscriber
290  void *sub2 = test_context_socket (ZMQ_SUB);
291  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, my_endpoint_backend));
293 
294  // wait
296 
297  // proxy now reroutes and confirms subscriptions
298  const uint8_t subscription2[] = {1, static_cast<uint8_t> (topic2[0])};
299  recv_array_expect_success (xpub_proxy, subscription2, ZMQ_DONTWAIT);
301  zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic2, 1));
302  send_array_expect_success (xsub_proxy, subscription2, 0);
303 
304  // wait
306 
307  // let publisher send 2 msgs, each with its own topic_buff
309  send_string_expect_success (pub, payload, 0);
311  send_string_expect_success (pub, payload, 0);
312 
313  // wait
315 
316  // proxy reroutes data messages to subscribers
317  recv_string_expect_success (xsub_proxy, topic1, ZMQ_DONTWAIT);
318  recv_string_expect_success (xsub_proxy, payload, ZMQ_DONTWAIT);
319  send_string_expect_success (xpub_proxy, topic1, ZMQ_SNDMORE);
320  send_string_expect_success (xpub_proxy, payload, 0);
321 
322  recv_string_expect_success (xsub_proxy, topic2, ZMQ_DONTWAIT);
323  recv_string_expect_success (xsub_proxy, payload, ZMQ_DONTWAIT);
324  send_string_expect_success (xpub_proxy, topic2, ZMQ_SNDMORE);
325  send_string_expect_success (xpub_proxy, payload, 0);
326 
327  // wait
329 
330  // only sub2 should now get a message
332  recv_string_expect_success (sub2, payload, ZMQ_DONTWAIT);
333 
334  //recv_string_expect_success (sub1, topic1, ZMQ_DONTWAIT);
335  //recv_string_expect_success (sub1, payload, ZMQ_DONTWAIT);
336 
337  // Clean up
341  test_context_socket_close (xpub_proxy);
342  test_context_socket_close (xsub_proxy);
343 }
344 
346 {
348 
349  // Create a publisher
350  void *pub = test_context_socket (ZMQ_XPUB);
351  int manual = 1;
353  zmq_setsockopt (pub, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
355 
356  // Create a subscriber
357  void *sub = test_context_socket (ZMQ_XSUB);
359 
360  // Subscribe for A
361  const uint8_t subscription1[2] = {1, 'A'};
362  send_array_expect_success (sub, subscription1, 0);
363 
364 
365  // Receive subscriptions from subscriber
366  recv_array_expect_success (pub, subscription1, 0);
368 
369  // send 2 messages
370  send_string_expect_success (pub, "XA", 0);
371  send_string_expect_success (pub, "XB", 0);
372 
373  // receive the single message
374  recv_string_expect_success (sub, "XA", 0);
375 
376  // should be nothing left in the queue
377  char buffer[2];
379  EAGAIN, zmq_recv (sub, buffer, sizeof buffer, ZMQ_DONTWAIT));
380 
381  // close the socket
383 
384  // closing the socket will result in an unsubscribe event
385  const uint8_t unsubscription[2] = {0, 'A'};
386  recv_array_expect_success (pub, unsubscription, 0);
387 
388  // this doesn't really do anything
389  // there is no last_pipe set it will just fail silently
391 
392  // reconnect
395 
396  // send a subscription for B
397  const uint8_t subscription2[2] = {1, 'B'};
398  send_array_expect_success (sub, subscription2, 0);
399 
400  // receive the subscription, overwrite it to XB
401  recv_array_expect_success (pub, subscription2, 0);
403 
404  // send 2 messages
405  send_string_expect_success (pub, "XA", 0);
406  send_string_expect_success (pub, "XB", 0);
407 
408  // receive the single message
409  recv_string_expect_success (sub, "XB", 0);
410 
411  // should be nothing left in the queue
413  EAGAIN, zmq_recv (sub, buffer, sizeof buffer, ZMQ_DONTWAIT));
414 
415  // Clean up.
418 }
419 
421 {
422  // Create a publisher
423  void *pub = test_context_socket (ZMQ_XPUB);
424 
425  int hwm = 2000;
427 
428  // set pub socket options
429  int manual = 1;
431  zmq_setsockopt (pub, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
432 
433  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
434 
435  // Create a subscriber
436  void *sub = test_context_socket (ZMQ_SUB);
437  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
438 
439  // Create another subscriber
440  void *sub2 = test_context_socket (ZMQ_SUB);
441  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, "inproc://soname"));
442 
443  // Subscribe for "A".
445 
446  const uint8_t subscription[2] = {1, 'A'};
447  // we must wait for the subscription to be processed here, otherwise some
448  // or all published messages might be lost
449  recv_array_expect_success (pub, subscription, 0);
450 
451  // manual subscribe message
453  send_string_expect_success (pub, "A", 0);
454  recv_string_expect_success (sub, "A", 0);
455 
456  // Subscribe for "A".
458  recv_array_expect_success (pub, subscription, 0);
460  send_string_expect_success (pub, "A", 0);
461  recv_string_expect_success (sub2, "A", 0);
462 
463  char buffer[255];
464  // sub won't get a message because the last subscription pipe is sub2.
466  EAGAIN, zmq_recv (sub, buffer, sizeof (buffer), ZMQ_DONTWAIT));
467 
468  // Clean up.
472 }
473 
474 int main ()
475 {
477 
478  UNITY_BEGIN ();
485 
486  return UNITY_END ();
487 }
recv_array_expect_success
void recv_array_expect_success(void *socket_, const uint8_t(&array_)[SIZE], int flags_)
Definition: testutil_unity.hpp:148
test_xpub_proxy_unsubscribe_on_disconnect
void test_xpub_proxy_unsubscribe_on_disconnect()
Definition: test_xpub_manual_last_value.cpp:114
test_unsubscribe_manual
void test_unsubscribe_manual()
Definition: test_xpub_manual_last_value.cpp:42
UNITY_END
return UNITY_END()
ZMQ_XPUB
#define ZMQ_XPUB
Definition: zmq.h:267
ZMQ_PUB
#define ZMQ_PUB
Definition: zmq.h:259
test_unsubscribe_cleanup
void test_unsubscribe_cleanup()
Definition: test_xpub_manual_last_value.cpp:345
msleep
void msleep(int milliseconds_)
Definition: testutil.cpp:227
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
RUN_TEST
#define RUN_TEST(func)
Definition: unity_internals.h:615
SETUP_TEARDOWN_TESTCONTEXT
#define SETUP_TEARDOWN_TESTCONTEXT
Definition: testutil_unity.hpp:172
ZMQ_XPUB_MANUAL_LAST_VALUE
#define ZMQ_XPUB_MANUAL_LAST_VALUE
Definition: zmq_draft.h:30
test_manual_last_value
void test_manual_last_value()
Definition: test_xpub_manual_last_value.cpp:420
ZMQ_SUBSCRIBE
#define ZMQ_SUBSCRIBE
Definition: zmq.h:278
bind_loopback_ipv4
void bind_loopback_ipv4(void *socket_, char *my_endpoint_, size_t len_)
Definition: testutil_unity.cpp:246
ZMQ_XSUB
#define ZMQ_XSUB
Definition: zmq.h:268
ZMQ_SUB
#define ZMQ_SUB
Definition: zmq.h:260
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
TEST_ASSERT_EQUAL_INT8_ARRAY
#define TEST_ASSERT_EQUAL_INT8_ARRAY(expected, actual, num_elements)
Definition: unity.h:241
testutil_unity.hpp
zmq_setsockopt
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
main
int main()
Definition: test_xpub_manual_last_value.cpp:474
testutil.hpp
send_array_expect_success
void send_array_expect_success(void *socket_, const uint8_t(&array_)[SIZE], int flags_)
Definition: testutil_unity.hpp:132
my_endpoint
char my_endpoint[MAX_SOCKET_STRING]
Definition: test_security_curve.cpp:31
MAX_SOCKET_STRING
#define MAX_SOCKET_STRING
Definition: libzmq/tests/testutil.hpp:35
zmq_bind
ZMQ_EXPORT int zmq_bind(void *s_, const char *addr_)
Definition: zmq.cpp:299
buffer
Definition: buffer_processor.h:43
test_context_socket
void * test_context_socket(int type_)
Definition: testutil_unity.cpp:200
ZMQ_DONTWAIT
#define ZMQ_DONTWAIT
Definition: zmq.h:358
SETTLE_TIME
#define SETTLE_TIME
Definition: libzmq/tests/testutil.hpp:31
test_basic
SETUP_TEARDOWN_TESTCONTEXT void test_basic()
Definition: test_xpub_manual_last_value.cpp:8
TEST_ASSERT_EQUAL_INT
#define TEST_ASSERT_EQUAL_INT(expected, actual)
Definition: unity.h:128
ZMQ_SNDHWM
#define ZMQ_SNDHWM
Definition: zmq.h:293
send_string_expect_success
void send_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:94
zmq_recv
ZMQ_EXPORT int zmq_recv(void *s_, void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:487
ZMQ_UNSUBSCRIBE
#define ZMQ_UNSUBSCRIBE
Definition: zmq.h:279
recv_string_expect_success
void recv_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:101
setup_test_environment
void setup_test_environment(int timeout_seconds_)
Definition: testutil.cpp:201
UNITY_BEGIN
UNITY_BEGIN()
ZMQ_SNDMORE
#define ZMQ_SNDMORE
Definition: zmq.h:359
TEST_ASSERT_FAILURE_ERRNO
#define TEST_ASSERT_FAILURE_ERRNO(error_code, expr)
Definition: testutil_unity.hpp:95
test_missing_subscriptions
void test_missing_subscriptions()
Definition: test_xpub_manual_last_value.cpp:243
test_context_socket_close
void * test_context_socket_close(void *socket_)
Definition: testutil_unity.cpp:208
TEST_ASSERT_SUCCESS_ERRNO
#define TEST_ASSERT_SUCCESS_ERRNO(expr)
Definition: proxy_thr.cpp:47


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59