test_spec_dealer.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "testutil.hpp"
4 #include "testutil_unity.hpp"
5 
7 
8 // SHALL route outgoing messages to available peers using a round-robin
9 // strategy.
//
// Visible behavior: creates one DEALER socket, binds it via test_bind() with
// bind_address_, declares an array rep[] of `services` (5) peer sockets, sends
// the sequence (0, "ABC") from the DEALER once per peer, and then calls
// s_recv_seq() once on every rep[peer] expecting "ABC".
//
// NOTE(review): this listing is incomplete -- the embedded line numbers skip
// 14, 21, 23, 27, 40, 42 and 45 -- so the function does not compile as shown.
// Comments below marked "presumably" describe missing lines and must be
// confirmed against the real source file.
10 void test_round_robin_out (const char *bind_address_)
11 {
12  void *dealer = test_context_socket (ZMQ_DEALER);
13 
    // connect_address is not declared in any visible line (listing line 14 is
    // missing); presumably a local char buffer that test_bind() fills with the
    // endpoint peers should connect to -- TODO confirm.
15  test_bind (dealer, bind_address_, connect_address,
16  sizeof (connect_address));
17 
18  const size_t services = 5;
19  void *rep[services];
20  for (size_t peer = 0; peer < services; ++peer) {
    // Loop body (listing lines 21 and 23) is missing; presumably it creates
    // rep[peer] and connects it to connect_address -- TODO confirm.
22 
24  }
25 
26  // Wait for connections.
    // The statement that performs this wait (listing line 27) is missing.
28 
29  // Send all requests
    // NOTE(review): the literal 0 is passed through s_send_seq()'s variadic
    // argument list; presumably it is read back as a null const char * that
    // stands for an empty frame -- confirm against s_send_seq().
30  for (size_t i = 0; i < services; ++i)
31  s_send_seq (dealer, 0, "ABC", SEQ_END);
32 
33  // Expect every REP got one message
    // msg is initialized here but not used by any visible statement; its
    // cleanup is presumably on one of the missing lines (40/42).
34  zmq_msg_t msg;
35  zmq_msg_init (&msg);
36 
37  for (size_t peer = 0; peer < services; ++peer)
38  s_recv_seq (rep[peer], "ABC", SEQ_END);
39 
41 
43 
    // The body of this final loop (listing line 45) is missing; presumably it
    // closes each rep[peer] socket -- TODO confirm.
44  for (size_t peer = 0; peer < services; ++peer)
46 }
47 
48 // SHALL receive incoming messages from its peers using a fair-queuing
49 // strategy.
//
// Visible behavior: binds `receiver` via test_bind(), creates `services` (5)
// DEALER sockets in senders[] and connects each to connect_address, exchanges
// "A" between senders[0] and receiver twice, then has every sender send "B"
// and calls s_recv_seq() on receiver `services` times expecting "B".
//
// NOTE(review): this listing is incomplete -- the embedded line numbers skip
// 52, 54, 63, 68, 81, 87, 89 and 92 -- so the function does not compile as
// shown. Comments below marked "presumably" describe missing lines and must
// be confirmed against the real source file.
50 void test_fair_queue_in (const char *bind_address_)
51 {
    // The declarations of `receiver` and connect_address (listing lines 52
    // and 54) are missing from this extract.
53 
55  test_bind (receiver, bind_address_, connect_address,
56  sizeof (connect_address));
57 
58  const size_t services = 5;
59  void *senders[services];
60  for (size_t peer = 0; peer < services; ++peer) {
61  senders[peer] = test_context_socket (ZMQ_DEALER);
62 
    // The extra closing parenthesis below belongs to a wrapper that opens on
    // the missing listing line 63; presumably an assertion macro around
    // zmq_connect() -- TODO confirm.
64  zmq_connect (senders[peer], connect_address));
65  }
66 
    // msg is declared here but not used by any visible statement; listing
    // line 68 (presumably its initialization) is missing.
67  zmq_msg_t msg;
69 
    // Two send/receive round trips using only senders[0].
