test_heartbeats.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "testutil.hpp"
4 #if defined(ZMQ_HAVE_WINDOWS)
5 #include <winsock2.h>
6 #include <ws2tcpip.h>
7 #include <stdexcept>
8 #define close closesocket
9 typedef SOCKET raw_socket;
10 #else
11 #include <arpa/inet.h>
12 #include <unistd.h>
13 typedef int raw_socket;
14 #endif
15 
16 #include <limits.h>
17 #include <stdlib.h>
18 #include <string.h>
19 
20 // TODO remove this here, either ensure that UINT16_MAX is always properly
21 // defined or handle this at a more central location
22 #ifndef UINT16_MAX
23 #define UINT16_MAX 65535
24 #endif
25 
26 #include "testutil_unity.hpp"
27 
29 
30 // Read one event off the monitor socket and return its event number.
31 // Makes up to 10 non-blocking receive attempts on monitor_. Returns -1
32 // if no event was read or if receiving the second frame fails.
// NOTE(review): listing lines 39, 41, 44, 50 and 54 are absent from this
// extract, so message init/close and any delay between attempts are not
// visible here.
33 
34 static int get_monitor_event (void *monitor_)
35 {
36  for (int i = 0; i < 10; i++) {
37  // First frame in message contains event number and value
38  zmq_msg_t msg;
40  if (zmq_msg_recv (&msg, monitor_, ZMQ_DONTWAIT) == -1) {
42  continue; // Nothing received on this non-blocking attempt; try again
43  }
45 
46  uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
    // The event number is taken from the first two bytes of the frame.
    // NOTE(review): the cast assumes the frame data is suitably aligned for
    // uint16_t and is in host byte order - confirm
47  uint16_t event = *reinterpret_cast<uint16_t *> (data);
48 
49  // Second frame in message contains event address
    // This receive is blocking (flags 0); its payload is not inspected here.
51  if (zmq_msg_recv (&msg, monitor_, 0) == -1) {
52  return -1; // Interrupted, presumably
53  }
55 
56  return event;
57  }
    // All 10 attempts failed to deliver a first frame.
58  return -1;
59 }
60 
// Receive exactly bytes_ bytes from fd_ into buffer_, calling recv
// repeatedly and appending at buffer_ + received until the requested count
// has accumulated. After each call it asserts that the running total does
// not exceed bytes_.
// NOTE(review): listing lines 65 and 67 are absent from this extract; they
// evidently declare rc and check the recv result.
61 static void recv_with_retry (raw_socket fd_, char *buffer_, int bytes_)
62 {
63  int received = 0;
64  while (true) {
    // Only the still-missing tail (bytes_ - received) is requested.
66  recv (fd_, buffer_ + received, bytes_ - received, 0));
68  received += rc;
69  TEST_ASSERT_LESS_OR_EQUAL_INT (bytes_, received);
70  if (received == bytes_)
71  break;
72  }
73 }
74 
// Plays the client side of a ZMTP handshake on the raw socket fd_: sends
// zmtp_greeting_null and reads back the same number of bytes, then sends
// zmtp_ready_dealer and again reads back the same number of bytes.
// If mock_ping_ is non-zero, it then sends two PING frames back to back
// (a 12-byte one and a 67-byte one) and checks the two PONG replies.
// NOTE(review): listing lines 81, 83, 89 and 91 are absent from this
// extract; they evidently declare rc and check the two send results.
75 static void mock_handshake (raw_socket fd_, int mock_ping_)
76 {
77  char buffer[128];
78  memset (buffer, 0, sizeof (buffer));
79  memcpy (buffer, zmtp_greeting_null, sizeof (zmtp_greeting_null));
80 
82  send (fd_, buffer, sizeof (zmtp_greeting_null), 0));
84 
    // Read the reply to the greeting; it is expected to have the same length.
