test_connect_delay_tipc.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "testutil.hpp"
4 #include "testutil_unity.hpp"
5 #include "testutil_security.hpp"
6 
8 
10 {
11  int val;
12  // TEST 1.
13  // First we're going to attempt to send messages to two
14  // pipes, one connected, the other not. We should see
15  // the PUSH load balancing to both pipes, and hence half
16  // of the messages getting queued, as connect() creates a
17  // pipe immediately.
18 
19  void *to = test_context_socket (ZMQ_PULL);
20  int timeout = 5000;
22  zmq_setsockopt (to, ZMQ_LINGER, &timeout, sizeof (timeout)));
23 
24  // Bind the one valid receiver
25  val = 0;
27  zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof (val)));
28  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (to, "tipc://{6555,0,0}"));
29 
30  // Create a socket pushing to two endpoints - only 1 message should arrive.
31  void *from = test_context_socket (ZMQ_PUSH);
32 
33  val = 0;
35  zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof (val)));
37  zmq_setsockopt (from, ZMQ_LINGER, &timeout, sizeof (timeout)));
38  // This pipe will not connect
39  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, "tipc://{5556,0}@0.0.0"));
40  // This pipe will
41  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, "tipc://{6555,0}@0.0.0"));
42 
43  // We send 10 messages, 5 should just get stuck in the queue
44  // for the not-yet-connected pipe
45  const int send_count = 10;
46  for (int i = 0; i < send_count; ++i) {
47  send_string_expect_success (from, "Hello", 0);
48  }
49 
50  // We now consume from the connected pipe
51  // - we should see just 5
54  zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int)));
55 
56  int seen = 0;
57  while (true) {
58  char buffer[16];
59  int rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
60  if (rc == -1) {
62  break; // Break when we didn't get a message
63  }
64  seen++;
65  }
66  TEST_ASSERT_EQUAL_INT (send_count / 2, seen);
67 
70 }
71 
73 {
74  int val;
75 
76  // TEST 2
77  // This time we will do the same thing, connect two pipes,
78  // one of which will succeed in connecting to a bound
79  // receiver, the other of which will fail. However, we will
80  // also set the delay attach on connect flag, which should
81  // cause the pipe attachment to be delayed until the connection
82  // succeeds.
83 
84  // Bind the valid socket
85  void *to = test_context_socket (ZMQ_PULL);
86  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (to, "tipc://{5560,0,0}"));
87  int timeout = 5000;
89  zmq_setsockopt (to, ZMQ_LINGER, &timeout, sizeof (timeout)));
90 
91  val = 0;
93  zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof (val)));
94 
95  // Create a socket pushing to two endpoints - all messages should arrive.
96  void *from = test_context_socket (ZMQ_PUSH);
97 
98  val = 0;
100  zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof (val)));
102  zmq_setsockopt (from, ZMQ_LINGER, &timeout, sizeof (timeout)));
103 
104  // Set the key flag
105  val = 1;
107  zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof (val)));
108 
109  // Connect to the invalid socket
110  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, "tipc://{5561,0}@0.0.0"));
111  // Connect to the valid socket
112  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, "tipc://{5560,0}@0.0.0"));
113 
114  // Send 10 messages, all should be routed to the connected pipe
115  const int send_count = 10;
116  for (int i = 0; i < send_count; ++i) {
117  send_string_expect_success (from, "Hello", 0);
118  }
121  zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int)));
122 
123  int seen = 0;
124  while (true) {
125  char buffer[16];
126  int rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
127  if (rc == -1) {
129  break; // Break when we didn't get a message
130  }
131  seen++;
132  }
133  TEST_ASSERT_EQUAL_INT (send_count, seen);
134 
137 }
138 
140 {
141  // TEST 3
142  // This time we want to validate that the same blocking behaviour
143  // occurs with an existing connection that is broken. We will send
144  // messages to a connected pipe, disconnect and verify the messages
145  // block. Then we reconnect and verify messages flow again.
146  void *backend = test_context_socket (ZMQ_DEALER);
147  void *frontend = test_context_socket (ZMQ_DEALER);
148  void *monitor = test_context_socket (ZMQ_PAIR);
149  int rc;
150  int zero = 0;
152  zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero)));
154  zmq_setsockopt (frontend, ZMQ_LINGER, &zero, sizeof (zero)));
155  TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor (frontend, "inproc://monitor",
157  int timeout = 5000;
159  zmq_setsockopt (backend, ZMQ_LINGER, &timeout, sizeof (timeout)));
161  zmq_setsockopt (frontend, ZMQ_LINGER, &timeout, sizeof (timeout)));
162 
163  // Frontend connects to backend using DELAY_ATTACH_ON_CONNECT
164  int on = 1;
166  zmq_setsockopt (frontend, ZMQ_DELAY_ATTACH_ON_CONNECT, &on, sizeof (on)));
167  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (backend, "tipc://{5560,0,0}"));
168  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (monitor, "inproc://monitor"));
169  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (frontend, "tipc://{5560,0}@0.0.0"));
170 
171  // Ping backend to frontend so we know when the connection is up
172  send_string_expect_success (backend, "Hello", 0);
173  recv_string_expect_success (frontend, "Hello", 0);
174 
175  // Send message from frontend to backend
176  send_string_expect_success (frontend, "Hello", ZMQ_DONTWAIT);
177 
178  test_context_socket_close (backend);
179 
180  // Wait for disconnect to happen
182 
183  // Send a message, might succeed depending on scheduling of the I/O thread
184  do {
185  rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
186  TEST_ASSERT_TRUE (rc == 5 || (rc == -1 && zmq_errno () == EAGAIN));
187  } while (rc == 5);
188 
189  // Recreate backend socket
190  backend = test_context_socket (ZMQ_DEALER);
192  zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero)));
193  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (backend, "tipc://{5560,0,0}"));
194 
195  // Ping backend to frontend so we know when the connection is up
196  send_string_expect_success (backend, "Hello", 0);
197  recv_string_expect_success (frontend, "Hello", 0);
198 
199  // After the reconnect, should succeed
200  send_string_expect_success (frontend, "Hello", ZMQ_DONTWAIT);
201 
202  test_context_socket_close (monitor);
203  test_context_socket_close (backend);
204  test_context_socket_close (frontend);
205 }
206 
207 int main (void)
208 {
209  if (!is_tipc_available ()) {
210  printf ("TIPC environment unavailable, skipping test\n");
211  return 77;
212  }
213 
214  UNITY_BEGIN ();
218  return UNITY_END ();
219 }
UNITY_END
return UNITY_END()
zmq_errno
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:101
zmq_socket_monitor
ZMQ_EXPORT int zmq_socket_monitor(void *s_, const char *addr_, int events_)
Definition: zmq.cpp:278
TEST_ASSERT_TRUE
#define TEST_ASSERT_TRUE(condition)
Definition: unity.h:121
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
ZMQ_DELAY_ATTACH_ON_CONNECT
#define ZMQ_DELAY_ATTACH_ON_CONNECT
Definition: zmq.h:378
RUN_TEST
#define RUN_TEST(func)
Definition: unity_internals.h:615
testutil_security.hpp
SETUP_TEARDOWN_TESTCONTEXT
#define SETUP_TEARDOWN_TESTCONTEXT
Definition: testutil_unity.hpp:172
test_send_one_connected_one_unconnected_with_delay
void test_send_one_connected_one_unconnected_with_delay()
Definition: test_connect_delay_tipc.cpp:72
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
testutil_unity.hpp
ZMQ_DEALER
#define ZMQ_DEALER
Definition: zmq.h:263
zmq_setsockopt
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
ZMQ_PUSH
#define ZMQ_PUSH
Definition: zmq.h:266
testutil.hpp
test_send_one_connected_one_unconnected
SETUP_TEARDOWN_TESTCONTEXT void test_send_one_connected_one_unconnected()
Definition: test_connect_delay_tipc.cpp:9
zmq_bind
ZMQ_EXPORT int zmq_bind(void *s_, const char *addr_)
Definition: zmq.cpp:299
timeout
GLbitfield GLuint64 timeout
Definition: glcorearb.h:3588
is_tipc_available
int is_tipc_available()
Definition: testutil.cpp:283
buffer
Definition: buffer_processor.h:43
test_context_socket
void * test_context_socket(int type_)
Definition: testutil_unity.cpp:200
ZMQ_DONTWAIT
#define ZMQ_DONTWAIT
Definition: zmq.h:358
SETTLE_TIME
#define SETTLE_TIME
Definition: libzmq/tests/testutil.hpp:31
TEST_ASSERT_EQUAL_INT
#define TEST_ASSERT_EQUAL_INT(expected, actual)
Definition: unity.h:128
i
int i
Definition: gmock-matchers_test.cc:764
expect_monitor_event
void expect_monitor_event(void *monitor_, int expected_event_)
Definition: testutil_monitoring.cpp:96
ZMQ_LINGER
#define ZMQ_LINGER
Definition: zmq.h:288
send_string_expect_success
void send_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:94
main
int main(void)
Definition: test_connect_delay_tipc.cpp:207
ZMQ_PAIR
#define ZMQ_PAIR
Definition: zmq.h:258
ZMQ_RCVTIMEO
#define ZMQ_RCVTIMEO
Definition: zmq.h:296
zmq_recv
ZMQ_EXPORT int zmq_recv(void *s_, void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:487
ZMQ_EVENT_DISCONNECTED
#define ZMQ_EVENT_DISCONNECTED
Definition: zmq.h:410
recv_string_expect_success
void recv_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:101
test_send_disconnected_with_delay
void test_send_disconnected_with_delay()
Definition: test_connect_delay_tipc.cpp:139
UNITY_BEGIN
UNITY_BEGIN()
val
GLuint GLfloat * val
Definition: glcorearb.h:3604
zmq_send
ZMQ_EXPORT int zmq_send(void *s_, const void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:377
ZMQ_PULL
#define ZMQ_PULL
Definition: zmq.h:265
test_context_socket_close
void * test_context_socket_close(void *socket_)
Definition: testutil_unity.cpp:208
TEST_ASSERT_SUCCESS_ERRNO
#define TEST_ASSERT_SUCCESS_ERRNO(expr)
Definition: proxy_thr.cpp:47


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59