test_hwm_pubsub.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "testutil.hpp"
4 #include "testutil_unity.hpp"
5 
6 #include <string.h>
7 
// Size of the buffers that receive the bound endpoint string. The endpoint
// reported through ZMQ_LAST_ENDPOINT can be quite long on OSX, so reserve
// four times MAX_SOCKET_STRING.
#define SOCKET_STRING_LEN (MAX_SOCKET_STRING * 4)
11 
13 
14 int test_defaults (int send_hwm_, int msg_cnt_, const char *endpoint_)
15 {
16  char pub_endpoint[SOCKET_STRING_LEN];
17 
18  // Set up and bind XPUB socket
19  void *pub_socket = test_context_socket (ZMQ_XPUB);
20  test_bind (pub_socket, endpoint_, pub_endpoint, sizeof pub_endpoint);
21 
22  // Set up and connect SUB socket
23  void *sub_socket = test_context_socket (ZMQ_SUB);
24  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, pub_endpoint));
25 
26  //set a hwm on publisher
28  zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm_, sizeof (send_hwm_)));
30  zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0));
31 
32  // Wait before starting TX operations till 1 subscriber has subscribed
33  // (in this test there's 1 subscriber only)
34  const char subscription_to_all_topics[] = {1, 0};
35  recv_string_expect_success (pub_socket, subscription_to_all_topics, 0);
36 
37  // Send until we reach "mute" state
38  int send_count = 0;
39  while (send_count < msg_cnt_
40  && zmq_send (pub_socket, "test message", 13, ZMQ_DONTWAIT) == 13)
41  ++send_count;
42 
43  TEST_ASSERT_EQUAL_INT (send_hwm_, send_count);
45 
46  // Now receive all sent messages
47  int recv_count = 0;
48  char dummybuff[64];
49  while (13 == zmq_recv (sub_socket, &dummybuff, 64, ZMQ_DONTWAIT)) {
50  ++recv_count;
51  }
52 
53  TEST_ASSERT_EQUAL_INT (send_hwm_, recv_count);
54 
55  // Clean up
56  test_context_socket_close (sub_socket);
57  test_context_socket_close (pub_socket);
58 
59  return recv_count;
60 }
61 
62 int receive (void *socket_, int *is_termination_)
63 {
64  int recv_count = 0;
65  *is_termination_ = 0;
66 
67  // Now receive all sent messages
68  char buffer[255];
69  int len;
70  while ((len = zmq_recv (socket_, buffer, sizeof (buffer), 0)) >= 0) {
71  ++recv_count;
72 
73  if (len == 3 && strncmp (buffer, "end", len) == 0) {
74  *is_termination_ = 1;
75  return recv_count;
76  }
77  }
78 
79  return recv_count;
80 }
81 
82 int test_blocking (int send_hwm_, int msg_cnt_, const char *endpoint_)
83 {
84  char pub_endpoint[SOCKET_STRING_LEN];
85 
86  // Set up bind socket
87  void *pub_socket = test_context_socket (ZMQ_XPUB);
88  test_bind (pub_socket, endpoint_, pub_endpoint, sizeof pub_endpoint);
89 
90  // Set up connect socket
91  void *sub_socket = test_context_socket (ZMQ_SUB);
92  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, pub_endpoint));
93 
94  //set a hwm on publisher
96  zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm_, sizeof (send_hwm_)));
97  int wait = 1;
99  zmq_setsockopt (pub_socket, ZMQ_XPUB_NODROP, &wait, sizeof (wait)));
100  int timeout_ms = 10;
102  sub_socket, ZMQ_RCVTIMEO, &timeout_ms, sizeof (timeout_ms)));
104  zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0));
105 
106  // Wait before starting TX operations till 1 subscriber has subscribed
107  // (in this test there's 1 subscriber only)
108  const uint8_t subscription_to_all_topics[] = {1};
109  recv_array_expect_success (pub_socket, subscription_to_all_topics, 0);
110 
111  // Send until we block
112  int send_count = 0;
113  int recv_count = 0;
114  int blocked_count = 0;
115  int is_termination = 0;
116  while (send_count < msg_cnt_) {
117  const int rc = zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT);
118  if (rc == 0) {
119  ++send_count;
120  } else if (-1 == rc) {
121  // if the PUB socket blocks due to HWM, errno should be EAGAIN:
122  blocked_count++;
124  recv_count += receive (sub_socket, &is_termination);
125  }
126  }
127 
128  // if send_hwm_ < msg_cnt_, we should block at least once:
129  char counts_string[128];
130  snprintf (counts_string, sizeof counts_string - 1,
131  "sent = %i, received = %i", send_count, recv_count);
132  TEST_ASSERT_GREATER_THAN_INT_MESSAGE (0, blocked_count, counts_string);
133 
134  // dequeue SUB socket again, to make sure XPUB has space to send the termination message
135  recv_count += receive (sub_socket, &is_termination);
136 
137  // send termination message
138  send_string_expect_success (pub_socket, "end", 0);
139 
140  // now block on the SUB side till we get the termination message
141  while (is_termination == 0)
142  recv_count += receive (sub_socket, &is_termination);
143 
144  // remove termination message from the count:
145  recv_count--;
146 
147  TEST_ASSERT_EQUAL_INT (send_count, recv_count);
148 
149  // Clean up
150  test_context_socket_close (sub_socket);
151  test_context_socket_close (pub_socket);
152 
153  return recv_count;
154 }
155 
156 // hwm should apply to the messages that have already been received
157 // with hwm 11024: send 9999 msg, receive 9999, send 1100, receive 1100
159 {
160  const int first_count = 9999;
161  const int second_count = 1100;
162  int hwm = 11024;
164 
165  // Set up bind socket
166  void *pub_socket = test_context_socket (ZMQ_PUB);
168  zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &hwm, sizeof (hwm)));
170 
171  // Set up connect socket
172  void *sub_socket = test_context_socket (ZMQ_SUB);
174  zmq_setsockopt (sub_socket, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
177  zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0));
178 
180 
181  // Send messages
182  int send_count = 0;
183  while (send_count < first_count
184  && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
185  ++send_count;
186  TEST_ASSERT_EQUAL_INT (first_count, send_count);
187 
189 
190  // Now receive all sent messages
191  int recv_count = 0;
192  while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT)) {
193  ++recv_count;
194  }
195  TEST_ASSERT_EQUAL_INT (first_count, recv_count);
196 
198 
199  // Send messages
200  send_count = 0;
201  while (send_count < second_count
202  && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
203  ++send_count;
204  TEST_ASSERT_EQUAL_INT (second_count, send_count);
205 
207 
208  // Now receive all sent messages
209  recv_count = 0;
210  while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT)) {
211  ++recv_count;
212  }
213  TEST_ASSERT_EQUAL_INT (second_count, recv_count);
214 
215  // Clean up
216  test_context_socket_close (sub_socket);
217  test_context_socket_close (pub_socket);
218 }
219 
220 void test_defaults_large (const char *bind_endpoint_)
221 {
222  // send 1000 msg on hwm 1000, receive 1000
223  TEST_ASSERT_EQUAL_INT (1000, test_defaults (1000, 1000, bind_endpoint_));
224 }
225 
226 void test_defaults_small (const char *bind_endpoint_)
227 {
228  // send 1000 msg on hwm 100, receive 100
229  TEST_ASSERT_EQUAL_INT (100, test_defaults (100, 100, bind_endpoint_));
230 }
231 
232 void test_blocking (const char *bind_endpoint_)
233 {
234  // send 6000 msg on hwm 2000, drops above hwm, only receive hwm:
235  TEST_ASSERT_EQUAL_INT (6000, test_blocking (2000, 6000, bind_endpoint_));
236 }
237 
// Defines the three argument-less test functions
//   test_defaults_large_<suffix>, test_defaults_small_<suffix> and
//   test_blocking_<suffix>,
// each forwarding to the matching parameterised test with `endpoint` as the
// bind endpoint.
#define DEFINE_REGULAR_TEST_CASES(suffix, endpoint)                            \
    void test_defaults_large_##suffix ()                                       \
    {                                                                          \
        test_defaults_large (endpoint);                                        \
    }                                                                          \
                                                                               \
    void test_defaults_small_##suffix ()                                       \
    {                                                                          \
        test_defaults_small (endpoint);                                        \
    }                                                                          \
                                                                               \
    void test_blocking_##suffix ()                                             \
    {                                                                          \
        test_blocking (endpoint);                                              \
    }