70  s_send_seq (senders[0], "A", SEQ_END);
71  s_recv_seq (receiver, "A", SEQ_END);
72 
73  s_send_seq (senders[0], "A", SEQ_END);
74  s_recv_seq (receiver, "A", SEQ_END);
75 
76  // send our requests
77  for (size_t peer = 0; peer < services; ++peer)
78  s_send_seq (senders[peer], "B", SEQ_END);
79 
80  // Wait for data.
    // The statement that performs this wait (listing line 81) is missing.
82 
83  // handle the requests
    // Every sender sent the same payload, so these receives check the count
    // and content of the messages but cannot tell which peer each came from.
84  for (size_t peer = 0; peer < services; ++peer)
85  s_recv_seq (receiver, "B", SEQ_END);
86 
88 
90 
    // The body of this final loop (listing line 92) is missing; presumably it
    // closes each senders[peer] socket -- TODO confirm.
91  for (size_t peer = 0; peer < services; ++peer)
93 }
94 
95 // SHALL create a double queue when a peer connects to it. If this peer
96 // disconnects, the DEALER socket SHALL destroy its double queue and SHALL
97 // discard any messages it contains.
//
// Visible behavior: creates DEALER sockets `a` (bound via test_bind()) and
// `b`, sends "ABC" from a and "DEF" from b, calls zmq_poll() twice on both
// sockets, and initializes a zmq_msg_t.
//
// This function has no wrapper in the TEST_CASES macro and its only call in
// main() is commented out, so nothing in this file runs it.
//
// NOTE(review): this listing is incomplete -- the embedded line numbers skip
// 102, 107, 113, 124, 126, 129, 131, 133, 135, 137 and 138 -- so the
// connect, disconnect, assertion and cleanup steps announced by the comments
// below are not visible here.
98 void test_destroy_queue_on_disconnect (const char *bind_address_)
99 {
100  void *a = test_context_socket (ZMQ_DEALER);
101 
    // connect_address is not declared in any visible line (listing line 102
    // is missing).
103  test_bind (a, bind_address_, connect_address, sizeof (connect_address));
104 
105  void *b = test_context_socket (ZMQ_DEALER);
106 
    // Listing line 107 is missing; presumably b is connected to
    // connect_address here -- TODO confirm.
108 
109  // Send a message in both directions
110  s_send_seq (a, "ABC", SEQ_END);
111  s_send_seq (b, "DEF", SEQ_END);
112 
    // Listing line 113 is missing; presumably the disconnect happens here --
    // TODO confirm.
114 
115  // Disconnect may take time and need command processing.
    // Each poll item is initialized with only the socket set and the other
    // three fields 0; zmq_poll() is called twice with 2 items and a timeout
    // argument of 100, and each call is asserted to succeed.
116  zmq_pollitem_t poller[2] = {{a, 0, 0, 0}, {b, 0, 0, 0}};
117  TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100));
118  TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100));
119 
120  // No messages should be available, sending should fail.
    // The assertions for this expectation (listing lines 124 and 126) are
    // missing; only the message initialization is visible.
121  zmq_msg_t msg;
122  zmq_msg_init (&msg);
123 
125 
127 
128  // After a reconnect of B, the messages should still be gone
    // All statements for the reconnect check and the final cleanup (listing
    // lines 129-138) are missing from this extract.
130 
132 
134 
136 
139 }
140 
141 // SHALL block on sending, or return a suitable error, when it has no connected peers.
//
// Visible behavior: marks bind_address_ as unused and sets the ZMQ_SNDTIMEO
// option of socket `sc` to the value of `timeout` (250).
//
// NOTE(review): this listing is incomplete -- the embedded line numbers skip
// 145, 148, 151, 152 and 154 -- so the creation of `sc`, the send attempts
// and the socket cleanup are not visible here.
142 void test_block_on_send_no_peers (const char *bind_address_)
143 {
    // No socket is bound or connected in the visible code, so the address
    // parameter is deliberately unused.
144  LIBZMQ_UNUSED (bind_address_);
    // `sc` is used below but its declaration (listing line 145) is missing.
146 
147  int timeout = 250;
    // The extra closing parenthesis below belongs to a wrapper that opens on
    // the missing listing line 148; presumably an assertion macro around
    // zmq_setsockopt() -- TODO confirm.
149  zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof (timeout)));
150 
    // Listing lines 151, 152 and 154 are missing; presumably the send
    // attempts that are expected to fail, and the close of `sc`, live there
    // -- TODO confirm.
