test_proxy_steerable.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "testutil.hpp"
4 #include "testutil_unity.hpp"
5 
6 #include <stdlib.h>
7 #include <string.h>
8 #include <inttypes.h>
9 
10 #define CONTENT_SIZE 13
11 #define CONTENT_SIZE_MAX 32
12 #define ROUTING_ID_SIZE 10
13 #define ROUTING_ID_SIZE_MAX 32
14 #define QT_WORKERS 3
15 #define QT_CLIENTS 3
16 #define is_verbose 0
17 #define TEST_SLEEP_MS 500
18 
19 const char *proxy_control_address = "inproc://proxy_control";
20 
21 struct thread_data
22 {
23  int id;
24 };
25 
28 void *control_context = NULL; // worker control, not proxy control
29 
31  ZMQ_PAIR; //or ZMQ_PAIR, ZMQ_SUB (without statistics)
32 
33 void setUp ()
34 {
36 }
37 
38 
39 // Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
40 //
41 // While this example runs in a single process, that is to make
42 // it easier to start and stop the example. Each task may have its own
43 // context and conceptually acts as a separate process. To have this
44 // behaviour, it is necessary to replace the inproc transport of the
45 // control socket by a tcp transport.
46 
47 // This is our client task
48 // It connects to the server, and then sends a request once per second
49 // It collects responses as they arrive, and it prints them out. We will
50 // run several client tasks in parallel, each with a different random ID.
51 
52 static void client_task (void *db_)
53 {
54  const thread_data *const databag = static_cast<const thread_data *> (db_);
55  // Endpoint socket gets random port to avoid test failing when port in use
56  void *endpoint = zmq_socket (get_test_context (), ZMQ_PAIR);
57  TEST_ASSERT_NOT_NULL (endpoint);
58  int linger = 0;
60  zmq_setsockopt (endpoint, ZMQ_LINGER, &linger, sizeof (linger)));
61  char endpoint_source[256];
62  snprintf (endpoint_source, 256 * sizeof (char), "inproc://endpoint%d",
63  databag->id);
64  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (endpoint, endpoint_source));
65  char *my_endpoint = s_recv (endpoint);
67 
70 
71  // Control socket receives terminate command from main over inproc
72  void *control = zmq_socket (control_context, ZMQ_SUB);
73  TEST_ASSERT_NOT_NULL (control);
75  linger = 0;
77  zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
78  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (control, "inproc://control"));
79 
80  char content[CONTENT_SIZE_MAX] = {};
81  // Set random routing id to make tracing easier
82  char routing_id[ROUTING_ID_SIZE] = {};
83  snprintf (routing_id, ROUTING_ID_SIZE * sizeof (char), "%04X-%04X",
84  rand () % 0xFFFF, rand () % 0xFFFF);
86  client, ZMQ_ROUTING_ID, routing_id,
87  ROUTING_ID_SIZE)); // includes '\0' as an helper for printf
88  linger = 0;
90  zmq_setsockopt (client, ZMQ_LINGER, &linger, sizeof (linger)));
92 
93  zmq_pollitem_t items[] = {{client, 0, ZMQ_POLLIN, 0},
94  {control, 0, ZMQ_POLLIN, 0}};
95 
96  int request_nbr = 0;
97  bool run = true;
98  bool enable_send = false;
99  while (run) {
100  // Tick once per 200 ms, pulling in arriving messages
101  int centitick;
102  for (centitick = 0; centitick < 20; centitick++) {
103  zmq_poll (items, 2, 10);
104  if (items[0].revents & ZMQ_POLLIN) {
105  int rcvmore;
106  size_t sz = sizeof (rcvmore);
107  int rc = TEST_ASSERT_SUCCESS_ERRNO (
108  zmq_recv (client, content, CONTENT_SIZE_MAX, 0));
110  if (is_verbose)
111  printf (
112  "client receive - routing_id = %s content = %s\n",
113  routing_id, content);
114  // Check that message is still the same
115  TEST_ASSERT_EQUAL_STRING_LEN ("request #", content, 9);
117  zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz));
118  TEST_ASSERT_FALSE (rcvmore);
119  }
120  if (items[1].revents & ZMQ_POLLIN) {
121  int rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
122 
123  if (rc > 0) {
124  content[rc] = 0; // NULL-terminate the command string
125  if (is_verbose)
126  printf (
127  "client receive - routing_id = %s command = %s\n",
128  routing_id, content);
129  if (memcmp (content, "TERMINATE", 9) == 0) {
130  run = false;
131  } else if (memcmp (content, "STOP", 4) == 0) {
132  enable_send = false;
133  } else if (memcmp (content, "START", 5) == 0) {
134  enable_send = true;
135  }
136  break;
137  }
138  }
139  }
140 
141  if (enable_send) {
142  snprintf (content, CONTENT_SIZE_MAX * sizeof (char),
143  "request #%03d", ++request_nbr); // CONTENT_SIZE
144  if (is_verbose)
145  printf ("client send - routing_id = %s request #%03d\n",
146  routing_id, request_nbr);
148 
150  zmq_send (client, content, CONTENT_SIZE, 0));
151  }
152  }
153 
157  free (my_endpoint);
158 }
159 
160 // This is our server task.
161 // It uses the multithreaded server model to deal requests out to a pool
162 // of workers and route replies back to clients. One worker can handle
163 // one request at a time but one client can talk to multiple workers at
164 // once.
165 
166 static void server_worker (void * /*unused_*/);
167 
168 void server_task (void * /*unused_*/)
169 {
170  // Frontend socket talks to clients over TCP
172  void *frontend = zmq_socket (get_test_context (), ZMQ_ROUTER);
173  TEST_ASSERT_NOT_NULL (frontend);
174  int linger = 0;
176  zmq_setsockopt (frontend, ZMQ_LINGER, &linger, sizeof (linger)));
177  bind_loopback_ipv4 (frontend, my_endpoint, sizeof my_endpoint);
178 
179  // Backend socket talks to workers over inproc
180  void *backend = zmq_socket (get_test_context (), ZMQ_DEALER);
181  TEST_ASSERT_NOT_NULL (backend);
183  zmq_setsockopt (backend, ZMQ_LINGER, &linger, sizeof (linger)));
184  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (backend, "inproc://backend"));
185 
186  // Launch pool of worker threads, precise number is not critical
187  int thread_nbr;
188  void *threads[QT_WORKERS];
189  for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
190  threads[thread_nbr] = zmq_threadstart (&server_worker, NULL);
191 
192  // Endpoint socket sends random port to avoid test failing when port in use
193  void *endpoint_receivers[QT_CLIENTS];
194  char endpoint_source[256];
195  for (int i = 0; i < QT_CLIENTS; ++i) {
196  endpoint_receivers[i] = zmq_socket (get_test_context (), ZMQ_PAIR);
197  TEST_ASSERT_NOT_NULL (endpoint_receivers[i]);
199  endpoint_receivers[i], ZMQ_LINGER, &linger, sizeof (linger)));
200  snprintf (endpoint_source, 256 * sizeof (char), "inproc://endpoint%d",
201  i);
203  zmq_bind (endpoint_receivers[i], endpoint_source));
204  }
205 
206  for (int i = 0; i < QT_CLIENTS; ++i) {
207  send_string_expect_success (endpoint_receivers[i], my_endpoint, 0);
208  }
209 
210  // Proxy control socket
211  void *proxy_control =
213  TEST_ASSERT_NOT_NULL (proxy_control);
217  zmq_setsockopt (proxy_control, ZMQ_SUBSCRIBE, "", 0));
218  }
219 
220  // Connect backend to frontend via a steerable proxy
221  int rc = zmq_proxy_steerable (frontend, backend, NULL, proxy_control);
222  TEST_ASSERT_EQUAL_INT (0, rc);
223 
224  for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) {
225  zmq_threadclose (threads[thread_nbr]);
226  }
227 
230  TEST_ASSERT_SUCCESS_ERRNO (zmq_close (proxy_control));
231  for (int i = 0; i < QT_CLIENTS; ++i) {
232  TEST_ASSERT_SUCCESS_ERRNO (zmq_close (endpoint_receivers[i]));
233  }
234 }
235 
236 // Each worker task works on one request at a time and sends a random number
237 // of replies back, with random delays between replies:
238 // The comments in the first column, if suppressed, makes it a poller version
239 
240 static void server_worker (void * /*unused_*/)
241 {
244  int linger = 0;
246  zmq_setsockopt (worker, ZMQ_LINGER, &linger, sizeof (linger)));
247  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (worker, "inproc://backend"));
248 
249  // Control socket receives terminate command from main over inproc
250  void *control = zmq_socket (control_context, ZMQ_SUB);
251  TEST_ASSERT_NOT_NULL (control);
254  zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
255  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (control, "inproc://control"));
256 
257  char content[CONTENT_SIZE_MAX] =
258  {}; // bigger than what we need to check that
259  char routing_id[ROUTING_ID_SIZE_MAX] =
260  {}; // the size received is the size sent
261 
262  zmq_pollitem_t items[] = {{control, 0, ZMQ_POLLIN, 0},
263  {worker, 0, ZMQ_POLLIN, 0}};
264  bool keep_sending = true;
265  while (true) {
266  zmq_poll (items, 2, 100);
267  if (items[0].revents & ZMQ_POLLIN) {
268  //Commands over the worker control socket
269  int rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
270  if (rc > 0) {
271  content[rc] = 0; // NULL-terminate the command string
272  if (is_verbose)
273  printf ("server_worker receives command = %s\n", content);
274  if (memcmp (content, "TERMINATE", 9) == 0)
275  break;
276  if (memcmp (content, "STOP", 4) == 0)
277  keep_sending = false;
278  }
279  }
280  if (items[1].revents & ZMQ_POLLIN) {
281  // The DEALER socket gives us the reply envelope and message
282  int rc = zmq_recv (worker, routing_id, ROUTING_ID_SIZE_MAX, 0);
283  if (rc != ROUTING_ID_SIZE) {
284  continue;
285  }
286  routing_id[rc] = 0; //null terminate
287  rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0);
289  content[rc] = 0; //null terminate
290  if (is_verbose)
291  printf ("server receive - routing_id = %s content = %s\n",
292  routing_id, content);
293 
294  // Send 0..4 replies back
295  if (keep_sending) {
296  int reply, replies = rand () % 5;
297  for (reply = 0; reply < replies; reply++) {
298  // Sleep for some fraction of a second
299  msleep (rand () % 10 + 1);
300 
301  // Send message from server to client
302  if (is_verbose)
303  printf ("server send - routing_id = %s reply\n",
304  routing_id);
306 
307  rc = zmq_send (worker, routing_id, ROUTING_ID_SIZE,
308  ZMQ_SNDMORE);
310  rc = zmq_send (worker, content, CONTENT_SIZE, 0);
312  }
313  }
314  }
315  }
318 }
319 
320 // If STATISTICS is received, the proxy will reply on the control socket
321 // sending a multipart message with 8 frames, each with an unsigned integer
322 // 64-bit wide that provide in the following order:
323 //
324 // - 0/frn: number of messages received by the frontend socket
325 //
326 // - 1/frb: number of bytes received by the frontend socket
327 //
328 // - 2/fsn: number of messages sent out the frontend socket
329 //
330 // - 3/fsb: number of bytes sent out the frontend socket
331 //
332 // - 4/brn: number of messages received by the backend socket
333 //
334 // - 5/brb: number of bytes received by the backend socket
335 //
336 // - 6/bsn: number of messages sent out the backend socket
337 //
338 // - 7/bsb: number of bytes sent out the backend socket
339 
340 uint64_t read_stat_value (void *proxy_control)
341 {
342  zmq_msg_t stats_msg;
344  TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&stats_msg, proxy_control, 0));
345  TEST_ASSERT_EQUAL_INT (sizeof (uint64_t), zmq_msg_size (&stats_msg));
346  uint64_t val = *(uint64_t *) zmq_msg_data (&stats_msg);
348  return val;
349 }
350 
351 //return total bytes proxied, so we can test PAUSE/RESUME
352 uint64_t statistics (void *proxy_control, const char *runctx)
353 {
354  if (is_verbose) {
355  printf ("steer: sending STATISTICS - %s\n", runctx);
356  }
357 
358  TEST_ASSERT_SUCCESS_ERRNO (zmq_send (proxy_control, "STATISTICS", 10, 0));
359 
360  uint64_t total_bytes_proxied = 0;
361  for (int count = 0; count < 8; ++count) {
362  uint64_t val = read_stat_value (proxy_control);
363  if (is_verbose) {
364  if (count == 0) {
365  printf ("stats: client pkts out: %d worker pkts out: %d { ",
368  }
369  printf ("%" PRIu64 " ", val);
370  if (count == 7) {
371  printf ("}\n");
372  }
373  }
374  switch (count) {
375  case 3: //bytes sent on frontend
376  case 7: //bytes sent on backend
377  total_bytes_proxied += val;
378  }
379  }
380 
381  int rcvmore;
382  size_t sz = sizeof (rcvmore);
383  zmq_getsockopt (proxy_control, ZMQ_RCVMORE, &rcvmore, &sz);
384  TEST_ASSERT_EQUAL_INT (rcvmore, 0);
385  return total_bytes_proxied;
386 }
387 
388 
389 // The main thread simply starts several clients and a server, and then
390 // waits for the server to finish.
391 
392 void steer (void *proxy_control, const char *command, const char *runctx)
393 {
394  if (is_verbose) {
395  printf ("steer: sending %s - %s\n", command, runctx);
396  }
397 
399  zmq_send (proxy_control, command, strlen (command), 0));
400 
402  //expect an empty reply from REP for commands that need no response
403  zmq_msg_t stats_msg;
405  TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&stats_msg, proxy_control, 0));
406  TEST_ASSERT_EQUAL_INT (zmq_msg_size (&stats_msg), 0);
407  TEST_ASSERT (!zmq_msg_get (&stats_msg, ZMQ_MORE));
409  }
410 }
411 
413 {
414  int linger = 0;
415  void *threads[QT_CLIENTS + 1];
416 
421 
422  // Worker control socket receives terminate command from main over inproc
423  void *control = zmq_socket (control_context, ZMQ_PUB);
424  linger = 0;
426  zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
427  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (control, "inproc://control"));
428 
429  struct thread_data databags[QT_CLIENTS + 1];
430  for (int i = 0; i < QT_CLIENTS; i++) {
431  databags[i].id = i;
432  threads[i] = zmq_threadstart (&client_task, &databags[i]);
433  }
435  msleep (TEST_SLEEP_MS); // setup time
436 
437  // Proxy control socket
438  int control_socktype = ZMQ_PAIR;
439  switch (g_proxy_control_socktype) {
440  case ZMQ_REP:
441  control_socktype = ZMQ_REQ;
442  break;
443  case ZMQ_SUB:
444  control_socktype = ZMQ_PUB;
445  break;
446  default:
447  break;
448  }
449  void *proxy_control = zmq_socket (get_test_context (), control_socktype);
450  TEST_ASSERT_NOT_NULL (proxy_control);
451  linger = 0;
453  zmq_setsockopt (proxy_control, ZMQ_LINGER, &linger, sizeof (linger)));
455  zmq_connect (proxy_control, proxy_control_address));
456 
457  TEST_ASSERT (
458  statistics (proxy_control, "should be all 0s before clients start") == 0);
459 
460  send_string_expect_success (control, "START", 0);
461 
462  msleep (TEST_SLEEP_MS); // Run for some time
463 
464  TEST_ASSERT (statistics (proxy_control, "started clients") > 0);
465  steer (proxy_control, "PAUSE", "pausing proxying after 500ms");
466  uint64_t bytes = statistics (proxy_control, "post-pause");
467 
468  msleep (TEST_SLEEP_MS); // Paused for some time
469 
470  //check no more bytes have been proxied while paused
471  TEST_ASSERT (statistics (proxy_control, "post-pause") == bytes);
472 
473  steer (proxy_control, "RESUME", "resuming proxying after another 500ms");
474 
475  msleep (TEST_SLEEP_MS); // Resumed for a while
476 
477  TEST_ASSERT (statistics (proxy_control, "ran for a while") > bytes);
478 
479  if (is_verbose)
480  printf ("stopping all clients and server workers\n");
481  send_string_expect_success (control, "STOP", 0);
482 
483  statistics (proxy_control, "stopped clients and workers");
484 
485  msleep (TEST_SLEEP_MS); // Wait for all clients and workers to STOP
486 
487  if (is_verbose)
488  printf ("shutting down all clients and server workers\n");
489  send_string_expect_success (control, "TERMINATE", 0);
490 
492  statistics (proxy_control, "terminate clients and server workers");
493 
494  msleep (TEST_SLEEP_MS); // Wait for all clients and workers to terminate
495  steer (proxy_control, "TERMINATE", "terminate proxy");
496 
497  for (int i = 0; i < QT_CLIENTS + 1; i++)
498  zmq_threadclose (threads[i]);
499 
502 
503  TEST_ASSERT_SUCCESS_ERRNO (zmq_close (proxy_control));
504 
506 }
507 
508 int main (void)
509 {
511 
512  UNITY_BEGIN ();
514  return UNITY_END ();
515 }
TEST_ASSERT_EQUAL_STRING_LEN
#define TEST_ASSERT_EQUAL_STRING_LEN(expected, actual, len)
Definition: unity.h:236
client_task
static void client_task(void *db_)
Definition: test_proxy_steerable.cpp:52
g_workers_pkts_out
void * g_workers_pkts_out
Definition: test_proxy_steerable.cpp:27
NULL
NULL
Definition: test_security_zap.cpp:405
UNITY_END
return UNITY_END()
ZMQ_PUB
#define ZMQ_PUB
Definition: zmq.h:259
zmq_threadstart
ZMQ_EXPORT void * zmq_threadstart(zmq_thread_fn *func_, void *arg_)
Definition: zmq_utils.cpp:54
msleep
void msleep(int milliseconds_)
Definition: testutil.cpp:227
setUp
void setUp()
Definition: test_proxy_steerable.cpp:33
zmq_poll
ZMQ_EXPORT int zmq_poll(zmq_pollitem_t *items_, int nitems_, long timeout_)
Definition: zmq.cpp:827
RUN_TEST
#define RUN_TEST(func)
Definition: unity_internals.h:615
zmq_ctx_new
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:109
command
ROSLIB_DECL std::string command(const std::string &cmd)
zmq_msg_get
ZMQ_EXPORT int zmq_msg_get(const zmq_msg_t *msg_, int property_)
Definition: zmq.cpp:657
setup_test_context
void setup_test_context()
Definition: testutil_unity.cpp:179
thread_data::id
int id
Definition: test_proxy.cpp:19
server_worker
static void server_worker(void *)
Definition: test_proxy_steerable.cpp:240
ZMQ_SUBSCRIBE
#define ZMQ_SUBSCRIBE
Definition: zmq.h:278
bind_loopback_ipv4
void bind_loopback_ipv4(void *socket_, char *my_endpoint_, size_t len_)
Definition: testutil_unity.cpp:246
zmq_pollitem_t
Definition: zmq.h:487
teardown_test_context
void teardown_test_context()
Definition: testutil_unity.cpp:189
get_test_context
void * get_test_context()
Definition: testutil_unity.cpp:184
client
void client(int num)
Definition: test_multithread.cpp:134
ZMQ_MORE
#define ZMQ_MORE
Definition: zmq.h:354
ZMQ_SUB
#define ZMQ_SUB
Definition: zmq.h:260
TEST_ASSERT_FALSE
#define TEST_ASSERT_FALSE(condition)
Definition: unity.h:123
bytes
uint8 bytes[10]
Definition: coded_stream_unittest.cc:153
zmq_ctx_destroy
ZMQ_EXPORT int zmq_ctx_destroy(void *context_)
Definition: zmq.cpp:212
s_recv
char * s_recv(void *socket_)
Definition: testutil.cpp:123
ZMQ_REQ
#define ZMQ_REQ
Definition: zmq.h:261
zmq_msg_data
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg_)
Definition: zmq.cpp:642
TEST_ASSERT
#define TEST_ASSERT(condition)
Definition: unity.h:120
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
thread_data
Definition: test_proxy.cpp:17
ZMQ_POLLIN
#define ZMQ_POLLIN
Definition: zmq.h:482
g_clients_pkts_out
void * g_clients_pkts_out
Definition: test_proxy_steerable.cpp:26
zmq_msg_size
ZMQ_EXPORT size_t zmq_msg_size(const zmq_msg_t *msg_)
Definition: zmq.cpp:647
zmq_atomic_counter_inc
ZMQ_EXPORT int zmq_atomic_counter_inc(void *counter_)
Definition: zmq_utils.cpp:271
testutil_unity.hpp
snprintf
int snprintf(char *str, size_t size, const char *format,...)
Definition: port.cc:64
ZMQ_DEALER
#define ZMQ_DEALER
Definition: zmq.h:263
zmq_setsockopt
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
zmq_threadclose
ZMQ_EXPORT void zmq_threadclose(void *thread_)
Definition: zmq_utils.cpp:62
zmq_atomic_counter_new
ZMQ_EXPORT void * zmq_atomic_counter_new(void)
Definition: zmq_utils.cpp:255
proxy_control_address
const char * proxy_control_address
Definition: test_proxy_steerable.cpp:19
testutil.hpp
ZMQ_ROUTER
#define ZMQ_ROUTER
Definition: zmq.h:264
server_task
void server_task(void *)
Definition: test_proxy_steerable.cpp:168
zmq_msg_t
Definition: zmq.h:218
ZMQ_REP
#define ZMQ_REP
Definition: zmq.h:262
worker
void worker(int num)
Definition: test_multithread.cpp:83
ROUTING_ID_SIZE
#define ROUTING_ID_SIZE
Definition: test_proxy_steerable.cpp:12
control_context
void * control_context
Definition: test_proxy_steerable.cpp:28
my_endpoint
char my_endpoint[MAX_SOCKET_STRING]
Definition: test_security_curve.cpp:31
MAX_SOCKET_STRING
#define MAX_SOCKET_STRING
Definition: libzmq/tests/testutil.hpp:35
zmq_bind
ZMQ_EXPORT int zmq_bind(void *s_, const char *addr_)
Definition: zmq.cpp:299
CONTENT_SIZE
#define CONTENT_SIZE
Definition: test_proxy_steerable.cpp:10
QT_CLIENTS
#define QT_CLIENTS
Definition: test_proxy_steerable.cpp:15
zmq_msg_recv
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:617
zmq_socket
ZMQ_EXPORT void * zmq_socket(void *, int type_)
Definition: zmq.cpp:230
TEST_ASSERT_EQUAL_INT
#define TEST_ASSERT_EQUAL_INT(expected, actual)
Definition: unity.h:128
ROUTING_ID_SIZE_MAX
#define ROUTING_ID_SIZE_MAX
Definition: test_proxy_steerable.cpp:13
test_proxy_steerable
void test_proxy_steerable()
Definition: test_proxy_steerable.cpp:412
zmq_msg_init
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg_)
Definition: zmq.cpp:587
zmq_close
ZMQ_EXPORT int zmq_close(void *s_)
Definition: zmq.cpp:241
i
int i
Definition: gmock-matchers_test.cc:764
CONTENT_SIZE_MAX
#define CONTENT_SIZE_MAX
Definition: test_proxy_steerable.cpp:11
ZMQ_LINGER
#define ZMQ_LINGER
Definition: zmq.h:288
send_string_expect_success
void send_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:94
ZMQ_PAIR
#define ZMQ_PAIR
Definition: zmq.h:258
steer
void steer(void *proxy_control, const char *command, const char *runctx)
Definition: test_proxy_steerable.cpp:392
g_proxy_control_socktype
int g_proxy_control_socktype
Definition: test_proxy_steerable.cpp:30
zmq_recv
ZMQ_EXPORT int zmq_recv(void *s_, void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:487
zmq_proxy_steerable
ZMQ_EXPORT int zmq_proxy_steerable(void *frontend_, void *backend_, void *capture_, void *control_)
Definition: zmq.cpp:1738
TEST_SLEEP_MS
#define TEST_SLEEP_MS
Definition: test_proxy_steerable.cpp:17
zmq_atomic_counter_value
ZMQ_EXPORT int zmq_atomic_counter_value(void *counter_)
Definition: zmq_utils.cpp:286
main
int main(void)
Definition: test_proxy_steerable.cpp:508
setup_test_environment
void setup_test_environment(int timeout_seconds_)
Definition: testutil.cpp:201
UNITY_BEGIN
UNITY_BEGIN()
ZMQ_SNDMORE
#define ZMQ_SNDMORE
Definition: zmq.h:359
is_verbose
#define is_verbose
Definition: test_proxy_steerable.cpp:16
ZMQ_ROUTING_ID
#define ZMQ_ROUTING_ID
Definition: zmq.h:277
val
GLuint GLfloat * val
Definition: glcorearb.h:3604
statistics
uint64_t statistics(void *proxy_control, const char *runctx)
Definition: test_proxy_steerable.cpp:352
zmq_send
ZMQ_EXPORT int zmq_send(void *s_, const void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:377
count
GLint GLsizei count
Definition: glcorearb.h:2830
TEST_ASSERT_NOT_NULL
#define TEST_ASSERT_NOT_NULL(pointer)
Definition: unity.h:125
ZMQ_RCVMORE
#define ZMQ_RCVMORE
Definition: zmq.h:284
zmq_msg_close
ZMQ_EXPORT int zmq_msg_close(zmq_msg_t *msg_)
Definition: zmq.cpp:625
read_stat_value
uint64_t read_stat_value(void *proxy_control)
Definition: test_proxy_steerable.cpp:340
TEST_ASSERT_SUCCESS_ERRNO
#define TEST_ASSERT_SUCCESS_ERRNO(expr)
Definition: proxy_thr.cpp:47
zmq_getsockopt
ZMQ_EXPORT int zmq_getsockopt(void *s_, int option_, void *optval_, size_t *optvallen_)
Definition: zmq.cpp:261
QT_WORKERS
#define QT_WORKERS
Definition: test_proxy_steerable.cpp:14


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59