test_proxy_hwm.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "testutil.hpp"
4 #include "testutil_unity.hpp"
5 #include <string.h>
6 #include <unity.h>
7 #include <assert.h>
8 #include <unistd.h>
9 
10 //
11 // Asynchronous proxy test using ZMQ_XPUB_NODROP and HWM:
12 //
13 // Topology:
14 //
15 // XPUB SUB
16 // | |
17 // \-----> XSUB -> XPUB -----/
18 // ^^^^^^^^^^^^^^
19 // ZMQ proxy
20 //
21 // All connections use "inproc" transport and have artificially-low HWMs set.
22 // Then the PUB socket starts flooding the Proxy. The SUB is artificially slow
23 // at receiving messages.
24 // This scenario simulates what happens when a SUB is slower than
25 // its (X)PUB: since ZMQ_XPUB_NODROP=1, the XPUB will block and then
26 // also the (X)PUB socket will block.
27 // The exact number of the messages that go through before (X)PUB blocks depends
28 // on ZeroMQ internals and how the OS will schedule the different threads.
29 // In the meanwhile asking statistics to the Proxy must NOT be blocking.
30 //
31 
32 
33 #define HWM 10
34 #define NUM_BYTES_PER_MSG 50000
35 
36 
37 typedef struct
38 {
39  void *context;
40  const char *frontend_endpoint;
41  const char *backend_endpoint;
42  const char *control_endpoint;
43 
46 
47 static void lower_hwm (void *skt_)
48 {
49  int send_hwm = HWM;
51  zmq_setsockopt (skt_, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm)));
52 
54  zmq_setsockopt (skt_, ZMQ_RCVHWM, &send_hwm, sizeof (send_hwm)));
55 }
56 
57 static void publisher_thread_main (void *pvoid_)
58 {
59  const proxy_hwm_cfg_t *const cfg =
60  static_cast<const proxy_hwm_cfg_t *> (pvoid_);
61 
62  void *pubsocket = zmq_socket (cfg->context, ZMQ_XPUB);
63  assert (pubsocket);
64 
65  lower_hwm (pubsocket);
66 
68 
69  int optval = 1;
71  zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval)));
72 
73  // Wait before starting TX operations till 1 subscriber has subscribed
74  // (in this test there's 1 subscriber only)
75  const char subscription_to_all_topics[] = {1, 0};
76  recv_string_expect_success (pubsocket, subscription_to_all_topics, 0);
77 
78  uint64_t send_count = 0;
79  while (true) {
80  zmq_msg_t msg;
81  int rc = zmq_msg_init_size (&msg, NUM_BYTES_PER_MSG);
82  assert (rc == 0);
83 
84  /* Fill in message content with 'AAAAAA' */
85  memset (zmq_msg_data (&msg), 'A', NUM_BYTES_PER_MSG);
86 
87  /* Send the message to the socket */
88  rc = zmq_msg_send (&msg, pubsocket, ZMQ_DONTWAIT);
89  if (rc != -1) {
90  send_count++;
91  } else {
93  break;
94  }
95  }
96 
97  // VERIFY EXPECTED RESULTS
98  // EXPLANATION FOR TX TO BE CONSIDERED SUCCESSFUL:
99  // this test has 3 threads doing I/O across 2 queues. Depending on the scheduling,
100  // it might happen that 20, 30 or 40 messages go through before the pub blocks.
101  // That's because the receiver thread gets kicked once every (hwm_ + 1) / 2 sent
102  // messages (search for zeromq sources compute_lwm function).
103  // So depending on the scheduling of the second thread, the publisher might get one,
104  // two or three more batches in. The ceiling is 40 as there's 2 queues.
105  //
106  assert (4 * HWM >= send_count && 2 * HWM <= send_count);
107 
108  // CLEANUP
109 
110  zmq_close (pubsocket);
111 }
112 
113 static void subscriber_thread_main (void *pvoid_)
114 {
115  const proxy_hwm_cfg_t *const cfg =
116  static_cast<const proxy_hwm_cfg_t *> (pvoid_);
117 
118  void *subsocket = zmq_socket (cfg->context, ZMQ_SUB);
119  assert (subsocket);
120 
121  lower_hwm (subsocket);
122 
124 
126 
127 
128  // receive all sent messages
129  uint64_t rxsuccess = 0;
130  bool success = true;
131  while (success) {
132  zmq_msg_t msg;
133  int rc = zmq_msg_init (&msg);
134  assert (rc == 0);
135 
136  rc = zmq_msg_recv (&msg, subsocket, 0);
137  if (rc != -1) {
139  rxsuccess++;
140 
141  // after receiving 1st message, set a finite timeout (default is infinite)
142  int timeout_ms = 100;
144  subsocket, ZMQ_RCVTIMEO, &timeout_ms, sizeof (timeout_ms)));
145  } else {
146  break;
147  }
148 
149  msleep (100);
150  }
151 
152 
153  // VERIFY EXPECTED RESULTS
154  // EXPLANATION FOR RX TO BE CONSIDERED SUCCESSFUL:
155  // see publisher thread why we have 3 possible outcomes as number of RX messages
156 
157  assert (4 * HWM >= rxsuccess && 2 * HWM <= rxsuccess);
158 
159  // INFORM THAT WE COMPLETED:
160 
162 
163  // CLEANUP
164 
165  zmq_close (subsocket);
166 }
167 
168 static void proxy_stats_asker_thread_main (void *pvoid_)
169 {
170  const proxy_hwm_cfg_t *const cfg =
171  static_cast<const proxy_hwm_cfg_t *> (pvoid_);
172 
173  // CONTROL REQ
174 
175  void *control_req =
176  zmq_socket (cfg->context,
177  ZMQ_REQ); // this one can be used to send command to the proxy
178  assert (control_req);
179 
180  // connect CONTROL-REQ: a socket to which send commands
181  int rc = zmq_connect (control_req, cfg->control_endpoint);
182  assert (rc == 0);
183 
184 
185  // IMPORTANT: by setting the tx/rx timeouts, we avoid getting blocked when interrogating a proxy which is
186  // itself blocked in a zmq_msg_send() on its XPUB socket having ZMQ_XPUB_NODROP=1!
187 
188  int optval = 10;
189  rc = zmq_setsockopt (control_req, ZMQ_SNDTIMEO, &optval, sizeof (optval));
190  assert (rc == 0);
191  rc = zmq_setsockopt (control_req, ZMQ_RCVTIMEO, &optval, sizeof (optval));
192  assert (rc == 0);
193 
194  optval = 10;
195  rc =
196  zmq_setsockopt (control_req, ZMQ_REQ_CORRELATE, &optval, sizeof (optval));
197  assert (rc == 0);
198 
199  rc =
200  zmq_setsockopt (control_req, ZMQ_REQ_RELAXED, &optval, sizeof (optval));
201  assert (rc == 0);
202 
203 
204  // Start!
205 
207  usleep (1000); // 1ms -> in best case we will get 1000updates/second
208  }
209 
210  zmq_close (control_req);
211 }
212 
213 static void proxy_thread_main (void *pvoid_)
214 {
215  const proxy_hwm_cfg_t *const cfg =
216  static_cast<const proxy_hwm_cfg_t *> (pvoid_);
217  int rc;
218 
219  // FRONTEND SUB
220 
221  void *frontend_xsub = zmq_socket (
222  cfg->context,
223  ZMQ_XSUB); // the frontend is the one exposed to internal threads (INPROC)
224  assert (frontend_xsub);
225 
226  lower_hwm (frontend_xsub);
227 
228  // bind FRONTEND
229  rc = zmq_bind (frontend_xsub, cfg->frontend_endpoint);
230  assert (rc == 0);
231 
232 
233  // BACKEND PUB
234 
235  void *backend_xpub = zmq_socket (
236  cfg->context,
237  ZMQ_XPUB); // the backend is the one exposed to the external world (TCP)
238  assert (backend_xpub);
239 
240  int optval = 1;
241  rc =
242  zmq_setsockopt (backend_xpub, ZMQ_XPUB_NODROP, &optval, sizeof (optval));
243  assert (rc == 0);
244 
245  lower_hwm (backend_xpub);
246 
247  // bind BACKEND
248  rc = zmq_bind (backend_xpub, cfg->backend_endpoint);
249  assert (rc == 0);
250 
251 
252  // CONTROL REP
253 
254  void *control_rep = zmq_socket (
255  cfg->context,
256  ZMQ_REP); // this one is used by the proxy to receive&reply to commands
257  assert (control_rep);
258 
259  // bind CONTROL
260  rc = zmq_bind (control_rep, cfg->control_endpoint);
261  assert (rc == 0);
262 
263 
264  // start proxying!
265 
266  zmq_proxy (frontend_xsub, backend_xpub, NULL);
267 
268  zmq_close (frontend_xsub);
269  zmq_close (backend_xpub);
270  zmq_close (control_rep);
271 }
272 
273 
274 // The main thread simply starts several clients and a server, and then
275 // waits for the server to finish.
276 
277 int main (void)
278 {
280 
281  void *context = zmq_ctx_new ();
282  assert (context);
283 
284 
285  // START ALL SECONDARY THREADS
286 
287  proxy_hwm_cfg_t cfg;
288  cfg.context = context;
289  cfg.frontend_endpoint = "inproc://frontend";
290  cfg.backend_endpoint = "inproc://backend";
291  cfg.control_endpoint = "inproc://ctrl";
293 
294  void *proxy = zmq_threadstart (&proxy_thread_main, (void *) &cfg);
295  assert (proxy != 0);
296  void *publisher = zmq_threadstart (&publisher_thread_main, (void *) &cfg);
297  assert (publisher != 0);
298  void *subscriber = zmq_threadstart (&subscriber_thread_main, (void *) &cfg);
299  assert (subscriber != 0);
300  void *asker =
302  assert (asker != 0);
303 
304 
305  // CLEANUP
306 
307  zmq_threadclose (publisher);
308  zmq_threadclose (subscriber);
309  zmq_threadclose (asker);
310 
311  int rc = zmq_ctx_term (context);
312  assert (rc == 0);
313 
315 
316  zmq_atomic_counter_destroy (&cfg.subscriber_received_all);
317 
318  return 0;
319 }
main
int main(void)
Definition: test_proxy_hwm.cpp:277
proxy_thread_main
static void proxy_thread_main(void *pvoid_)
Definition: test_proxy_hwm.cpp:213
NUM_BYTES_PER_MSG
#define NUM_BYTES_PER_MSG
Definition: test_proxy_hwm.cpp:34
zmq::proxy
void proxy(void *frontend, void *backend, void *capture)
Definition: zmq.hpp:2274
NULL
NULL
Definition: test_security_zap.cpp:405
ZMQ_XPUB
#define ZMQ_XPUB
Definition: zmq.h:267
proxy_stats_asker_thread_main
static void proxy_stats_asker_thread_main(void *pvoid_)
Definition: test_proxy_hwm.cpp:168
zmq_threadstart
ZMQ_EXPORT void * zmq_threadstart(zmq_thread_fn *func_, void *arg_)
Definition: zmq_utils.cpp:54
subscriber_thread_main
static void subscriber_thread_main(void *pvoid_)
Definition: test_proxy_hwm.cpp:113
msleep
void msleep(int milliseconds_)
Definition: testutil.cpp:227
zmq_msg_send
ZMQ_EXPORT int zmq_msg_send(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:609
proxy_hwm_cfg_t::backend_endpoint
const char * backend_endpoint[4]
Definition: proxy_thr.cpp:71
zmq_ctx_new
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:109
ZMQ_SUBSCRIBE
#define ZMQ_SUBSCRIBE
Definition: zmq.h:278
ZMQ_XSUB
#define ZMQ_XSUB
Definition: zmq.h:268
ZMQ_SUB
#define ZMQ_SUB
Definition: zmq.h:260
ZMQ_REQ
#define ZMQ_REQ
Definition: zmq.h:261
zmq_msg_data
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg_)
Definition: zmq.cpp:642
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
ZMQ_REQ_RELAXED
#define ZMQ_REQ_RELAXED
Definition: zmq.h:318
proxy_hwm_cfg_t::frontend_endpoint
const char * frontend_endpoint[4]
Definition: proxy_thr.cpp:70
zmq_atomic_counter_inc
ZMQ_EXPORT int zmq_atomic_counter_inc(void *counter_)
Definition: zmq_utils.cpp:271
testutil_unity.hpp
ZMQ_RCVHWM
#define ZMQ_RCVHWM
Definition: zmq.h:294
zmq_setsockopt
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
ZMQ_SNDTIMEO
#define ZMQ_SNDTIMEO
Definition: zmq.h:297
zmq_threadclose
ZMQ_EXPORT void zmq_threadclose(void *thread_)
Definition: zmq_utils.cpp:62
zmq_atomic_counter_new
ZMQ_EXPORT void * zmq_atomic_counter_new(void)
Definition: zmq_utils.cpp:255
testutil.hpp
publisher_thread_main
static void publisher_thread_main(void *pvoid_)
Definition: test_proxy_hwm.cpp:57
zmq_msg_t
Definition: zmq.h:218
ZMQ_REP
#define ZMQ_REP
Definition: zmq.h:262
proxy_hwm_cfg_t::subscriber_received_all
void * subscriber_received_all
Definition: test_proxy_hwm.cpp:44
zmq_bind
ZMQ_EXPORT int zmq_bind(void *s_, const char *addr_)
Definition: zmq.cpp:299
zmq_msg_recv
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:617
zmq_socket
ZMQ_EXPORT void * zmq_socket(void *, int type_)
Definition: zmq.cpp:230
ZMQ_DONTWAIT
#define ZMQ_DONTWAIT
Definition: zmq.h:358
zmq_msg_init
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg_)
Definition: zmq.cpp:587
zmq_close
ZMQ_EXPORT int zmq_close(void *s_)
Definition: zmq.cpp:241
unity.h
ZMQ_SNDHWM
#define ZMQ_SNDHWM
Definition: zmq.h:293
ZMQ_RCVTIMEO
#define ZMQ_RCVTIMEO
Definition: zmq.h:296
proxy_hwm_cfg_t::context
void * context
Definition: proxy_thr.cpp:68
zmq_proxy
ZMQ_EXPORT int zmq_proxy(void *frontend_, void *backend_, void *capture_)
Definition: zmq.cpp:1726
recv_string_expect_success
void recv_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:101
zmq_atomic_counter_value
ZMQ_EXPORT int zmq_atomic_counter_value(void *counter_)
Definition: zmq_utils.cpp:286
setup_test_environment
void setup_test_environment(int timeout_seconds_)
Definition: testutil.cpp:201
zmq_msg_init_size
ZMQ_EXPORT int zmq_msg_init_size(zmq_msg_t *msg_, size_t size_)
Definition: zmq.cpp:592
proxy_hwm_cfg_t::control_endpoint
const char * control_endpoint
Definition: proxy_thr.cpp:72
HWM
#define HWM
Definition: test_proxy_hwm.cpp:33
ZMQ_REQ_CORRELATE
#define ZMQ_REQ_CORRELATE
Definition: zmq.h:317
zmq_atomic_counter_destroy
ZMQ_EXPORT void zmq_atomic_counter_destroy(void **counter_p_)
Definition: zmq_utils.cpp:293
zmq_ctx_term
ZMQ_EXPORT int zmq_ctx_term(void *context_)
Definition: zmq.cpp:128
assert.h
lower_hwm
static void lower_hwm(void *skt_)
Definition: test_proxy_hwm.cpp:47
zmq_msg_close
ZMQ_EXPORT int zmq_msg_close(zmq_msg_t *msg_)
Definition: zmq.cpp:625
proxy_hwm_cfg_t
Definition: proxy_thr.cpp:66
TEST_ASSERT_SUCCESS_ERRNO
#define TEST_ASSERT_SUCCESS_ERRNO(expr)
Definition: proxy_thr.cpp:47
ZMQ_XPUB_NODROP
#define ZMQ_XPUB_NODROP
Definition: zmq.h:330


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59