test_proxy.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "testutil.hpp"
4 #include "testutil_unity.hpp"
5 
6 #include <stdlib.h>
7 #include <string.h>
8 
9 #define CONTENT_SIZE 13
10 #define CONTENT_SIZE_MAX 32
11 #define ROUTING_ID_SIZE 10
12 #define ROUTING_ID_SIZE_MAX 32
13 #define QT_WORKERS 5
14 #define QT_CLIENTS 3
15 #define is_verbose 0
16 
18 {
19  int id;
20 };
21 
25 
26 void setUp ()
27 {
29 }
30 
31 
32 // Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
33 //
34 // While this example runs in a single process, that is to make
35 // it easier to start and stop the example. Each task may have its own
36 // context and conceptually acts as a separate process. To have this
37 // behaviour, it is necessary to replace the inproc transport of the
38 // control socket by a tcp transport.
39 
40 // This is our client task
41 // It connects to the server, and then sends a request once per second
42 // It collects responses as they arrive, and it prints them out. We will
43 // run several client tasks in parallel, each with a different random ID.
44 
45 static void client_task (void *db_)
46 {
47  const thread_data *const databag = static_cast<const thread_data *> (db_);
48  // Endpoint socket gets random port to avoid test failing when port in use
49  void *endpoint = zmq_socket (get_test_context (), ZMQ_PAIR);
50  TEST_ASSERT_NOT_NULL (endpoint);
51  int linger = 0;
53  zmq_setsockopt (endpoint, ZMQ_LINGER, &linger, sizeof (linger)));
54  char endpoint_source[256];
55  snprintf (endpoint_source, 256 * sizeof (char), "inproc://endpoint%d",
56  databag->id);
57  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (endpoint, endpoint_source));
58  char *my_endpoint = s_recv (endpoint);
60 
63 
64  // Control socket receives terminate command from main over inproc
65  void *control = zmq_socket (control_context, ZMQ_SUB);
66  TEST_ASSERT_NOT_NULL (control);
69  zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
70  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (control, "inproc://control"));
71 
72  char content[CONTENT_SIZE_MAX] = {};
73  // Set random routing id to make tracing easier
74  char routing_id[ROUTING_ID_SIZE] = {};
75  snprintf (routing_id, ROUTING_ID_SIZE * sizeof (char), "%04X-%04X",
76  rand () % 0xFFFF, rand () % 0xFFFF);
78  client, ZMQ_ROUTING_ID, routing_id,
79  ROUTING_ID_SIZE)); // includes '\0' as an helper for printf
80  linger = 0;
82  zmq_setsockopt (client, ZMQ_LINGER, &linger, sizeof (linger)));
84 
85  zmq_pollitem_t items[] = {{client, 0, ZMQ_POLLIN, 0},
86  {control, 0, ZMQ_POLLIN, 0}};
87  int request_nbr = 0;
88  bool run = true;
89  bool keep_sending = true;
90  while (run) {
91  // Tick once per 200 ms, pulling in arriving messages
92  int centitick;
93  for (centitick = 0; centitick < 20; centitick++) {
94  zmq_poll (items, 2, 10);
95  if (items[0].revents & ZMQ_POLLIN) {
96  int rcvmore;
97  size_t sz = sizeof (rcvmore);
98  int rc = TEST_ASSERT_SUCCESS_ERRNO (
99  zmq_recv (client, content, CONTENT_SIZE_MAX, 0));
101  if (is_verbose)
102  printf (
103  "client receive - routing_id = %s content = %s\n",
104  routing_id, content);
105  // Check that message is still the same
106  TEST_ASSERT_EQUAL_STRING_LEN ("request #", content, 9);
108  zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz));
109  TEST_ASSERT_FALSE (rcvmore);
110  }
111  if (items[1].revents & ZMQ_POLLIN) {
112  int rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
113 
114  if (rc > 0) {
115  content[rc] = 0; // NULL-terminate the command string
116  if (is_verbose)
117  printf (
118  "client receive - routing_id = %s command = %s\n",
119  routing_id, content);
120  if (memcmp (content, "TERMINATE", 9) == 0) {
121  run = false;
122  break;
123  }
124  if (memcmp (content, "STOP", 4) == 0) {
125  keep_sending = false;
126  break;
127  }
128  }
129  }
130  }
131 
132  if (keep_sending) {
133  snprintf (content, CONTENT_SIZE_MAX * sizeof (char),
134  "request #%03d", ++request_nbr); // CONTENT_SIZE
135  if (is_verbose)
136  printf ("client send - routing_id = %s request #%03d\n",
137  routing_id, request_nbr);
139 
141  zmq_send (client, content, CONTENT_SIZE, 0));
142  }
143  }
144 
148  free (my_endpoint);
149 }
150 
151 // This is our server task.
152 // It uses the multithreaded server model to deal requests out to a pool
153 // of workers and route replies back to clients. One worker can handle
154 // one request at a time but one client can talk to multiple workers at
155 // once.
156 
157 static void server_worker (void * /*unused_*/);
158 
159 void server_task (void * /*unused_*/)
160 {
161  // Frontend socket talks to clients over TCP
163  void *frontend = zmq_socket (get_test_context (), ZMQ_ROUTER);
164  TEST_ASSERT_NOT_NULL (frontend);
165  int linger = 0;
167  zmq_setsockopt (frontend, ZMQ_LINGER, &linger, sizeof (linger)));
168  bind_loopback_ipv4 (frontend, my_endpoint, sizeof my_endpoint);
169 
170  // Backend socket talks to workers over inproc
171  void *backend = zmq_socket (get_test_context (), ZMQ_DEALER);
172  TEST_ASSERT_NOT_NULL (backend);
174  zmq_setsockopt (backend, ZMQ_LINGER, &linger, sizeof (linger)));
175  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (backend, "inproc://backend"));
176 
177  // Launch pool of worker threads, precise number is not critical
178  int thread_nbr;
179  void *threads[5];
180  for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
181  threads[thread_nbr] = zmq_threadstart (&server_worker, NULL);
182 
183  // Endpoint socket sends random port to avoid test failing when port in use
184  void *endpoint_receivers[QT_CLIENTS];
185  char endpoint_source[256];
186  for (int i = 0; i < QT_CLIENTS; ++i) {
187  endpoint_receivers[i] = zmq_socket (get_test_context (), ZMQ_PAIR);
188  TEST_ASSERT_NOT_NULL (endpoint_receivers[i]);
190  endpoint_receivers[i], ZMQ_LINGER, &linger, sizeof (linger)));
191  snprintf (endpoint_source, 256 * sizeof (char), "inproc://endpoint%d",
192  i);
194  zmq_bind (endpoint_receivers[i], endpoint_source));
195  }
196 
197  for (int i = 0; i < QT_CLIENTS; ++i) {
198  send_string_expect_success (endpoint_receivers[i], my_endpoint, 0);
199  }
200 
201  // Connect backend to frontend via a proxy
202  zmq_proxy (frontend, backend, NULL);
203 
204  for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
205  zmq_threadclose (threads[thread_nbr]);
206 
209  for (int i = 0; i < QT_CLIENTS; ++i) {
210  TEST_ASSERT_SUCCESS_ERRNO (zmq_close (endpoint_receivers[i]));
211  }
212 }
213 
214 // Each worker task works on one request at a time and sends a random number
215 // of replies back, with random delays between replies:
216 // The comments in the first column, if suppressed, makes it a poller version
217 
218 static void server_worker (void * /*unused_*/)
219 {
222  int linger = 0;
224  zmq_setsockopt (worker, ZMQ_LINGER, &linger, sizeof (linger)));
225  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (worker, "inproc://backend"));
226 
227  // Control socket receives terminate command from main over inproc
228  void *control = zmq_socket (control_context, ZMQ_SUB);
229  TEST_ASSERT_NOT_NULL (control);
232  zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
233  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (control, "inproc://control"));
234 
235  char content[CONTENT_SIZE_MAX] =
236  {}; // bigger than what we need to check that
237  char routing_id[ROUTING_ID_SIZE_MAX] =
238  {}; // the size received is the size sent
239 
240  bool run = true;
241  bool keep_sending = true;
242  while (run) {
243  int rc = zmq_recv (control, content, CONTENT_SIZE_MAX,
244  ZMQ_DONTWAIT); // usually, rc == -1 (no message)
245  if (rc > 0) {
246  content[rc] = 0; // NULL-terminate the command string
247  if (is_verbose)
248  printf ("server_worker receives command = %s\n", content);
249  if (memcmp (content, "TERMINATE", 9) == 0)
250  run = false;
251  if (memcmp (content, "STOP", 4) == 0)
252  keep_sending = false;
253  }
254  // The DEALER socket gives us the reply envelope and message
255  // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
256  rc = zmq_recv (worker, routing_id, ROUTING_ID_SIZE_MAX, ZMQ_DONTWAIT);
257  if (rc == ROUTING_ID_SIZE) {
258  rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0);
260  if (is_verbose)
261  printf ("server receive - routing_id = %s content = %s\n",
262  routing_id, content);
263 
264  // Send 0..4 replies back
265  if (keep_sending) {
266  int reply, replies = rand () % 5;
267  for (reply = 0; reply < replies; reply++) {
268  // Sleep for some fraction of a second
269  msleep (rand () % 10 + 1);
270 
271  // Send message from server to client
272  if (is_verbose)
273  printf ("server send - routing_id = %s reply\n",
274  routing_id);
276 
277  rc = zmq_send (worker, routing_id, ROUTING_ID_SIZE,
278  ZMQ_SNDMORE);
280  rc = zmq_send (worker, content, CONTENT_SIZE, 0);
282  }
283  }
284  }
285  }
288 }
289 
290 // The main thread simply starts several clients and a server, and then
291 // waits for the server to finish.
292 
293 void test_proxy ()
294 {
299 
300  // Control socket receives terminate command from main over inproc
301  void *control = zmq_socket (control_context, ZMQ_PUB);
302  int linger = 0;
304  zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
305  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (control, "inproc://control"));
306 
307  void *threads[QT_CLIENTS + 1];
308  struct thread_data databags[QT_CLIENTS + 1];
309  for (int i = 0; i < QT_CLIENTS; i++) {
310  databags[i].id = i;
311  threads[i] = zmq_threadstart (&client_task, &databags[i]);
312  }
314  msleep (500); // Run for 500 ms then quit
315 
316  if (is_verbose)
317  printf ("stopping all clients and server workers\n");
318  send_string_expect_success (control, "STOP", 0);
319 
320  msleep (500); // Wait for all clients and workers to STOP
321 
322  if (is_verbose)
323  printf ("shutting down all clients and server workers\n");
324  send_string_expect_success (control, "TERMINATE", 0);
325 
326  msleep (500); // Wait for all clients and workers to terminate
327 
329 
330  for (int i = 0; i < QT_CLIENTS + 1; i++)
331  zmq_threadclose (threads[i]);
332 
335 }
336 
337 int main (void)
338 {
340 
341  UNITY_BEGIN ();
343  return UNITY_END ();
344 }
CONTENT_SIZE_MAX
#define CONTENT_SIZE_MAX
Definition: test_proxy.cpp:10
TEST_ASSERT_EQUAL_STRING_LEN
#define TEST_ASSERT_EQUAL_STRING_LEN(expected, actual, len)
Definition: unity.h:236
NULL
NULL
Definition: test_security_zap.cpp:405
UNITY_END
return UNITY_END()
ZMQ_PUB
#define ZMQ_PUB
Definition: zmq.h:259
zmq_threadstart
ZMQ_EXPORT void * zmq_threadstart(zmq_thread_fn *func_, void *arg_)
Definition: zmq_utils.cpp:54
msleep
void msleep(int milliseconds_)
Definition: testutil.cpp:227
zmq_poll
ZMQ_EXPORT int zmq_poll(zmq_pollitem_t *items_, int nitems_, long timeout_)
Definition: zmq.cpp:827
RUN_TEST
#define RUN_TEST(func)
Definition: unity_internals.h:615
zmq_ctx_new
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:109
setup_test_context
void setup_test_context()
Definition: testutil_unity.cpp:179
thread_data::id
int id
Definition: test_proxy.cpp:19
ZMQ_SUBSCRIBE
#define ZMQ_SUBSCRIBE
Definition: zmq.h:278
bind_loopback_ipv4
void bind_loopback_ipv4(void *socket_, char *my_endpoint_, size_t len_)
Definition: testutil_unity.cpp:246
zmq_pollitem_t
Definition: zmq.h:487
teardown_test_context
void teardown_test_context()
Definition: testutil_unity.cpp:189
get_test_context
void * get_test_context()
Definition: testutil_unity.cpp:184
client
void client(int num)
Definition: test_multithread.cpp:134
QT_WORKERS
#define QT_WORKERS
Definition: test_proxy.cpp:13
main
int main(void)
Definition: test_proxy.cpp:337
ZMQ_SUB
#define ZMQ_SUB
Definition: zmq.h:260
TEST_ASSERT_FALSE
#define TEST_ASSERT_FALSE(condition)
Definition: unity.h:123
zmq_ctx_destroy
ZMQ_EXPORT int zmq_ctx_destroy(void *context_)
Definition: zmq.cpp:212
s_recv
char * s_recv(void *socket_)
Definition: testutil.cpp:123
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
thread_data
Definition: test_proxy.cpp:17
ZMQ_POLLIN
#define ZMQ_POLLIN
Definition: zmq.h:482
control_context
void * control_context
Definition: test_proxy.cpp:24
zmq_atomic_counter_inc
ZMQ_EXPORT int zmq_atomic_counter_inc(void *counter_)
Definition: zmq_utils.cpp:271
testutil_unity.hpp
snprintf
int snprintf(char *str, size_t size, const char *format,...)
Definition: port.cc:64
ZMQ_DEALER
#define ZMQ_DEALER
Definition: zmq.h:263
zmq_setsockopt
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
zmq_threadclose
ZMQ_EXPORT void zmq_threadclose(void *thread_)
Definition: zmq_utils.cpp:62
zmq_atomic_counter_new
ZMQ_EXPORT void * zmq_atomic_counter_new(void)
Definition: zmq_utils.cpp:255
QT_CLIENTS
#define QT_CLIENTS
Definition: test_proxy.cpp:14
ROUTING_ID_SIZE
#define ROUTING_ID_SIZE
Definition: test_proxy.cpp:11
is_verbose
#define is_verbose
Definition: test_proxy.cpp:15
testutil.hpp
ZMQ_ROUTER
#define ZMQ_ROUTER
Definition: zmq.h:264
setUp
void setUp()
Definition: test_proxy.cpp:26
worker
void worker(int num)
Definition: test_multithread.cpp:83
my_endpoint
char my_endpoint[MAX_SOCKET_STRING]
Definition: test_security_curve.cpp:31
MAX_SOCKET_STRING
#define MAX_SOCKET_STRING
Definition: libzmq/tests/testutil.hpp:35
zmq_bind
ZMQ_EXPORT int zmq_bind(void *s_, const char *addr_)
Definition: zmq.cpp:299
zmq_socket
ZMQ_EXPORT void * zmq_socket(void *, int type_)
Definition: zmq.cpp:230
client_task
static void client_task(void *db_)
Definition: test_proxy.cpp:45
ROUTING_ID_SIZE_MAX
#define ROUTING_ID_SIZE_MAX
Definition: test_proxy.cpp:12
ZMQ_DONTWAIT
#define ZMQ_DONTWAIT
Definition: zmq.h:358
TEST_ASSERT_EQUAL_INT
#define TEST_ASSERT_EQUAL_INT(expected, actual)
Definition: unity.h:128
CONTENT_SIZE
#define CONTENT_SIZE
Definition: test_proxy.cpp:9
zmq_close
ZMQ_EXPORT int zmq_close(void *s_)
Definition: zmq.cpp:241
i
int i
Definition: gmock-matchers_test.cc:764
test_proxy
void test_proxy()
Definition: test_proxy.cpp:293
ZMQ_LINGER
#define ZMQ_LINGER
Definition: zmq.h:288
send_string_expect_success
void send_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:94
ZMQ_PAIR
#define ZMQ_PAIR
Definition: zmq.h:258
zmq_recv
ZMQ_EXPORT int zmq_recv(void *s_, void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:487
zmq_proxy
ZMQ_EXPORT int zmq_proxy(void *frontend_, void *backend_, void *capture_)
Definition: zmq.cpp:1726
setup_test_environment
void setup_test_environment(int timeout_seconds_)
Definition: testutil.cpp:201
UNITY_BEGIN
UNITY_BEGIN()
ZMQ_SNDMORE
#define ZMQ_SNDMORE
Definition: zmq.h:359
ZMQ_ROUTING_ID
#define ZMQ_ROUTING_ID
Definition: zmq.h:277
g_clients_pkts_out
void * g_clients_pkts_out
Definition: test_proxy.cpp:22
zmq_send
ZMQ_EXPORT int zmq_send(void *s_, const void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:377
g_workers_pkts_out
void * g_workers_pkts_out
Definition: test_proxy.cpp:23
TEST_ASSERT_NOT_NULL
#define TEST_ASSERT_NOT_NULL(pointer)
Definition: unity.h:125
ZMQ_RCVMORE
#define ZMQ_RCVMORE
Definition: zmq.h:284
TEST_ASSERT_SUCCESS_ERRNO
#define TEST_ASSERT_SUCCESS_ERRNO(expr)
Definition: proxy_thr.cpp:47
zmq_getsockopt
ZMQ_EXPORT int zmq_getsockopt(void *s_, int option_, void *optval_, size_t *optvallen_)
Definition: zmq.cpp:261
server_worker
static void server_worker(void *)
Definition: test_proxy.cpp:218
server_task
void server_task(void *)
Definition: test_proxy.cpp:159


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59