proxy_thr.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 #include "../include/zmq.h"
3 
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <string.h>
7 #include <assert.h>
8 #include <time.h>
9 #include <stdarg.h>
10 #include <string.h>
11 #include <string>
12 
13 #include "platform.hpp"
14 
15 #if defined ZMQ_HAVE_WINDOWS
16 #include <windows.h>
17 #include <process.h>
18 #else
19 #include <pthread.h>
20 #include <unistd.h>
21 #endif
22 
23 
24 /*
25  Asynchronous proxy benchmark using ZMQ_XPUB_NODROP.
26 
27  Topology:
28 
29  XPUB SUB
30  | |
31  +-----> XSUB -> XPUB -----/
32  | ^^^^^^^^^^^^
33  XPUB ZMQ proxy
34 
35  All connections use "inproc" transport. The two XPUB sockets start
36  flooding the proxy. The throughput is computed using the bytes received
37  in the SUB socket.
38 */
39 
40 
41 #define HWM 10000
42 
43 #ifndef ARRAY_SIZE
44 #define ARRAY_SIZE(x) (sizeof (x) / sizeof (*x))
45 #endif
46 
47 #define TEST_ASSERT_SUCCESS_ERRNO(expr) \
48  test_assert_success_message_errno_helper (expr, NULL, #expr)
49 
50 // This macro is used to avoid-variable warning. If used with an expression,
51 // the sizeof is not evaluated to avoid polluting the assembly code.
52 #ifdef NDEBUG
53 #define ASSERT_EXPR_SAFE(x) \
54  do { \
55  (void) sizeof (x); \
56  } while (0)
57 #else
58 #define ASSERT_EXPR_SAFE(x) assert (x)
59 #endif
60 
61 
62 static uint64_t message_count = 0;
63 static size_t message_size = 0;
64 
65 
66 typedef struct
67 {
68  void *context;
70  const char *frontend_endpoint[4];
71  const char *backend_endpoint[4];
72  const char *control_endpoint;
74 
75 
77  const char *msg_,
78  const char *expr_)
79 {
80  if (rc_ == -1) {
81  char buffer[512];
82  buffer[sizeof (buffer) - 1] =
83  0; // to ensure defined behavior with VC++ <= 2013
84  printf ("%s failed%s%s%s, errno = %i (%s)", expr_,
85  msg_ ? " (additional info: " : "", msg_ ? msg_ : "",
86  msg_ ? ")" : "", zmq_errno (), zmq_strerror (zmq_errno ()));
87  exit (1);
88  }
89  return rc_;
90 }
91 
92 static void set_hwm (void *skt)
93 {
94  int hwm = HWM;
95 
97  zmq_setsockopt (skt, ZMQ_SNDHWM, &hwm, sizeof (hwm)));
98 
100  zmq_setsockopt (skt, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
101 }
102 
103 static void publisher_thread_main (void *pvoid)
104 {
105  const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
106  const int idx = cfg->thread_idx;
107  int optval;
108  int rc;
109 
110  void *pubsocket = zmq_socket (cfg->context, ZMQ_XPUB);
111  assert (pubsocket);
112 
113  set_hwm (pubsocket);
114 
115  optval = 1;
117  zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval)));
118 
119  optval = 1;
121  zmq_setsockopt (pubsocket, ZMQ_SNDTIMEO, &optval, sizeof (optval)));
122 
124  zmq_connect (pubsocket, cfg->frontend_endpoint[idx]));
125 
126  // Wait before starting TX operations till 1 subscriber has subscribed
127  // (in this test there's 1 subscriber only)
128  char buffer[32] = {};
130  zmq_recv (pubsocket, buffer, sizeof (buffer), 0));
131  if (rc != 1) {
132  printf ("invalid response length: expected 1, received %d", rc);
133  exit (1);
134  }
135  if (buffer[0] != 1) {
136  printf ("invalid response value: expected 1, received %d",
137  (int) buffer[0]);
138  exit (1);
139  }
140 
141  zmq_msg_t msg_orig;
142  rc = zmq_msg_init_size (&msg_orig, message_size);
143  assert (rc == 0);
144  memset (zmq_msg_data (&msg_orig), 'A', zmq_msg_size (&msg_orig));
145 
146  uint64_t send_count = 0;
147  while (send_count < message_count) {
148  zmq_msg_t msg;
149  zmq_msg_init (&msg);
150  rc = zmq_msg_copy (&msg, &msg_orig);
151  assert (rc == 0);
152 
153  // Send the message to the socket
154  rc = zmq_msg_send (&msg, pubsocket, 0);
155  if (rc != -1) {
156  send_count++;
157  } else {
159  }
160  }
161 
162  zmq_close (pubsocket);
163  //printf ("publisher thread ended\n");
164 }
165 
166 static void subscriber_thread_main (void *pvoid)
167 {
168  const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
169  const int idx = cfg->thread_idx;
170 
171  void *subsocket = zmq_socket (cfg->context, ZMQ_SUB);
172  assert (subsocket);
173 
174  set_hwm (subsocket);
175 
177 
179  zmq_connect (subsocket, cfg->backend_endpoint[idx]));
180 
181  // Receive message_count messages
182  uint64_t rxsuccess = 0;
183  bool success = true;
184  while (success) {
185  zmq_msg_t msg;
186  int rc = zmq_msg_init (&msg);
187  assert (rc == 0);
188 
189  rc = zmq_msg_recv (&msg, subsocket, 0);
190  if (rc != -1) {
192  rxsuccess++;
193  }
194 
195  if (rxsuccess == message_count)
196  break;
197  }
198 
199  // Cleanup
200 
201  zmq_close (subsocket);
202  //printf ("subscriber thread ended\n");
203 }
204 
205 static void proxy_thread_main (void *pvoid)
206 {
207  const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
208  int rc;
209 
210  // FRONTEND SUB
211 
212  void *frontend_xsub = zmq_socket (
213  cfg->context,
214  ZMQ_XSUB); // the frontend is the one exposed to internal threads (INPROC)
215  assert (frontend_xsub);
216 
217  set_hwm (frontend_xsub);
218 
219  // Bind FRONTEND
220  for (unsigned int i = 0; i < ARRAY_SIZE (cfg->frontend_endpoint); i++) {
221  const char *ep = cfg->frontend_endpoint[i];
222  if (ep != NULL) {
223  assert (strlen (ep) > 5);
224  rc = zmq_bind (frontend_xsub, ep);
225  ASSERT_EXPR_SAFE (rc == 0);
226  }
227  }
228 
229  // BACKEND PUB
230 
231  void *backend_xpub = zmq_socket (
232  cfg->context,
233  ZMQ_XPUB); // the backend is the one exposed to the external world (TCP)
234  assert (backend_xpub);
235 
236  int optval = 1;
237  rc =
238  zmq_setsockopt (backend_xpub, ZMQ_XPUB_NODROP, &optval, sizeof (optval));
239  ASSERT_EXPR_SAFE (rc == 0);
240 
241  set_hwm (backend_xpub);
242 
243  // Bind BACKEND
244  for (unsigned int i = 0; i < ARRAY_SIZE (cfg->backend_endpoint); i++) {
245  const char *ep = cfg->backend_endpoint[i];
246  if (ep != NULL) {
247  assert (strlen (ep) > 5);
248  rc = zmq_bind (backend_xpub, ep);
249  ASSERT_EXPR_SAFE (rc == 0);
250  }
251  }
252 
253  // CONTROL REP
254 
255  void *control_rep = zmq_socket (
256  cfg->context,
257  ZMQ_REP); // This one is used by the proxy to receive&reply to commands
258  assert (control_rep);
259 
260  // Bind CONTROL
261  rc = zmq_bind (control_rep, cfg->control_endpoint);
262  ASSERT_EXPR_SAFE (rc == 0);
263 
264  // Start proxying!
265 
266  zmq_proxy_steerable (frontend_xsub, backend_xpub, NULL, control_rep);
267 
268  zmq_close (frontend_xsub);
269  zmq_close (backend_xpub);
270  zmq_close (control_rep);
271  //printf ("proxy thread ended\n");
272 }
273 
275 {
276  // CONTROL REQ
277 
278  void *control_req = zmq_socket (
279  cfg->context,
280  ZMQ_REQ); // This one can be used to send command to the proxy
281  assert (control_req);
282 
283  // Connect CONTROL-REQ: a socket to which send commands
284  int rc = zmq_connect (control_req, cfg->control_endpoint);
285  ASSERT_EXPR_SAFE (rc == 0);
286 
287  // Ask the proxy to exit: the subscriber has received all messages
288 
289  rc = zmq_send (control_req, "TERMINATE", 9, 0);
290  ASSERT_EXPR_SAFE (rc == 9);
291 
292  zmq_close (control_req);
293 }
294 
295 // The main thread simply starts some publishers, a proxy,
296 // and a subscriber. Finish when all packets are received.
297 
298 int main (int argc, char *argv[])
299 {
300  if (argc != 3) {
301  printf ("usage: proxy_thr <message-size> <message-count>\n");
302  return 1;
303  }
304 
305  message_size = atoi (argv[1]);
306  message_count = atoi (argv[2]);
307  printf ("message size: %d [B]\n", (int) message_size);
308  printf ("message count: %d\n", (int) message_count);
309 
310  void *context = zmq_ctx_new ();
311  assert (context);
312 
313  int rv = zmq_ctx_set (context, ZMQ_IO_THREADS, 4);
314  ASSERT_EXPR_SAFE (rv == 0);
315 
316  // START ALL SECONDARY THREADS
317 
318  const char *pub1 = "inproc://perf_pub1";
319  const char *pub2 = "inproc://perf_pub2";
320  const char *sub1 = "inproc://perf_backend";
321 
322  proxy_hwm_cfg_t cfg_global = {};
323  cfg_global.context = context;
324  cfg_global.frontend_endpoint[0] = pub1;
325  cfg_global.frontend_endpoint[1] = pub2;
326  cfg_global.backend_endpoint[0] = sub1;
327  cfg_global.control_endpoint = "inproc://ctrl";
328 
329  // Proxy
330  proxy_hwm_cfg_t cfg_proxy = cfg_global;
331  void *proxy = zmq_threadstart (&proxy_thread_main, (void *) &cfg_proxy);
332  assert (proxy != 0);
333 
334  // Subscriber 1
335  proxy_hwm_cfg_t cfg_sub1 = cfg_global;
336  cfg_sub1.thread_idx = 0;
337  void *subscriber =
338  zmq_threadstart (&subscriber_thread_main, (void *) &cfg_sub1);
339  assert (subscriber != 0);
340 
341  // Start measuring
342  void *watch = zmq_stopwatch_start ();
343 
344  // Publisher 1
345  proxy_hwm_cfg_t cfg_pub1 = cfg_global;
346  cfg_pub1.thread_idx = 0;
347  void *publisher1 =
348  zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub1);
349  assert (publisher1 != 0);
350 
351  // Publisher 2
352  proxy_hwm_cfg_t cfg_pub2 = cfg_global;
353  cfg_pub2.thread_idx = 1;
354  void *publisher2 =
355  zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub2);
356  assert (publisher2 != 0);
357 
358  // Wait for all packets to be received
359  zmq_threadclose (subscriber);
360 
361  // Stop measuring
362  unsigned long elapsed = zmq_stopwatch_stop (watch);
363  if (elapsed == 0)
364  elapsed = 1;
365 
366  unsigned long throughput =
367  (unsigned long) ((double) message_count / (double) elapsed * 1000000);
368  double megabits = (double) (throughput * message_size * 8) / 1000000;
369 
370  printf ("mean throughput: %d [msg/s]\n", (int) throughput);
371  printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
372 
373  // Wait for the end of publishers...
374  zmq_threadclose (publisher1);
375  zmq_threadclose (publisher2);
376 
377  // ... then close the proxy
378  terminate_proxy (&cfg_proxy);
380 
381  int rc = zmq_ctx_term (context);
382  ASSERT_EXPR_SAFE (rc == 0);
383 
384  return 0;
385 }
message_size
static size_t message_size
Definition: proxy_thr.cpp:63
zmq_strerror
const ZMQ_EXPORT char * zmq_strerror(int errnum_)
Definition: zmq.cpp:96
zmq_ctx_set
ZMQ_EXPORT int zmq_ctx_set(void *context_, int option_, int optval_)
Definition: zmq.cpp:156
zmq::proxy
void proxy(void *frontend, void *backend, void *capture)
Definition: zmq.hpp:2274
NULL
NULL
Definition: test_security_zap.cpp:405
ZMQ_XPUB
#define ZMQ_XPUB
Definition: zmq.h:267
zmq_threadstart
ZMQ_EXPORT void * zmq_threadstart(zmq_thread_fn *func_, void *arg_)
Definition: zmq_utils.cpp:54
zmq_errno
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:101
zmq_msg_send
ZMQ_EXPORT int zmq_msg_send(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:609
proxy_hwm_cfg_t::backend_endpoint
const char * backend_endpoint[4]
Definition: proxy_thr.cpp:71
zmq_ctx_new
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:109
ZMQ_SUBSCRIBE
#define ZMQ_SUBSCRIBE
Definition: zmq.h:278
ARRAY_SIZE
#define ARRAY_SIZE(x)
Definition: proxy_thr.cpp:44
ZMQ_XSUB
#define ZMQ_XSUB
Definition: zmq.h:268
ZMQ_SUB
#define ZMQ_SUB
Definition: zmq.h:260
publisher_thread_main
static void publisher_thread_main(void *pvoid)
Definition: proxy_thr.cpp:103
idx
static uint32_t idx(tarjan *t, const upb_refcounted *r)
Definition: ruby/ext/google/protobuf_c/upb.c:5925
ZMQ_REQ
#define ZMQ_REQ
Definition: zmq.h:261
zmq_msg_data
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg_)
Definition: zmq.cpp:642
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
test_assert_success_message_errno_helper
int test_assert_success_message_errno_helper(int rc_, const char *msg_, const char *expr_)
Definition: proxy_thr.cpp:76
proxy_hwm_cfg_t::frontend_endpoint
const char * frontend_endpoint[4]
Definition: proxy_thr.cpp:70
zmq_msg_size
ZMQ_EXPORT size_t zmq_msg_size(const zmq_msg_t *msg_)
Definition: zmq.cpp:647
proxy_hwm_cfg_t::thread_idx
int thread_idx
Definition: proxy_thr.cpp:69
ZMQ_RCVHWM
#define ZMQ_RCVHWM
Definition: zmq.h:294
zmq_setsockopt
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
ZMQ_SNDTIMEO
#define ZMQ_SNDTIMEO
Definition: zmq.h:297
zmq_threadclose
ZMQ_EXPORT void zmq_threadclose(void *thread_)
Definition: zmq_utils.cpp:62
buffer
GLuint buffer
Definition: glcorearb.h:2939
subscriber_thread_main
static void subscriber_thread_main(void *pvoid)
Definition: proxy_thr.cpp:166
zmq_stopwatch_start
ZMQ_EXPORT void * zmq_stopwatch_start(void)
Definition: zmq_utils.cpp:32
zmq_msg_t
Definition: zmq.h:218
ZMQ_REP
#define ZMQ_REP
Definition: zmq.h:262
zmq_bind
ZMQ_EXPORT int zmq_bind(void *s_, const char *addr_)
Definition: zmq.cpp:299
main
int main(int argc, char *argv[])
Definition: proxy_thr.cpp:298
zmq_msg_recv
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:617
zmq_socket
ZMQ_EXPORT void * zmq_socket(void *, int type_)
Definition: zmq.cpp:230
buffer
Definition: buffer_processor.h:43
zmq_msg_init
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg_)
Definition: zmq.cpp:587
zmq_close
ZMQ_EXPORT int zmq_close(void *s_)
Definition: zmq.cpp:241
ASSERT_EXPR_SAFE
#define ASSERT_EXPR_SAFE(x)
Definition: proxy_thr.cpp:58
i
int i
Definition: gmock-matchers_test.cc:764
ZMQ_SNDHWM
#define ZMQ_SNDHWM
Definition: zmq.h:293
proxy_thread_main
static void proxy_thread_main(void *pvoid)
Definition: proxy_thr.cpp:205
proxy_hwm_cfg_t::context
void * context
Definition: proxy_thr.cpp:68
HWM
#define HWM
Definition: proxy_thr.cpp:41
zmq_recv
ZMQ_EXPORT int zmq_recv(void *s_, void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:487
zmq_proxy_steerable
ZMQ_EXPORT int zmq_proxy_steerable(void *frontend_, void *backend_, void *capture_, void *control_)
Definition: zmq.cpp:1738
terminate_proxy
void terminate_proxy(const proxy_hwm_cfg_t *cfg)
Definition: proxy_thr.cpp:274
zmq_msg_init_size
ZMQ_EXPORT int zmq_msg_init_size(zmq_msg_t *msg_, size_t size_)
Definition: zmq.cpp:592
set_hwm
static void set_hwm(void *skt)
Definition: proxy_thr.cpp:92
zmq_msg_copy
ZMQ_EXPORT int zmq_msg_copy(zmq_msg_t *dest_, zmq_msg_t *src_)
Definition: zmq.cpp:636
proxy_hwm_cfg_t::control_endpoint
const char * control_endpoint
Definition: proxy_thr.cpp:72
ZMQ_IO_THREADS
#define ZMQ_IO_THREADS
Definition: zmq.h:181
zmq_ctx_term
ZMQ_EXPORT int zmq_ctx_term(void *context_)
Definition: zmq.cpp:128
zmq_send
ZMQ_EXPORT int zmq_send(void *s_, const void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:377
assert.h
message_count
static uint64_t message_count
Definition: proxy_thr.cpp:62
zmq_msg_close
ZMQ_EXPORT int zmq_msg_close(zmq_msg_t *msg_)
Definition: zmq.cpp:625
proxy_hwm_cfg_t
Definition: proxy_thr.cpp:66
TEST_ASSERT_SUCCESS_ERRNO
#define TEST_ASSERT_SUCCESS_ERRNO(expr)
Definition: proxy_thr.cpp:47
ep
const SETUP_TEARDOWN_TESTCONTEXT char ep[]
Definition: test_term_endpoint_tipc.cpp:8
zmq_stopwatch_stop
ZMQ_EXPORT unsigned long zmq_stopwatch_stop(void *watch_)
Definition: zmq_utils.cpp:47
google::protobuf.internal.decoder.long
long
Definition: decoder.py:89
ZMQ_XPUB_NODROP
#define ZMQ_XPUB_NODROP
Definition: zmq.h:330


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:58