85  recv_with_retry (fd_, buffer, sizeof (zmtp_greeting_null));
86 
87  memset (buffer, 0, sizeof (buffer));
88  memcpy (buffer, zmtp_ready_dealer, sizeof (zmtp_ready_dealer));
90  send (fd_, buffer, sizeof (zmtp_ready_dealer), 0));
92 
93  // presumably the READY reply of the peer; read as sizeof (zmtp_ready_dealer) bytes - confirm
94  recv_with_retry (fd_, buffer, sizeof (zmtp_ready_dealer));
95 
96  if (mock_ping_) {
97  // test PING context - should be replicated in the PONG
98  // to avoid timeouts, do a bulk send
    // Frame bytes: 4, 10, 4, PING, two zero bytes, then the context LOL.
    // NOTE(review): presumably flags, body size, name length, name, 16-bit
    // TTL and context as in the ZMTP PING command - confirm against the spec
99  const uint8_t zmtp_ping[12] = {4, 10, 4, 'P', 'I', 'N',
100  'G', 0, 0, 'L', 'O', 'L'};
    // Expected small reply: same shape without the two zero bytes, context LOL.
101  uint8_t zmtp_pong[10] = {4, 8, 4, 'P', 'O', 'N', 'G', 'L', 'O', 'L'};
102  memset (buffer, 0, sizeof (buffer));
103  memcpy (buffer, zmtp_ping, 12);
104  rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 12, 0));
105  TEST_ASSERT_EQUAL_INT (12, rc);
106 
107  // test a larger body that won't fit in a small message and should get
108  // truncated
    // Same PING prefix, but the second byte is raised to 65 and the rest of
    // the 67 bytes sent (2 + 65) is filled with z.
109  memset (buffer, 'z', sizeof (buffer));
110  memcpy (buffer, zmtp_ping, 12);
111  buffer[1] = 65;
112  rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 67, 0));
113  TEST_ASSERT_EQUAL_INT (67, rc);
114 
115  // small pong
116  recv_with_retry (fd_, buffer, 10);
117  TEST_ASSERT_EQUAL_INT (0, memcmp (zmtp_pong, buffer, 10));
118  // large pong
    // Only 23 bytes are read and compared: the 7-byte prefix 4, 21, 4, PONG
    // followed by 16 context bytes (LOL plus 13 z), i.e. the reply is
    // expected to carry a truncated copy of the oversized context.
