Go to the documentation of this file.
10 #define CONTENT_SIZE 13
11 #define CONTENT_SIZE_MAX 32
12 #define ROUTING_ID_SIZE 10
13 #define ROUTING_ID_SIZE_MAX 32
17 #define TEST_SLEEP_MS 500
61 char endpoint_source[256];
62 snprintf (endpoint_source, 256 *
sizeof (
char),
"inproc://endpoint%d",
84 rand () % 0xFFFF, rand () % 0xFFFF);
98 bool enable_send =
false;
102 for (centitick = 0; centitick < 20; centitick++) {
106 size_t sz =
sizeof (rcvmore);
112 "client receive - routing_id = %s content = %s\n",
113 routing_id, content);
127 "client receive - routing_id = %s command = %s\n",
128 routing_id, content);
129 if (memcmp (content,
"TERMINATE", 9) == 0) {
131 }
else if (memcmp (content,
"STOP", 4) == 0) {
133 }
else if (memcmp (content,
"START", 5) == 0) {
143 "request #%03d", ++request_nbr);
145 printf (
"client send - routing_id = %s request #%03d\n",
146 routing_id, request_nbr);
189 for (thread_nbr = 0; thread_nbr <
QT_WORKERS; thread_nbr++)
194 char endpoint_source[256];
199 endpoint_receivers[
i],
ZMQ_LINGER, &linger,
sizeof (linger)));
200 snprintf (endpoint_source, 256 *
sizeof (
char),
"inproc://endpoint%d",
203 zmq_bind (endpoint_receivers[
i], endpoint_source));
211 void *proxy_control =
224 for (thread_nbr = 0; thread_nbr <
QT_WORKERS; thread_nbr++) {
264 bool keep_sending =
true;
273 printf (
"server_worker receives command = %s\n", content);
274 if (memcmp (content,
"TERMINATE", 9) == 0)
276 if (memcmp (content,
"STOP", 4) == 0)
277 keep_sending =
false;
291 printf (
"server receive - routing_id = %s content = %s\n",
292 routing_id, content);
296 int reply, replies = rand () % 5;
297 for (reply = 0; reply < replies; reply++) {
299 msleep (rand () % 10 + 1);
303 printf (
"server send - routing_id = %s reply\n",
352 uint64_t
statistics (
void *proxy_control,
const char *runctx)
355 printf (
"steer: sending STATISTICS - %s\n", runctx);
360 uint64_t total_bytes_proxied = 0;
365 printf (
"stats: client pkts out: %d worker pkts out: %d { ",
369 printf (
"%" PRIu64
" ",
val);
377 total_bytes_proxied +=
val;
382 size_t sz =
sizeof (rcvmore);
385 return total_bytes_proxied;
392 void steer (
void *proxy_control,
const char *command,
const char *runctx)
395 printf (
"steer: sending %s - %s\n",
command, runctx);
458 statistics (proxy_control,
"should be all 0s before clients start") == 0);
465 steer (proxy_control,
"PAUSE",
"pausing proxying after 500ms");
473 steer (proxy_control,
"RESUME",
"resuming proxying after another 500ms");
480 printf (
"stopping all clients and server workers\n");
483 statistics (proxy_control,
"stopped clients and workers");
488 printf (
"shutting down all clients and server workers\n");
492 statistics (proxy_control,
"terminate clients and server workers");
495 steer (proxy_control,
"TERMINATE",
"terminate proxy");
#define TEST_ASSERT_EQUAL_STRING_LEN(expected, actual, len)
static void client_task(void *db_)
void * g_workers_pkts_out
ZMQ_EXPORT void * zmq_threadstart(zmq_thread_fn *func_, void *arg_)
void msleep(int milliseconds_)
ZMQ_EXPORT int zmq_poll(zmq_pollitem_t *items_, int nitems_, long timeout_)
ZMQ_EXPORT void * zmq_ctx_new(void)
ROSLIB_DECL std::string command(const std::string &cmd)
ZMQ_EXPORT int zmq_msg_get(const zmq_msg_t *msg_, int property_)
void setup_test_context()
static void server_worker(void *)
void bind_loopback_ipv4(void *socket_, char *my_endpoint_, size_t len_)
void teardown_test_context()
void * get_test_context()
#define TEST_ASSERT_FALSE(condition)
ZMQ_EXPORT int zmq_ctx_destroy(void *context_)
char * s_recv(void *socket_)
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg_)
#define TEST_ASSERT(condition)
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
void * g_clients_pkts_out
ZMQ_EXPORT size_t zmq_msg_size(const zmq_msg_t *msg_)
ZMQ_EXPORT int zmq_atomic_counter_inc(void *counter_)
int snprintf(char *str, size_t size, const char *format,...)
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
ZMQ_EXPORT void zmq_threadclose(void *thread_)
ZMQ_EXPORT void * zmq_atomic_counter_new(void)
const char * proxy_control_address
char my_endpoint[MAX_SOCKET_STRING]
#define MAX_SOCKET_STRING
ZMQ_EXPORT int zmq_bind(void *s_, const char *addr_)
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg_, void *s_, int flags_)
ZMQ_EXPORT void * zmq_socket(void *, int type_)
#define TEST_ASSERT_EQUAL_INT(expected, actual)
#define ROUTING_ID_SIZE_MAX
void test_proxy_steerable()
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg_)
ZMQ_EXPORT int zmq_close(void *s_)
void send_string_expect_success(void *socket_, const char *str_, int flags_)
void steer(void *proxy_control, const char *command, const char *runctx)
int g_proxy_control_socktype
ZMQ_EXPORT int zmq_recv(void *s_, void *buf_, size_t len_, int flags_)
ZMQ_EXPORT int zmq_proxy_steerable(void *frontend_, void *backend_, void *capture_, void *control_)
ZMQ_EXPORT int zmq_atomic_counter_value(void *counter_)
void setup_test_environment(int timeout_seconds_)
uint64_t statistics(void *proxy_control, const char *runctx)
ZMQ_EXPORT int zmq_send(void *s_, const void *buf_, size_t len_, int flags_)
#define TEST_ASSERT_NOT_NULL(pointer)
ZMQ_EXPORT int zmq_msg_close(zmq_msg_t *msg_)
uint64_t read_stat_value(void *proxy_control)
#define TEST_ASSERT_SUCCESS_ERRNO(expr)
ZMQ_EXPORT int zmq_getsockopt(void *s_, int option_, void *optval_, size_t *optvallen_)
libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59