test_inproc_connect.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "testutil.hpp"
4 #include "testutil_unity.hpp"
5 
7 
8 static void pusher (void * /*unused*/)
9 {
10  // Connect first
11  // do not use test_context_socket here, as it is not thread-safe
13 
15 
16  // Queue up some data
18 
19  // Cleanup
21 }
22 
23 static void simult_conn (void *endpt_)
24 {
25  // Pull out arguments - endpoint string
26  const char *endpt = static_cast<const char *> (endpt_);
27 
28  // Connect
29  // do not use test_context_socket here, as it is not thread-safe
33 
34  // Cleanup
36 }
37 
// Thread entry point (started through zmq_threadstart): creates a ZMQ_PAIR
// socket on the context returned by get_test_context (), binds it to the
// endpoint named by endpt_, sends one "foobar" message and closes the socket.
// endpt_ is interpreted as a const char * endpoint string.
38 static void simult_bind (void *endpt_)
39 {
40  // Pull out arguments - the endpoint string is the only argument
41  const char *endpt = static_cast<const char *> (endpt_);
42 
43  // Bind, then queue a single message on the bound socket
44  // do not use test_context_socket here, as it is not thread-safe
45  void *bind_socket = zmq_socket (get_test_context (), ZMQ_PAIR);
46  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, endpt));
47  send_string_expect_success (bind_socket, "foobar", 0);
48 
49  // Cleanup - the socket was created with zmq_socket, so close it with zmq_close
50  TEST_ASSERT_SUCCESS_ERRNO (zmq_close (bind_socket));
51 }
52 
54 {
55  // Bind first
56  void *bind_socket = test_context_socket (ZMQ_PAIR);
57  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://bbc"));
58 
59  // Now connect
62 
63  // Queue up some data
65 
66  // Read pending message
67  recv_string_expect_success (bind_socket, "foobar", 0);
68 
69  // Cleanup
71  test_context_socket_close (bind_socket);
72 }
73 
75 {
76  // Connect first
79 
80  // Queue up some data
82 
83  // Now bind
84  void *bind_socket = test_context_socket (ZMQ_PAIR);
85  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://cbb"));
86 
87  // Read pending message
88  recv_string_expect_success (bind_socket, "foobar", 0);
89 
90  // Cleanup
92  test_context_socket_close (bind_socket);
93 }
94 
96 {
97  // Connect first
100 
101  // Queue up some data, this will be dropped
103 
104  // Now bind
105  void *bind_socket = test_context_socket (ZMQ_SUB);
107  zmq_setsockopt (bind_socket, ZMQ_SUBSCRIBE, "", 0));
108  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://cbbps"));
109 
110  // Wait for pub-sub connection to happen
112 
113  // Queue up some data, this will not be dropped
115 
116  // Read pending message
117  recv_string_expect_success (bind_socket, "after", 0);
118 
119  // Cleanup
121  test_context_socket_close (bind_socket);
122 }
123 
125 {
126  for (int i = 0; i < 20; ++i) {
127  // Connect first
129 
130  char ep[32];
131  snprintf (ep, 32 * sizeof (char), "inproc://cbbrr%d", i);
133 
134  // Cleanup
136  }
137 }
138 
140 {
141  const unsigned int no_of_connects = 10;
142 
143  void *connect_socket[no_of_connects];
144 
145  // Connect first
146  for (unsigned int i = 0; i < no_of_connects; ++i) {
149  zmq_connect (connect_socket[i], "inproc://multiple"));
150 
151  // Queue up some data
153  }
154 
155  // Now bind
156  void *bind_socket = test_context_socket (ZMQ_PULL);
157  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://multiple"));
158 
159  for (unsigned int i = 0; i < no_of_connects; ++i) {
160  recv_string_expect_success (bind_socket, "foobar", 0);
161  }
162 
163  // Cleanup
164  for (unsigned int i = 0; i < no_of_connects; ++i) {
166  }
167 
168  test_context_socket_close (bind_socket);
169 }
170 
172 {
173  const unsigned int no_of_threads = 30;
174 
175  void *threads[no_of_threads];
176 
177  // Connect first
178  for (unsigned int i = 0; i < no_of_threads; ++i) {
179  threads[i] = zmq_threadstart (&pusher, NULL);
180  }
181 
182  // Now bind
183  void *bind_socket = test_context_socket (ZMQ_PULL);
184  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://sink"));
185 
186  for (unsigned int i = 0; i < no_of_threads; ++i) {
187  // Read pending message
188  recv_string_expect_success (bind_socket, "foobar", 0);
189  }
190 
191  // Cleanup
192  for (unsigned int i = 0; i < no_of_threads; ++i) {
193  zmq_threadclose (threads[i]);
194  }
195 
196  test_context_socket_close (bind_socket);
197 }
198 
200 {
201  const unsigned int no_of_times = 50;
202  void *threads[no_of_times * 2];
203  void *thr_args[no_of_times];
204  char endpts[no_of_times][20];
205 
206  // Set up thread arguments: one endpoint string per connect/bind thread pair
207  for (unsigned int i = 0; i < no_of_times; ++i) {
208  thr_args[i] = (void *) endpts[i];
209  snprintf (endpts[i], 20 * sizeof (char), "inproc://foo_%d", i);
210  }
211 
212  // Spawn all threads as simultaneously as possible
213  for (unsigned int i = 0; i < no_of_times; ++i) {
214  threads[i * 2 + 0] = zmq_threadstart (&simult_conn, thr_args[i]);
215  threads[i * 2 + 1] = zmq_threadstart (&simult_bind, thr_args[i]);
216  }
217 
218  // Close all threads
219  for (unsigned int i = 0; i < no_of_times; ++i) {
220  zmq_threadclose (threads[i * 2 + 0]);
221  zmq_threadclose (threads[i * 2 + 1]);
222  }
223 }
224 
226 {
227  // Create the infrastructure
229 
230  TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, "inproc://routing_id"));
231 
233 
234  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "inproc://routing_id"));
235 
236  // Send 2-part message.
240  1, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (sc, "B", 1, 0)));
241 
242  // Routing id comes first.
243  zmq_msg_t msg;
247 
248  // Then the first part of the message body.
250  1, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0)));
252 
253  // And finally, the second part of the message body.
255  1, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0)));
257 
258  // Deallocate the infrastructure.
261 }
262 
264 {
267 
269 }
270 
271 
272 void test_unbind ()
273 {
274  // Bind and unbind socket 1
275  void *bind_socket1 = test_context_socket (ZMQ_PAIR);
276  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket1, "inproc://unbind"));
277  TEST_ASSERT_SUCCESS_ERRNO (zmq_unbind (bind_socket1, "inproc://unbind"));
278 
279  // Bind socket 2
280  void *bind_socket2 = test_context_socket (ZMQ_PAIR);
281  TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket2, "inproc://unbind"));
282 
283  // Now connect
286 
287  // Queue up some data
289 
290  // Read pending message
291  recv_string_expect_success (bind_socket2, "foobar", 0);
292 
293  // Cleanup
295  test_context_socket_close (bind_socket1);
296  test_context_socket_close (bind_socket2);
297 }
298 
300 {
301  // Connect first
304 
306 
307  // Cleanup
309 }
310 
311 int main (void)
312 {
314 
315  UNITY_BEGIN ();
327  return UNITY_END ();
328 }
zmq_ctx_shutdown
ZMQ_EXPORT int zmq_ctx_shutdown(void *context_)
Definition: zmq.cpp:147
NULL
NULL
Definition: test_security_zap.cpp:405
UNITY_END
return UNITY_END()
ZMQ_PUB
#define ZMQ_PUB
Definition: zmq.h:259
zmq_threadstart
ZMQ_EXPORT void * zmq_threadstart(zmq_thread_fn *func_, void *arg_)
Definition: zmq_utils.cpp:54
msleep
void msleep(int milliseconds_)
Definition: testutil.cpp:227
zmq_unbind
ZMQ_EXPORT int zmq_unbind(void *s_, const char *addr_)
Definition: zmq.cpp:337
RUN_TEST
#define RUN_TEST(func)
Definition: unity_internals.h:615
SETUP_TEARDOWN_TESTCONTEXT
#define SETUP_TEARDOWN_TESTCONTEXT
Definition: testutil_unity.hpp:172
ZMQ_SUBSCRIBE
#define ZMQ_SUBSCRIBE
Definition: zmq.h:278
get_test_context
void * get_test_context()
Definition: testutil_unity.cpp:184
simult_bind
static void simult_bind(void *endpt_)
Definition: test_inproc_connect.cpp:38
ZMQ_SUB
#define ZMQ_SUB
Definition: zmq.h:260
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
test_multiple_threads
void test_multiple_threads()
Definition: test_inproc_connect.cpp:171
test_simultaneous_connect_bind_threads
void test_simultaneous_connect_bind_threads()
Definition: test_inproc_connect.cpp:199
test_unbind
void test_unbind()
Definition: test_inproc_connect.cpp:272
test_connect_before_bind_ctx_term
void test_connect_before_bind_ctx_term()
Definition: test_inproc_connect.cpp:124
testutil_unity.hpp
snprintf
int snprintf(char *str, size_t size, const char *format,...)
Definition: port.cc:64
ZMQ_DEALER
#define ZMQ_DEALER
Definition: zmq.h:263
zmq_setsockopt
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
zmq_threadclose
ZMQ_EXPORT void zmq_threadclose(void *thread_)
Definition: zmq_utils.cpp:62
main
int main(void)
Definition: test_inproc_connect.cpp:311
ZMQ_PUSH
#define ZMQ_PUSH
Definition: zmq.h:266
test_connect_before_bind_pub_sub
void test_connect_before_bind_pub_sub()
Definition: test_inproc_connect.cpp:95
sb
void * sb
Definition: test_channel.cpp:8
testutil.hpp
ZMQ_ROUTER
#define ZMQ_ROUTER
Definition: zmq.h:264
test_multiple_connects
void test_multiple_connects()
Definition: test_inproc_connect.cpp:139
zmq_msg_t
Definition: zmq.h:218
connect_socket
fd_t connect_socket(const char *endpoint_, const int af_, const int protocol_)
Definition: testutil.cpp:353
simult_conn
static void simult_conn(void *endpt_)
Definition: test_inproc_connect.cpp:23
zmq_bind
ZMQ_EXPORT int zmq_bind(void *s_, const char *addr_)
Definition: zmq.cpp:299
zmq_msg_recv
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:617
zmq_socket
ZMQ_EXPORT void * zmq_socket(void *, int type_)
Definition: zmq.cpp:230
sc
void * sc
Definition: test_channel.cpp:9
test_context_socket
void * test_context_socket(int type_)
Definition: testutil_unity.cpp:200
SETTLE_TIME
#define SETTLE_TIME
Definition: libzmq/tests/testutil.hpp:31
TEST_ASSERT_EQUAL_INT
#define TEST_ASSERT_EQUAL_INT(expected, actual)
Definition: unity.h:128
zmq_msg_init
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg_)
Definition: zmq.cpp:587
zmq_close
ZMQ_EXPORT int zmq_close(void *s_)
Definition: zmq.cpp:241
pusher
static SETUP_TEARDOWN_TESTCONTEXT void pusher(void *)
Definition: test_inproc_connect.cpp:8
i
int i
Definition: gmock-matchers_test.cc:764
test_bind_before_connect
void test_bind_before_connect()
Definition: test_inproc_connect.cpp:53
send_string_expect_success
void send_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:94
ZMQ_PAIR
#define ZMQ_PAIR
Definition: zmq.h:258
recv_string_expect_success
void recv_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:101
setup_test_environment
void setup_test_environment(int timeout_seconds_)
Definition: testutil.cpp:201
UNITY_BEGIN
UNITY_BEGIN()
ZMQ_SNDMORE
#define ZMQ_SNDMORE
Definition: zmq.h:359
zmq_send
ZMQ_EXPORT int zmq_send(void *s_, const void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:377
test_connect_only
void test_connect_only()
Definition: test_inproc_connect.cpp:263
ZMQ_PULL
#define ZMQ_PULL
Definition: zmq.h:265
zmq_msg_more
ZMQ_EXPORT int zmq_msg_more(const zmq_msg_t *msg_)
Definition: zmq.cpp:652
test_context_socket_close
void * test_context_socket_close(void *socket_)
Definition: testutil_unity.cpp:208
TEST_ASSERT_SUCCESS_ERRNO
#define TEST_ASSERT_SUCCESS_ERRNO(expr)
Definition: proxy_thr.cpp:47
test_routing_id
void test_routing_id()
Definition: test_inproc_connect.cpp:225
ep
const SETUP_TEARDOWN_TESTCONTEXT char ep[]
Definition: test_term_endpoint_tipc.cpp:8
test_shutdown_during_pend
void test_shutdown_during_pend()
Definition: test_inproc_connect.cpp:299
test_connect_before_bind
void test_connect_before_bind()
Definition: test_inproc_connect.cpp:74


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59