test_immediate.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "testutil.hpp"
4 #include "testutil_unity.hpp"
5 
7 
9 {
10  int val;
11  int rc;
12  char buffer[16];
13  size_t len = MAX_SOCKET_STRING;
15  // TEST 1.
16  // First we're going to attempt to send messages to two
17  // pipes, one connected, the other not. We should see
18  // the PUSH load balancing to both pipes, and hence half
19  // of the messages getting queued, as connect() creates a
20  // pipe immediately.
21 
22  void *to = test_context_socket (ZMQ_PULL);
23 
24  // Bind the one valid receiver
25  val = 0;
27  zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof (val)));
29 
30  // Create a socket pushing to two endpoints - only half of the messages should arrive.
31  void *from = test_context_socket (ZMQ_PUSH);
32 
33  val = 0;
35  zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof (val)));
36  // This pipe will not connect (provided the ephemeral port is not 5556)
37  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, "tcp://localhost:5556"));
38  // This pipe will connect
40 
42 
43  // We send 10 messages, 5 should just get stuck in the queue
44  // for the not-yet-connected pipe
45  for (int i = 0; i < 10; ++i) {
46  send_string_expect_success (from, "Hello", 0);
47  }
48 
49  // We now consume from the connected pipe
50  // - we should see just 5
51  int timeout = 250;
53  zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int)));
54 
55  int seen = 0;
56  while (true) {
57  rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
58  if (rc == -1)
59  break; // Break when we didn't get a message
60  seen++;
61  }
62  // TODO: this fails ~1% of the runs on OBS but it does not seem to be reproducible anywhere else
63  if (seen == 0)
65  "Unreliable test occasionally fails on slow CIs, ignoring");
66  TEST_ASSERT_EQUAL_INT (5, seen);
67 
70 }
71 
72 
74 {
75  // This time we will do the same thing, connect two pipes,
76  // one of which will succeed in connecting to a bound
77  // receiver, the other of which will fail. However, we will
78  // also set the delay-attach-on-connect flag (ZMQ_IMMEDIATE),
79  // which should cause the pipe attachment to be delayed until
80  // the connection succeeds.
81 
82  // Bind the valid socket
83  void *to = test_context_socket (ZMQ_PULL);
84  size_t len = MAX_SOCKET_STRING;
87 
88  int val = 0;
90  zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof (val)));
91 
92  // Create a socket pushing to two endpoints - all messages should arrive.
93  void *from = test_context_socket (ZMQ_PUSH);
94 
95  val = 0;
97  zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof (val)));
98 
99  // Set the key flag (ZMQ_IMMEDIATE)
100  val = 1;
102  zmq_setsockopt (from, ZMQ_IMMEDIATE, &val, sizeof (val)));
103 
104  // Connect to the invalid socket
105  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, "tcp://localhost:5561"));
106  // Connect to the valid socket
108 
109  // Send 10 messages, all should be routed to the connected pipe
110  for (int i = 0; i < 10; ++i) {
111  send_string_expect_success (from, "Hello", 0);
112  }
113  int timeout = 250;
115  zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int)));
116 
117  int seen = 0;
118  while (true) {
119  char buffer[16];
120  int rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
121  if (rc == -1)
122  break; // Break when we didn't get a message
123  seen++;
124  }
125  TEST_ASSERT_EQUAL_INT (10, seen);
126 
129 }
130 
132 {
133  // This time we want to validate that the same blocking behaviour
134  // occurs with an existing connection that is broken. We will send
135  // messages to a connected pipe, disconnect and verify the messages
136  // block. Then we reconnect and verify messages flow again.
137  void *backend = test_context_socket (ZMQ_DEALER);
138  void *frontend = test_context_socket (ZMQ_DEALER);
139 
140  int zero = 0;
142  zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero)));
144  zmq_setsockopt (frontend, ZMQ_LINGER, &zero, sizeof (zero)));
145 
146  // Frontend connects to backend using IMMEDIATE
147  int on = 1;
149  zmq_setsockopt (frontend, ZMQ_IMMEDIATE, &on, sizeof (on)));
150 
151  size_t len = MAX_SOCKET_STRING;
153  bind_loopback_ipv4 (backend, my_endpoint, len);
154 
156 
157  // Ping backend to frontend so we know when the connection is up
158  send_string_expect_success (backend, "Hello", 0);
159  recv_string_expect_success (frontend, "Hello", 0);
160 
161  // Send message from frontend to backend
162  send_string_expect_success (frontend, "Hello", ZMQ_DONTWAIT);
163 
164  test_context_socket_close (backend);
165 
166  // Give time to process disconnect
167  msleep (SETTLE_TIME * 10);
168 
169  // Send a message, should fail
171  zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT));
172 
173  // Recreate backend socket
174  backend = test_context_socket (ZMQ_DEALER);
176  zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero)));
178 
179  // Ping backend to frontend so we know when the connection is up
180  send_string_expect_success (backend, "Hello", 0);
181  recv_string_expect_success (frontend, "Hello", 0);
182 
183  // After the reconnect, should succeed
184  send_string_expect_success (frontend, "Hello", ZMQ_DONTWAIT);
185 
186  test_context_socket_close (backend);
187  test_context_socket_close (frontend);
188 }
189 
190 int main (void)
191 {
193  UNITY_BEGIN ();
197  return UNITY_END ();
198 }
UNITY_END
return UNITY_END()
msleep
void msleep(int milliseconds_)
Definition: testutil.cpp:227
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
RUN_TEST
#define RUN_TEST(func)
Definition: unity_internals.h:615
SETUP_TEARDOWN_TESTCONTEXT
#define SETUP_TEARDOWN_TESTCONTEXT
Definition: testutil_unity.hpp:172
bind_loopback_ipv4
void bind_loopback_ipv4(void *socket_, char *my_endpoint_, size_t len_)
Definition: testutil_unity.cpp:246
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
ZMQ_IMMEDIATE
#define ZMQ_IMMEDIATE
Definition: zmq.h:304
testutil_unity.hpp
ZMQ_DEALER
#define ZMQ_DEALER
Definition: zmq.h:263
zmq_setsockopt
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
main
int main(void)
Definition: test_immediate.cpp:190
ZMQ_PUSH
#define ZMQ_PUSH
Definition: zmq.h:266
testutil.hpp
my_endpoint
char my_endpoint[MAX_SOCKET_STRING]
Definition: test_security_curve.cpp:31
MAX_SOCKET_STRING
#define MAX_SOCKET_STRING
Definition: libzmq/tests/testutil.hpp:35
zmq_bind
ZMQ_EXPORT int zmq_bind(void *s_, const char *addr_)
Definition: zmq.cpp:299
timeout
GLbitfield GLuint64 timeout
Definition: glcorearb.h:3588
test_immediate_2
void test_immediate_2()
Definition: test_immediate.cpp:73
buffer
Definition: buffer_processor.h:43
test_context_socket
void * test_context_socket(int type_)
Definition: testutil_unity.cpp:200
test_immediate_3
void test_immediate_3()
Definition: test_immediate.cpp:131
ZMQ_DONTWAIT
#define ZMQ_DONTWAIT
Definition: zmq.h:358
SETTLE_TIME
#define SETTLE_TIME
Definition: libzmq/tests/testutil.hpp:31
TEST_ASSERT_EQUAL_INT
#define TEST_ASSERT_EQUAL_INT(expected, actual)
Definition: unity.h:128
i
int i
Definition: gmock-matchers_test.cc:764
TEST_IGNORE_MESSAGE
#define TEST_IGNORE_MESSAGE(message)
Definition: unity.h:103
ZMQ_LINGER
#define ZMQ_LINGER
Definition: zmq.h:288
send_string_expect_success
void send_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:94
len
int len
Definition: php/ext/google/protobuf/map.c:206
ZMQ_RCVTIMEO
#define ZMQ_RCVTIMEO
Definition: zmq.h:296
zmq_recv
ZMQ_EXPORT int zmq_recv(void *s_, void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:487
recv_string_expect_success
void recv_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:101
setup_test_environment
void setup_test_environment(int timeout_seconds_)
Definition: testutil.cpp:201
test_immediate_1
SETUP_TEARDOWN_TESTCONTEXT void test_immediate_1()
Definition: test_immediate.cpp:8
UNITY_BEGIN
UNITY_BEGIN()
TEST_ASSERT_FAILURE_ERRNO
#define TEST_ASSERT_FAILURE_ERRNO(error_code, expr)
Definition: testutil_unity.hpp:95
val
GLuint GLfloat * val
Definition: glcorearb.h:3604
zmq_send
ZMQ_EXPORT int zmq_send(void *s_, const void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:377
ZMQ_PULL
#define ZMQ_PULL
Definition: zmq.h:265
test_context_socket_close
void * test_context_socket_close(void *socket_)
Definition: testutil_unity.cpp:208
TEST_ASSERT_SUCCESS_ERRNO
#define TEST_ASSERT_SUCCESS_ERRNO(expr)
Definition: proxy_thr.cpp:47


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59