119  recv_with_retry (fd_, buffer, 23);
120  uint8_t zmtp_pooong[65] = {4, 21, 4, 'P', 'O', 'N', 'G', 'L', 'O', 'L'};
121  memset (zmtp_pooong + 10, 'z', 55);
122  TEST_ASSERT_EQUAL_INT (0, memcmp (zmtp_pooong, buffer, 23));
123  }
124 }
125 
// Apply the fixed CURVE key material of this test to socket_.
// ZMQ_CURVE_SECRETKEY and ZMQ_CURVE_PUBLICKEY are always set. When
// is_server_ is non-zero the socket additionally gets ZMQ_CURVE_SERVER;
// otherwise ZMQ_CURVE_SERVERKEY is set to the same string that the server
// branch uses as its public key.
// NOTE(review): the zmq_setsockopt return values are not checked here, so a
// rejected option goes unnoticed - confirm whether that is intentional.
126 static void setup_curve (void *socket_, int is_server_)
127 {
128  const char *secret_key;
129  const char *public_key;
130  const char *server_key;
131 
132  if (is_server_) {
133  secret_key = "JTKVSB%%)wK0E.X)V>+}o?pNmC{O&4W4b!Ni{Lh6";
134  public_key = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7";
    // server_key is only read in the client branch below.
135  server_key = NULL;
136  } else {
137  secret_key = "D:)Q[IlAW!ahhC2ac:9*A}h:p?([4%wOTJ%JR%cs";
138  public_key = "Yne@$w-vo<fVvi]a<NY6T1ed:M$fCG*[IaLV{hID";
139  server_key = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7";
140  }
141 
    // Keys are passed as text with their strlen as the option length.
142  zmq_setsockopt (socket_, ZMQ_CURVE_SECRETKEY, secret_key,
143  strlen (secret_key));
144  zmq_setsockopt (socket_, ZMQ_CURVE_PUBLICKEY, public_key,
145  strlen (public_key));
146  if (is_server_)
    // The flag itself is handed over as the integer option value.
147  zmq_setsockopt (socket_, ZMQ_CURVE_SERVER, &is_server_,
148  sizeof (is_server_));
149  else
150  zmq_setsockopt (socket_, ZMQ_CURVE_SERVERKEY, server_key,
151  strlen (server_key));
152 }
153 
// Create a server socket of socket_type_, bind it through
// bind_loopback_ipv4 and attach a monitor socket to it over
// inproc://monitor-dealer.
//   set_heartbeats_ - when non-zero, value is set to 50 for the heartbeat
//                     setup (the calls consuming it, listing lines 171-172,
//                     are absent from this extract)
//   is_curve_       - when non-zero, the server is configured through
//                     setup_curve (server, 1)
//   server_out_     - receives the server socket
//   mon_out_        - receives the monitor socket (server_mon)
//   endpoint_, ep_length_ - buffer and size handed to bind_loopback_ipv4
// NOTE(review): listing lines 166, 171-172, 181, 183, 185 and 188 are absent
// from this extract, including the declaration of server_mon.
154 static void prep_server_socket (int set_heartbeats_,
155  int is_curve_,
156  void **server_out_,
157  void **mon_out_,
158  char *endpoint_,
159  size_t ep_length_,
160  int socket_type_)
161 {
162  // We'll be using this socket in raw mode
163  void *server = test_context_socket (socket_type_);
164 
    // value is first used as the ZMQ_LINGER setting (0).
165  int value = 0;
167  zmq_setsockopt (server, ZMQ_LINGER, &value, sizeof (value)));
168 
169  if (set_heartbeats_) {
170  value = 50;
173  }
174 
175  if (is_curve_)
176  setup_curve (server, 1);
177 
178  bind_loopback_ipv4 (server, endpoint_, ep_length_);
179 
180  // Create and connect a socket for collecting monitor events on dealer
182 
184  server, "inproc://monitor-dealer",
186 
187  // Connect to the inproc endpoint so we'll get events
189  zmq_connect (server_mon, "inproc://monitor-dealer"));
190 
    // Hand both sockets back to the caller.