253 
// Runs the three test functions that DEFINE_REGULAR_TEST_CASES generated for
// `suffix`. The expansion has no trailing semicolon; the caller supplies it.
#define RUN_REGULAR_TEST_CASES(suffix)                                         \
    RUN_TEST (test_defaults_large_##suffix);                                   \
    RUN_TEST (test_defaults_small_##suffix);                                   \
    RUN_TEST (test_blocking_##suffix)
258 
259 DEFINE_REGULAR_TEST_CASES (tcp, "tcp://127.0.0.1:*")
261 
262 #if !defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_HAVE_GNU)
263 DEFINE_REGULAR_TEST_CASES (ipc, "ipc://*")
264 #endif
265 
266 int main ()
267 {
269 
270  UNITY_BEGIN ();
271 
274 
275 #if !defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_HAVE_GNU)
277 #endif
279  return UNITY_END ();
280 }
recv_array_expect_success
void recv_array_expect_success(void *socket_, const uint8_t(&array_)[SIZE], int flags_)
Definition: testutil_unity.hpp:148
receive
int receive(void *socket_, int *is_termination_)
Definition: test_hwm_pubsub.cpp:62
NULL
NULL
Definition: test_security_zap.cpp:405
UNITY_END
return UNITY_END()
ZMQ_XPUB
#define ZMQ_XPUB
Definition: zmq.h:267
ZMQ_PUB
#define ZMQ_PUB
Definition: zmq.h:259
msleep
void msleep(int milliseconds_)
Definition: testutil.cpp:227
zmq::protocol_name::tcp
static const char tcp[]
Definition: address.hpp:38
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
RUN_TEST
#define RUN_TEST(func)
Definition: unity_internals.h:615
TEST_ASSERT_GREATER_THAN_INT_MESSAGE
#define TEST_ASSERT_GREATER_THAN_INT_MESSAGE(threshold, actual, message)
Definition: unity.h:345
SETUP_TEARDOWN_TESTCONTEXT
#define SETUP_TEARDOWN_TESTCONTEXT
Definition: testutil_unity.hpp:172
ZMQ_SUBSCRIBE
#define ZMQ_SUBSCRIBE
Definition: zmq.h:278
bind_loopback_ipv4
void bind_loopback_ipv4(void *socket_, char *my_endpoint_, size_t len_)
Definition: testutil_unity.cpp:246
test_blocking
int test_blocking(int send_hwm_, int msg_cnt_, const char *endpoint_)
Definition: test_hwm_pubsub.cpp:82
ZMQ_SUB
#define ZMQ_SUB
Definition: zmq.h:260
main
int main()
Definition: test_hwm_pubsub.cpp:266
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
testutil_unity.hpp
snprintf
int snprintf(char *str, size_t size, const char *format,...)
Definition: port.cc:64
ZMQ_RCVHWM
#define ZMQ_RCVHWM
Definition: zmq.h:294
test_defaults_large
void test_defaults_large(const char *bind_endpoint_)
Definition: test_hwm_pubsub.cpp:220
zmq_setsockopt
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
DEFINE_REGULAR_TEST_CASES
#define DEFINE_REGULAR_TEST_CASES(name, bind_endpoint)
Definition: test_hwm_pubsub.cpp:238
testutil.hpp
test_defaults
SETUP_TEARDOWN_TESTCONTEXT int test_defaults(int send_hwm_, int msg_cnt_, const char *endpoint_)
Definition: test_hwm_pubsub.cpp:14
my_endpoint
char my_endpoint[MAX_SOCKET_STRING]
Definition: test_security_curve.cpp:31
MAX_SOCKET_STRING
#define MAX_SOCKET_STRING
Definition: libzmq/tests/testutil.hpp:35
RUN_REGULAR_TEST_CASES
#define RUN_REGULAR_TEST_CASES(name)
Definition: test_hwm_pubsub.cpp:254
buffer
Definition: buffer_processor.h:43
test_context_socket
void * test_context_socket(int type_)
Definition: testutil_unity.cpp:200
ZMQ_DONTWAIT
#define ZMQ_DONTWAIT
Definition: zmq.h:358
SETTLE_TIME
#define SETTLE_TIME
Definition: libzmq/tests/testutil.hpp:31
zmq::protocol_name::inproc
static const char inproc[]
Definition: address.hpp:37
TEST_ASSERT_EQUAL_INT
#define TEST_ASSERT_EQUAL_INT(expected, actual)
Definition: unity.h:128
test_reset_hwm
void test_reset_hwm()
Definition: test_hwm_pubsub.cpp:158
SOCKET_STRING_LEN
#define SOCKET_STRING_LEN
Definition: test_hwm_pubsub.cpp:10
ZMQ_SNDHWM
#define ZMQ_SNDHWM
Definition: zmq.h:293
send_string_expect_success
void send_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:94
len
int len
Definition: php/ext/google/protobuf/map.c:206
ZMQ_RCVTIMEO
#define ZMQ_RCVTIMEO
Definition: zmq.h:296
zmq_recv
ZMQ_EXPORT int zmq_recv(void *s_, void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:487
recv_string_expect_success
void recv_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:101
setup_test_environment
void setup_test_environment(int timeout_seconds_)
Definition: testutil.cpp:201
UNITY_BEGIN
UNITY_BEGIN()
TEST_ASSERT_FAILURE_ERRNO
#define TEST_ASSERT_FAILURE_ERRNO(error_code, expr)
Definition: testutil_unity.hpp:95
test_defaults_small
void test_defaults_small(const char *bind_endpoint_)
Definition: test_hwm_pubsub.cpp:226
zmq_send
ZMQ_EXPORT int zmq_send(void *s_, const void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:377
test_context_socket_close
void * test_context_socket_close(void *socket_)
Definition: testutil_unity.cpp:208
TEST_ASSERT_SUCCESS_ERRNO
#define TEST_ASSERT_SUCCESS_ERRNO(expr)
Definition: proxy_thr.cpp:47
test_bind
void test_bind(void *socket_, const char *bind_address_, char *my_endpoint_, size_t len_)
Definition: testutil_unity.cpp:223
ZMQ_XPUB_NODROP
#define ZMQ_XPUB_NODROP
Definition: zmq.h:330


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59