153 
155 }
156 
// TEST_CASES(name, bind_address) defines three zero-argument wrapper
// functions -- test_round_robin_out_<name>, test_fair_queue_in_<name> and
// test_block_on_send_no_peers_<name> -- each of which calls the matching
// parameterized test with bind_address. The zero-argument form is what
// RUN_TEST is given in main().
// No wrapper is generated for test_destroy_queue_on_disconnect.
157 #define TEST_CASES(name, bind_address) \
158  void test_round_robin_out_##name () \
159  { \
160  test_round_robin_out (bind_address); \
161  } \
162  void test_fair_queue_in_##name () \
163  { \
164  test_fair_queue_in (bind_address); \
165  } \
166  void test_block_on_send_no_peers_##name () \
167  { \
168  test_block_on_send_no_peers (bind_address); \
169  }
170 
// Instantiate the wrappers once for the "inproc://a" endpoint and once for
// the "tcp://127.0.0.1:*" endpoint.
171 TEST_CASES (inproc, "inproc://a")
172 TEST_CASES (tcp, "tcp://127.0.0.1:*")
173 
// Test runner entry point: starts Unity, runs the generated test wrappers,
// and returns the value of UNITY_END() as the process exit status.
//
// NOTE(review): this listing is incomplete -- the embedded line numbers skip
// 176, 180, 181, 184 and 185. Only the two test_block_on_send_no_peers_*
// runs are visible; presumably the missing lines set up the test environment
// and run the round-robin and fair-queue wrappers for inproc and tcp --
// TODO confirm against the real source file.
174 int main (void)
175 {
177 
178  UNITY_BEGIN ();
179 
182  RUN_TEST (test_block_on_send_no_peers_inproc);
183 
186  RUN_TEST (test_block_on_send_no_peers_tcp);
187 
188  // TODO *** Test disabled until libzmq does this properly ***
189  // test_destroy_queue_on_disconnect (ctx);
    // NOTE(review): the disabled call above passes `ctx`, but
    // test_destroy_queue_on_disconnect is declared in this file as taking
    // `const char *bind_address_`, and TEST_CASES generates no wrapper for
    // it. Re-enabling the test needs a wrapper plus RUN_TEST, not this call.
190 
191  return UNITY_END ();
192 }
TEST_CASES
#define TEST_CASES(name, bind_address)
Definition: test_spec_dealer.cpp:157
UNITY_END
return UNITY_END()
test_fair_queue_in_inproc
void test_fair_queue_in_inproc()
Definition: test_spec_rep.cpp:99
msleep
void msleep(int milliseconds_)
Definition: testutil.cpp:227
zmq::protocol_name::tcp
static const char tcp[]
Definition: address.hpp:38
main
int main(void)
Definition: test_spec_dealer.cpp:174
zmq_poll
ZMQ_EXPORT int zmq_poll(zmq_pollitem_t *items_, int nitems_, long timeout_)
Definition: zmq.cpp:827
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
RUN_TEST
#define RUN_TEST(func)
Definition: unity_internals.h:615
test_round_robin_out
SETUP_TEARDOWN_TESTCONTEXT void test_round_robin_out(const char *bind_address_)
Definition: test_spec_dealer.cpp:10
SETUP_TEARDOWN_TESTCONTEXT
#define SETUP_TEARDOWN_TESTCONTEXT
Definition: testutil_unity.hpp:172
zmq_pollitem_t
Definition: zmq.h:487
rep
void * rep[services]
Definition: test_req_relaxed.cpp:11
services
const size_t services
Definition: test_req_relaxed.cpp:8
test_context_socket_close_zero_linger
void * test_context_socket_close_zero_linger(void *socket_)
Definition: testutil_unity.cpp:215
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
b
GLboolean GLboolean GLboolean b
Definition: glcorearb.h:3228
test_round_robin_out_tcp
void test_round_robin_out_tcp()
Definition: test_spec_req.cpp:170
testutil_unity.hpp
ZMQ_DEALER
#define ZMQ_DEALER
Definition: zmq.h:263
test_round_robin_out_inproc
void test_round_robin_out_inproc()
Definition: test_spec_req.cpp:165
zmq_setsockopt
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
ZMQ_SNDTIMEO
#define ZMQ_SNDTIMEO
Definition: zmq.h:297
zmq_disconnect
ZMQ_EXPORT int zmq_disconnect(void *s_, const char *addr_)
Definition: zmq.cpp:345
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
testutil.hpp
test_destroy_queue_on_disconnect
void test_destroy_queue_on_disconnect(const char *bind_address_)
Definition: test_spec_dealer.cpp:98
zmq_msg_t
Definition: zmq.h:218
ZMQ_REP
#define ZMQ_REP
Definition: zmq.h:262
MAX_SOCKET_STRING
#define MAX_SOCKET_STRING
Definition: libzmq/tests/testutil.hpp:35
test_fair_queue_in_tcp
void test_fair_queue_in_tcp()
Definition: test_spec_rep.cpp:104
zmq_msg_recv
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:617
timeout
GLbitfield GLuint64 timeout
Definition: glcorearb.h:3588
sc
void * sc
Definition: test_channel.cpp:9
test_block_on_send_no_peers
void test_block_on_send_no_peers(const char *bind_address_)
Definition: test_spec_dealer.cpp:142
test_context_socket
void * test_context_socket(int type_)
Definition: testutil_unity.cpp:200
ZMQ_DONTWAIT
#define ZMQ_DONTWAIT
Definition: zmq.h:358
SETTLE_TIME
#define SETTLE_TIME
Definition: libzmq/tests/testutil.hpp:31
zmq::protocol_name::inproc
static const char inproc[]
Definition: address.hpp:37
test_fair_queue_in
void test_fair_queue_in(const char *bind_address_)
Definition: test_spec_dealer.cpp:50
zmq_msg_init
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg_)
Definition: zmq.cpp:587
i
int i
Definition: gmock-matchers_test.cc:764
SEQ_END
const char * SEQ_END
Definition: testutil.cpp:47
s_send_seq
void s_send_seq(void *socket_,...)
Definition: testutil.cpp:135
setup_test_environment
void setup_test_environment(int timeout_seconds_)
Definition: testutil.cpp:201
UNITY_BEGIN
UNITY_BEGIN()
s_recv_seq
void s_recv_seq(void *socket_,...)
Definition: testutil.cpp:158
connect_address
SETUP_TEARDOWN_TESTCONTEXT char connect_address[MAX_SOCKET_STRING]
Definition: tests/test_fork.cpp:14
receiver
static void receiver(void *socket_)
Definition: test_ctx_destroy.cpp:16
TEST_ASSERT_FAILURE_ERRNO
#define TEST_ASSERT_FAILURE_ERRNO(error_code, expr)
Definition: testutil_unity.hpp:95
zmq_send
ZMQ_EXPORT int zmq_send(void *s_, const void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:377
test_context_socket_close
void * test_context_socket_close(void *socket_)
Definition: testutil_unity.cpp:208
a
GLboolean GLboolean GLboolean GLboolean a
Definition: glcorearb.h:3228
zmq_msg_close
ZMQ_EXPORT int zmq_msg_close(zmq_msg_t *msg_)
Definition: zmq.cpp:625
TEST_ASSERT_SUCCESS_ERRNO
#define TEST_ASSERT_SUCCESS_ERRNO(expr)
Definition: proxy_thr.cpp:47
test_bind
void test_bind(void *socket_, const char *bind_address_, char *my_endpoint_, size_t len_)
Definition: testutil_unity.cpp:223


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59