test_mock_pub_sub.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "testutil.hpp"
4 #include "testutil_unity.hpp"
5 
6 #include <stdlib.h>
7 #include <string.h>
8 
10 
11 // Read one event off the monitor socket; return value and address
12 // by reference, if not null, and event number by value. Returns -1
13 // in case of error.
14 
15 static int get_monitor_event (void *monitor_)
16 {
17  for (int i = 0; i < 2; i++) {
18  // First frame in message contains event number and value
19  zmq_msg_t msg;
21  if (zmq_msg_recv (&msg, monitor_, ZMQ_DONTWAIT) == -1) {
23  continue; // Interrupted, presumably
24  }
26 
27  uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
28  uint16_t event = *reinterpret_cast<uint16_t *> (data);
29 
30  // Second frame in message contains event address
32  if (zmq_msg_recv (&msg, monitor_, 0) == -1) {
33  return -1; // Interrupted, presumably
34  }
36 
37  return event;
38  }
39  return -1;
40 }
41 
42 static void recv_with_retry (fd_t fd_, char *buffer_, int bytes_)
43 {
44  int received = 0;
45  while (true) {
47  recv (fd_, buffer_ + received, bytes_ - received, 0));
49  received += rc;
50  TEST_ASSERT_LESS_OR_EQUAL_INT (bytes_, received);
51  if (received == bytes_)
52  break;
53  }
54 }
55 
56 static void mock_handshake (fd_t fd_, bool sub_command, bool mock_pub)
57 {
58  char buffer[128];
59  memset (buffer, 0, sizeof (buffer));
60  memcpy (buffer, zmtp_greeting_null, sizeof (zmtp_greeting_null));
61 
62  // Mock ZMTP 3.1 which uses commands
63  if (sub_command) {
64  buffer[11] = 1;
65  }
66  int rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 64, 0));
67  TEST_ASSERT_EQUAL_INT (64, rc);
68 
69  recv_with_retry (fd_, buffer, 64);
70 
71  if (!mock_pub) {
73  fd_, (const char *) zmtp_ready_sub, sizeof (zmtp_ready_sub), 0));
75  } else {
77  fd_, (const char *) zmtp_ready_xpub, sizeof (zmtp_ready_xpub), 0));
79  }
80 
81  // greeting - XPUB has one extra byte
82  memset (buffer, 0, sizeof (buffer));
83  recv_with_retry (fd_, buffer,
84  mock_pub ? sizeof (zmtp_ready_sub)
85  : sizeof (zmtp_ready_xpub));
86 }
87 
88 static void prep_server_socket (void **server_out_,
89  void **mon_out_,
90  char *endpoint_,
91  size_t ep_length_,
92  int socket_type)
93 {
94  // We'll be using this socket in raw mode
96 
97  int value = 0;
99  zmq_setsockopt (server, ZMQ_LINGER, &value, sizeof (value)));
100 
101  bind_loopback_ipv4 (server, endpoint_, ep_length_);
102 
103  // Create and connect a socket for collecting monitor events on xpub
105 
107  server, "inproc://monitor-dealer",
109 
110  // Connect to the inproc endpoint so we'll get events
112  zmq_connect (server_mon, "inproc://monitor-dealer"));
113 
114  *server_out_ = server;
115  *mon_out_ = server_mon;
116 }
117 
118 static void test_mock_pub_sub (bool sub_command_, bool mock_pub_)
119 {
120  int rc;
122 
123  void *server, *server_mon;
125  mock_pub_ ? ZMQ_SUB : ZMQ_XPUB);
126 
128 
129  // Mock a ZMTP 3 client so we can forcibly try sub commands
130  mock_handshake (s, sub_command_, mock_pub_);
131 
132  // By now everything should report as connected
135 
136  char buffer[32];
137  memset (buffer, 0, sizeof (buffer));
138 
139  if (mock_pub_) {
140  rc = zmq_setsockopt (server, ZMQ_SUBSCRIBE, "A", 1);
141  TEST_ASSERT_EQUAL_INT (0, rc);
142  // SUB binds, let its state machine run
143  // Because zeromq attach the pipe after the handshake, we need more time here before we can run the state-machine
144  msleep (1);
146 
147  if (sub_command_) {
148  recv_with_retry (s, buffer, 13);
150  memcmp (buffer, "\4\xb\x9SUBSCRIBEA", 13));
151  } else {
152  recv_with_retry (s, buffer, 4);
153  TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\0\2\1A", 4));
154  }
155 
156  memcpy (buffer, "\0\4ALOL", 6);
157  rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (s, buffer, 6, 0));
158  TEST_ASSERT_EQUAL_INT (6, rc);
159 
160  memset (buffer, 0, sizeof (buffer));
161  rc = zmq_recv (server, buffer, 4, 0);
162  TEST_ASSERT_EQUAL_INT (4, rc);
163  TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "ALOL", 4));
164  } else {
165  if (sub_command_) {
166  const uint8_t sub[13] = {4, 11, 9, 'S', 'U', 'B', 'S',
167  'C', 'R', 'I', 'B', 'E', 'A'};
169  send (s, (const char *) sub, 13, 0));
170  TEST_ASSERT_EQUAL_INT (13, rc);
171  } else {
172  const uint8_t sub[4] = {0, 2, 1, 'A'};
174  send (s, (const char *) sub, 4, 0));
175  TEST_ASSERT_EQUAL_INT (4, rc);
176  }
177  rc = zmq_recv (server, buffer, 2, 0);
178  TEST_ASSERT_EQUAL_INT (2, rc);
179  TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\1A", 2));
180 
181  rc = zmq_send (server, "ALOL", 4, 0);
182  TEST_ASSERT_EQUAL_INT (4, rc);
183 
184  memset (buffer, 0, sizeof (buffer));
185  recv_with_retry (s, buffer, 6);
186  TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\0\4ALOL", 6));
187  }
188 
189  close (s);
190 
193 }
194 
196 {
197  test_mock_pub_sub (true, false);
198 }
199 
201 {
202  test_mock_pub_sub (false, false);
203 }
204 
206 {
207  test_mock_pub_sub (true, true);
208 }
209 
211 {
212  test_mock_pub_sub (false, true);
213 }
214 
215 int main (void)
216 {
218 
219  UNITY_BEGIN ();
220 
225 
226  return UNITY_END ();
227 }
TEST_ASSERT_LESS_OR_EQUAL_INT
#define TEST_ASSERT_LESS_OR_EQUAL_INT(threshold, actual)
Definition: unity.h:201
UNITY_END
return UNITY_END()
ZMQ_XPUB
#define ZMQ_XPUB
Definition: zmq.h:267
msleep
void msleep(int milliseconds_)
Definition: testutil.cpp:227
zmq_socket_monitor
ZMQ_EXPORT int zmq_socket_monitor(void *s_, const char *addr_, int events_)
Definition: zmq.cpp:278
TEST_ASSERT_TRUE
#define TEST_ASSERT_TRUE(condition)
Definition: unity.h:121
s
XmlRpcServer s
RUN_TEST
#define RUN_TEST(func)
Definition: unity_internals.h:615
SETUP_TEARDOWN_TESTCONTEXT
#define SETUP_TEARDOWN_TESTCONTEXT
Definition: testutil_unity.hpp:172
prep_server_socket
static void prep_server_socket(void **server_out_, void **mon_out_, char *endpoint_, size_t ep_length_, int socket_type)
Definition: test_mock_pub_sub.cpp:88
ZMQ_SUBSCRIBE
#define ZMQ_SUBSCRIBE
Definition: zmq.h:278
bind_loopback_ipv4
void bind_loopback_ipv4(void *socket_, char *my_endpoint_, size_t len_)
Definition: testutil_unity.cpp:246
mock_handshake
static void mock_handshake(fd_t fd_, bool sub_command, bool mock_pub)
Definition: test_mock_pub_sub.cpp:56
ZMQ_SUB
#define ZMQ_SUB
Definition: zmq.h:260
send
void send(fd_t fd_, const char(&data_)[N])
Definition: test_security_curve.cpp:209
TEST_ASSERT_FALSE
#define TEST_ASSERT_FALSE(condition)
Definition: unity.h:123
zmq_msg_data
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg_)
Definition: zmq.cpp:642
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
TEST_ASSERT_GREATER_THAN_INT
#define TEST_ASSERT_GREATER_THAN_INT(threshold, actual)
Definition: unity.h:153
testutil_unity.hpp
zmq_setsockopt
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
ZMQ_EVENT_CONNECTED
#define ZMQ_EVENT_CONNECTED
Definition: zmq.h:401
event
struct _cl_event * event
Definition: glcorearb.h:4163
TEST_ASSERT_SUCCESS_RAW_ERRNO
#define TEST_ASSERT_SUCCESS_RAW_ERRNO(expr)
Definition: testutil_unity.hpp:69
testutil.hpp
main
int main(void)
Definition: test_mock_pub_sub.cpp:215
zmq_msg_t
Definition: zmq.h:218
connect_socket
fd_t connect_socket(const char *endpoint_, const int af_, const int protocol_)
Definition: testutil.cpp:353
my_endpoint
char my_endpoint[MAX_SOCKET_STRING]
Definition: test_security_curve.cpp:31
MAX_SOCKET_STRING
#define MAX_SOCKET_STRING
Definition: libzmq/tests/testutil.hpp:35
zmq_msg_recv
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:617
buffer
Definition: buffer_processor.h:43
test_context_socket
void * test_context_socket(int type_)
Definition: testutil_unity.cpp:200
ZMQ_DONTWAIT
#define ZMQ_DONTWAIT
Definition: zmq.h:358
SETTLE_TIME
#define SETTLE_TIME
Definition: libzmq/tests/testutil.hpp:31
server
void * server
Definition: test_security_curve.cpp:29
test_mock_pub_legacy
void test_mock_pub_legacy()
Definition: test_mock_pub_sub.cpp:210
test_mock_pub_command
void test_mock_pub_command()
Definition: test_mock_pub_sub.cpp:205
TEST_ASSERT_EQUAL_INT
#define TEST_ASSERT_EQUAL_INT(expected, actual)
Definition: unity.h:128
zmq_msg_init
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg_)
Definition: zmq.cpp:587
zmtp_greeting_null
const uint8_t zmtp_greeting_null[64]
Definition: libzmq/tests/testutil.hpp:48
test_mock_pub_sub
static void test_mock_pub_sub(bool sub_command_, bool mock_pub_)
Definition: test_mock_pub_sub.cpp:118
i
int i
Definition: gmock-matchers_test.cc:764
socket_type
int socket_type
ZMQ_LINGER
#define ZMQ_LINGER
Definition: zmq.h:288
server_mon
void * server_mon
Definition: test_security_curve.cpp:30
ZMQ_PAIR
#define ZMQ_PAIR
Definition: zmq.h:258
zmq_recv
ZMQ_EXPORT int zmq_recv(void *s_, void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:487
ZMQ_EVENT_ACCEPTED
#define ZMQ_EVENT_ACCEPTED
Definition: zmq.h:406
zmtp_ready_xpub
const uint8_t zmtp_ready_xpub[28]
Definition: libzmq/tests/testutil.hpp:61
ZMQ_EVENT_DISCONNECTED
#define ZMQ_EVENT_DISCONNECTED
Definition: zmq.h:410
setup_test_environment
void setup_test_environment(int timeout_seconds_)
Definition: testutil.cpp:201
UNITY_BEGIN
UNITY_BEGIN()
fd_t
zmq_fd_t fd_t
Definition: libzmq/tests/testutil.hpp:98
get_monitor_event
static SETUP_TEARDOWN_TESTCONTEXT int get_monitor_event(void *monitor_)
Definition: test_mock_pub_sub.cpp:15
data
GLint GLenum GLsizei GLsizei GLsizei GLint GLsizei const GLvoid * data
Definition: glcorearb.h:2879
zmtp_ready_sub
const uint8_t zmtp_ready_sub[27]
Definition: libzmq/tests/testutil.hpp:64
buffer_
static uint8 buffer_[kBufferSize]
Definition: coded_stream_unittest.cc:136
value
GLsizei const GLfloat * value
Definition: glcorearb.h:3093
zmq_send
ZMQ_EXPORT int zmq_send(void *s_, const void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:377
test_mock_sub_legacy
void test_mock_sub_legacy()
Definition: test_mock_pub_sub.cpp:200
zmq_msg_more
ZMQ_EXPORT int zmq_msg_more(const zmq_msg_t *msg_)
Definition: zmq.cpp:652
test_context_socket_close
void * test_context_socket_close(void *socket_)
Definition: testutil_unity.cpp:208
TEST_ASSERT_SUCCESS_ERRNO
#define TEST_ASSERT_SUCCESS_ERRNO(expr)
Definition: proxy_thr.cpp:47
recv_with_retry
static void recv_with_retry(fd_t fd_, char *buffer_, int bytes_)
Definition: test_mock_pub_sub.cpp:42
test_mock_sub_command
void test_mock_sub_command()
Definition: test_mock_pub_sub.cpp:195


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59