191  *server_out_ = server;
192  *mon_out_ = server_mon;
193 }
194 
195 // This checks for a broken TCP connection (or, in this case a stuck one
196 // where the peer never responds to PINGS). There should be an accepted event
197 // then a disconnect event.
198 static void test_heartbeat_timeout (int server_type_, int mock_ping_)
199 {
200  int rc;
202 
203  void *server, *server_mon;
204  prep_server_socket (!mock_ping_, 0, &server, &server_mon, my_endpoint,
205  MAX_SOCKET_STRING, server_type_);
206 
208 
209  // Mock a ZMTP 3 client so we can forcibly time out a connection
210  mock_handshake (s, mock_ping_);
211 
212  // By now everything should report as connected
215 
216  if (!mock_ping_) {
217  // We should have been disconnected
220  }
221 
222  close (s);
223 
226 }
227 
228 // This checks that peers respect the TTL value in ping messages
229 // We set up a mock ZMTP 3 client and send a ping message with a TTL
230 // to a server that is not doing any heartbeating. Then we sleep,
231 // if the server disconnects the client, then we know the TTL did
232 // its thing correctly.
233 static void test_heartbeat_ttl (int client_type_, int server_type_)
234 {
235  int rc, value;
237 
238  void *server, *server_mon, *client;
240  MAX_SOCKET_STRING, server_type_);
241 
242  client = test_context_socket (client_type_);
243 
244  // Set the heartbeat TTL to 0.1 seconds
245  value = 100;
248 
249  // Set the heartbeat interval to much longer than the TTL so that
250  // the socket times out on the remote side.
251  value = 250;
254 
256 
257  // By now everything should report as connected
260 
262 
263  // We should have been disconnected
266 
270 }
271 
272 // This checks for normal operation - that is pings and pongs being
273 // exchanged normally. There should be an accepted event on the server,
274 // and then no event afterwards.
275 static void
276 test_heartbeat_notimeout (int is_curve_, int client_type_, int server_type_)
277 {
278  int rc;
280 
281  void *server, *server_mon;
282  prep_server_socket (1, is_curve_, &server, &server_mon, my_endpoint,
283  MAX_SOCKET_STRING, server_type_);
284 
285  void *client = test_context_socket (client_type_);
286  if (is_curve_)
287  setup_curve (client, 0);
289 
290  // Give it a sec to connect and handshake
292 
293  // By now everything should report as connected
296 
297  // We should still be connected because pings and pongs are happenin'
299  // TODO: this fails ~1% of the runs on OBS but it does not seem to be reproducible anywhere else
     // (the literal 512 below is presumably ZMQ_EVENT_DISCONNECTED, 0x0200 - confirm against zmq.h)
300  if (rc == 512)
302  "Unreliable test occasionally fails on slow CIs, ignoring");
303  TEST_ASSERT_EQUAL_INT (-1, rc);
304 
308 }
309 
311 {
313 }
314 
316 {
318 }
319 
// Generates three parameterless test functions for one client/server socket
// pair, in the form expected by RUN_TEST:
//   test_heartbeat_ttl_<first>_<second>
//   test_heartbeat_notimeout_<first>_<second>             (is_curve_ = 0)
//   test_heartbeat_notimeout_<first>_<second>_with_curve  (is_curve_ = 1)
// first and second only form the function names; first_define is passed as
// the client socket type and second_define as the server socket type.
320 #define DEFINE_TESTS(first, second, first_define, second_define) \
321  void test_heartbeat_ttl_##first##_##second () \
322  { \
323  test_heartbeat_ttl (first_define, second_define); \
324  } \
325  void test_heartbeat_notimeout_##first##_##second () \
326  { \
327  test_heartbeat_notimeout (0, first_define, second_define); \
328  } \
329  void test_heartbeat_notimeout_##first##_##second##_with_curve () \
330  { \
331  test_heartbeat_notimeout (1, first_define, second_define); \
332  }
333 
334 DEFINE_TESTS (dealer, router, ZMQ_DEALER, ZMQ_ROUTER)
337 DEFINE_TESTS (sub, pub, ZMQ_SUB, ZMQ_PUB)
338 DEFINE_TESTS (pair, pair, ZMQ_PAIR, ZMQ_PAIR)
339 
340 #ifdef ZMQ_BUILD_DRAFT_API
341 DEFINE_TESTS (gather, scatter, ZMQ_GATHER, ZMQ_SCATTER)
343 #endif
344 
346 const int heartbeat_ttl_max =
348 
350 {
351  void *const socket = test_context_socket (ZMQ_PAIR);
353  zmq_setsockopt (socket, ZMQ_HEARTBEAT_TTL, &value_, sizeof (value_)));
354 
355  int value_read;
356  size_t value_read_size = sizeof (value_read);
358  &value_read, &value_read_size));
359 
361  value_read);
362 
363  test_context_socket_close (socket);
364 }
365 
367 {
369 }
370 
372 {
373  void *const socket = test_context_socket (ZMQ_PAIR);
374  const int value = heartbeat_ttl_max + 1;
376  EINVAL,
377  zmq_setsockopt (socket, ZMQ_HEARTBEAT_TTL, &value, sizeof (value)));
378 
379  test_context_socket_close (socket);
380 }
381 
383 {
385 }
386 
387 int main (void)
388 {
389  // The test cases are very long-running. The default timeout of 60 seconds
390  // is not always enough.
392 
393  UNITY_BEGIN ();
394 
397 
398  RUN_TEST (test_heartbeat_ttl_dealer_router);
399  RUN_TEST (test_heartbeat_ttl_req_rep);
400  RUN_TEST (test_heartbeat_ttl_pull_push);
401  RUN_TEST (test_heartbeat_ttl_sub_pub);
402  RUN_TEST (test_heartbeat_ttl_pair_pair);
403 
407 
408  RUN_TEST (test_heartbeat_notimeout_dealer_router);
409  RUN_TEST (test_heartbeat_notimeout_req_rep);
410  RUN_TEST (test_heartbeat_notimeout_pull_push);
411  RUN_TEST (test_heartbeat_notimeout_sub_pub);
412  RUN_TEST (test_heartbeat_notimeout_pair_pair);
413 
414  RUN_TEST (test_heartbeat_notimeout_dealer_router_with_curve);
415  RUN_TEST (test_heartbeat_notimeout_req_rep_with_curve);
416  RUN_TEST (test_heartbeat_notimeout_pull_push_with_curve);
417  RUN_TEST (test_heartbeat_notimeout_sub_pub_with_curve);
418  RUN_TEST (test_heartbeat_notimeout_pair_pair_with_curve);
419 
420 #ifdef ZMQ_BUILD_DRAFT_API
421  RUN_TEST (test_heartbeat_ttl_client_server);
422  RUN_TEST (test_heartbeat_ttl_gather_scatter);
423 
424  RUN_TEST (test_heartbeat_notimeout_client_server);
425  RUN_TEST (test_heartbeat_notimeout_gather_scatter);
426 
427  RUN_TEST (test_heartbeat_notimeout_client_server_with_curve);
428  RUN_TEST (test_heartbeat_notimeout_gather_scatter_with_curve);
429 #endif
430 
431  return UNITY_END ();
432 }
test_heartbeat_timeout
static void test_heartbeat_timeout(int server_type_, int mock_ping_)
Definition: test_heartbeats.cpp:198
test_heartbeat_timeout_router_mock_ping
void test_heartbeat_timeout_router_mock_ping()
Definition: test_heartbeats.cpp:315
TEST_ASSERT_LESS_OR_EQUAL_INT
#define TEST_ASSERT_LESS_OR_EQUAL_INT(threshold, actual)
Definition: unity.h:201
ZMQ_SERVER
#define ZMQ_SERVER
Definition: zmq_draft.h:14
ZMQ_HEARTBEAT_IVL
#define ZMQ_HEARTBEAT_IVL
Definition: zmq.h:336
ZMQ_GATHER
#define ZMQ_GATHER
Definition: zmq_draft.h:18
NULL
NULL
Definition: test_security_zap.cpp:405
UNITY_END
return UNITY_END()
ZMQ_PUB
#define ZMQ_PUB
Definition: zmq.h:259
prep_server_socket
static void prep_server_socket(int set_heartbeats_, int is_curve_, void **server_out_, void **mon_out_, char *endpoint_, size_t ep_length_, int socket_type_)
Definition: test_heartbeats.cpp:154
EINVAL
#define EINVAL
Definition: errno.hpp:25
msleep
void msleep(int milliseconds_)
Definition: testutil.cpp:227
zmq_socket_monitor
ZMQ_EXPORT int zmq_socket_monitor(void *s_, const char *addr_, int events_)
Definition: zmq.cpp:278
TEST_ASSERT_TRUE
#define TEST_ASSERT_TRUE(condition)
Definition: unity.h:121
s
XmlRpcServer s
ZMQ_CLIENT
#define ZMQ_CLIENT
Definition: zmq_draft.h:15
RUN_TEST
#define RUN_TEST(func)
Definition: unity_internals.h:615
ZMQ_CURVE_SECRETKEY
#define ZMQ_CURVE_SECRETKEY
Definition: zmq.h:314
main
int main(void)
Definition: test_heartbeats.cpp:387
ZMQ_CURVE_SERVERKEY
#define ZMQ_CURVE_SERVERKEY
Definition: zmq.h:315
SETUP_TEARDOWN_TESTCONTEXT
#define SETUP_TEARDOWN_TESTCONTEXT
Definition: testutil_unity.hpp:172
mock_handshake
static void mock_handshake(raw_socket fd_, int mock_ping_)
Definition: test_heartbeats.cpp:75
bind_loopback_ipv4
void bind_loopback_ipv4(void *socket_, char *my_endpoint_, size_t len_)
Definition: testutil_unity.cpp:246
client
void client(int num)
Definition: test_multithread.cpp:134
ZMQ_SUB
#define ZMQ_SUB
Definition: zmq.h:260
send
void send(fd_t fd_, const char(&data_)[N])
Definition: test_security_curve.cpp:209
rep
void * rep[services]
Definition: test_req_relaxed.cpp:11
setup_curve
static void setup_curve(void *socket_, int is_server_)
Definition: test_heartbeats.cpp:126
TEST_ASSERT_FALSE
#define TEST_ASSERT_FALSE(condition)
Definition: unity.h:123
ZMQ_REQ
#define ZMQ_REQ
Definition: zmq.h:261
zmq_msg_data
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg_)
Definition: zmq.cpp:642
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
UINT16_MAX
#define UINT16_MAX
Definition: test_heartbeats.cpp:23
raw_socket
int raw_socket
Definition: test_heartbeats.cpp:13
TEST_ASSERT_GREATER_THAN_INT
#define TEST_ASSERT_GREATER_THAN_INT(threshold, actual)
Definition: unity.h:153
testutil_unity.hpp
ZMQ_DEALER
#define ZMQ_DEALER
Definition: zmq.h:263
test_setsockopt_heartbeat_ttl_more_than_max_fails
void test_setsockopt_heartbeat_ttl_more_than_max_fails()
Definition: test_heartbeats.cpp:371
zmq_setsockopt
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
test_setsockopt_heartbeat_success
void test_setsockopt_heartbeat_success(const int value_)
Definition: test_heartbeats.cpp:349
ZMQ_EVENT_CONNECTED
#define ZMQ_EVENT_CONNECTED
Definition: zmq.h:401
event
struct _cl_event * event
Definition: glcorearb.h:4163
ZMQ_PUSH
#define ZMQ_PUSH
Definition: zmq.h:266
TEST_ASSERT_SUCCESS_RAW_ERRNO
#define TEST_ASSERT_SUCCESS_RAW_ERRNO(expr)
Definition: testutil_unity.hpp:69
testutil.hpp
ZMQ_ROUTER
#define ZMQ_ROUTER
Definition: zmq.h:264
zmq_msg_t
Definition: zmq.h:218
ZMQ_REP
#define ZMQ_REP
Definition: zmq.h:262
connect_socket
fd_t connect_socket(const char *endpoint_, const int af_, const int protocol_)
Definition: testutil.cpp:353
DEFINE_TESTS
#define DEFINE_TESTS(first, second, first_define, second_define)
Definition: test_heartbeats.cpp:320
my_endpoint
char my_endpoint[MAX_SOCKET_STRING]
Definition: test_security_curve.cpp:31
MAX_SOCKET_STRING
#define MAX_SOCKET_STRING
Definition: libzmq/tests/testutil.hpp:35
zmq_msg_recv
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:617
buffer
Definition: buffer_processor.h:43
test_context_socket
void * test_context_socket(int type_)
Definition: testutil_unity.cpp:200
ZMQ_DONTWAIT
#define ZMQ_DONTWAIT
Definition: zmq.h:358
SETTLE_TIME
#define SETTLE_TIME
Definition: libzmq/tests/testutil.hpp:31
test_setsockopt_heartbeat_ttl_near_zero
void test_setsockopt_heartbeat_ttl_near_zero()
Definition: test_heartbeats.cpp:382
server
void * server
Definition: test_security_curve.cpp:29
TEST_ASSERT_EQUAL_INT
#define TEST_ASSERT_EQUAL_INT(expected, actual)
Definition: unity.h:128
zmq_msg_init
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg_)
Definition: zmq.cpp:587
zmtp_greeting_null
const uint8_t zmtp_greeting_null[64]
Definition: libzmq/tests/testutil.hpp:48
ZMQ_HEARTBEAT_TTL
#define ZMQ_HEARTBEAT_TTL
Definition: zmq.h:337
i
int i
Definition: gmock-matchers_test.cc:764
TEST_IGNORE_MESSAGE
#define TEST_IGNORE_MESSAGE(message)
Definition: unity.h:103
zmtp_ready_dealer
const uint8_t zmtp_ready_dealer[43]
Definition: libzmq/tests/testutil.hpp:57
value_
int value_
Definition: gmock-matchers_test.cc:571
ZMQ_LINGER
#define ZMQ_LINGER
Definition: zmq.h:288
req
void * req
Definition: test_req_relaxed.cpp:10
server_mon
void * server_mon
Definition: test_security_curve.cpp:30
ZMQ_PAIR
#define ZMQ_PAIR
Definition: zmq.h:258
ZMQ_SCATTER
#define ZMQ_SCATTER
Definition: zmq_draft.h:19
ZMQ_EVENT_ACCEPTED
#define ZMQ_EVENT_ACCEPTED
Definition: zmq.h:406
ZMQ_CURVE_SERVER
#define ZMQ_CURVE_SERVER
Definition: zmq.h:312
ZMQ_EVENT_DISCONNECTED
#define ZMQ_EVENT_DISCONNECTED
Definition: zmq.h:410
get_monitor_event
static SETUP_TEARDOWN_TESTCONTEXT int get_monitor_event(void *monitor_)
Definition: test_heartbeats.cpp:34
setup_test_environment
void setup_test_environment(int timeout_seconds_)
Definition: testutil.cpp:201
test_heartbeat_ttl
static void test_heartbeat_ttl(int client_type_, int server_type_)
Definition: test_heartbeats.cpp:233
UNITY_BEGIN
UNITY_BEGIN()
fd_t
zmq_fd_t fd_t
Definition: libzmq/tests/testutil.hpp:98
test_setsockopt_heartbeat_ttl_max
void test_setsockopt_heartbeat_ttl_max()
Definition: test_heartbeats.cpp:366
push
static void push(tarjan *t, const upb_refcounted *r)
Definition: ruby/ext/google/protobuf_c/upb.c:5890
TEST_ASSERT_FAILURE_ERRNO
#define TEST_ASSERT_FAILURE_ERRNO(error_code, expr)
Definition: testutil_unity.hpp:95
heartbeat_ttl_max
const int heartbeat_ttl_max
Definition: test_heartbeats.cpp:346
data
GLint GLenum GLsizei GLsizei GLsizei GLint GLsizei const GLvoid * data
Definition: glcorearb.h:2879
buffer_
static uint8 buffer_[kBufferSize]
Definition: coded_stream_unittest.cc:136
value
GLsizei const GLfloat * value
Definition: glcorearb.h:3093
ZMQ_PULL
#define ZMQ_PULL
Definition: zmq.h:265
zmq_msg_more
ZMQ_EXPORT int zmq_msg_more(const zmq_msg_t *msg_)
Definition: zmq.cpp:652
test_context_socket_close
void * test_context_socket_close(void *socket_)
Definition: testutil_unity.cpp:208
SOCKET
uintptr_t SOCKET
Definition: wepoll.c:71
TEST_ASSERT_SUCCESS_ERRNO
#define TEST_ASSERT_SUCCESS_ERRNO(expr)
Definition: proxy_thr.cpp:47
deciseconds_per_millisecond
const int deciseconds_per_millisecond
Definition: test_heartbeats.cpp:345
test_heartbeat_timeout_router
void test_heartbeat_timeout_router()
Definition: test_heartbeats.cpp:310
zmq_getsockopt
ZMQ_EXPORT int zmq_getsockopt(void *s_, int option_, void *optval_, size_t *optvallen_)
Definition: zmq.cpp:261
ZMQ_CURVE_PUBLICKEY
#define ZMQ_CURVE_PUBLICKEY
Definition: zmq.h:313
recv_with_retry
static void recv_with_retry(raw_socket fd_, char *buffer_, int bytes_)
Definition: test_heartbeats.cpp:61
test_heartbeat_notimeout
static void test_heartbeat_notimeout(int is_curve_, int client_type_, int server_type_)
Definition: test_heartbeats.cpp:276


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59