test_stream.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "testutil.hpp"
4 #include "testutil_unity.hpp"
5 
6 #include <string.h>
7 
SETUP_TEARDOWN_TESTCONTEXT

// ZMTP protocol greeting structure
11 
//  One raw octet of wire data.
typedef uint8_t byte;

//  Fixed-size greeting block: 10 + 2 + 20 + 1 + 31 = 64 bytes.
//  Every member is a single byte or an array of bytes, so the struct can be
//  handed to zmq_send () directly as the on-the-wire representation.
typedef struct
{
    byte signature[10]; //  0xFF 8*0x00 0x7F
    byte version[2];    //  0x03 0x01 for ZMTP/3.1
    byte mechanism[20]; //  "NULL"
    byte as_server;
    byte filler[31];
} zmtp_greeting_t;
21 
22 #define ZMTP_DEALER 5 // Socket type constants
23 
24 // This is a greeting matching what 0MQ will send us; note the
25 // 8-byte size is set to 1 for backwards compatibility
26 
28  {0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F}, {3, 1}, {'N', 'U', 'L', 'L'}, 0, {0}};
29 
30 static void test_stream_to_dealer ()
31 {
32  int rc;
34 
35  // We'll be using this socket in raw mode
37 
38  int zero = 0;
40  zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero)));
41  int enabled = 1;
45 
46  // We'll be using this socket as the other peer
47  void *dealer = test_context_socket (ZMQ_DEALER);
49  zmq_setsockopt (dealer, ZMQ_LINGER, &zero, sizeof (zero)));
51 
52  // Send a message on the dealer socket
53  send_string_expect_success (dealer, "Hello", 0);
54 
55  // Connecting sends a zero message
56  // First frame is routing id
57  zmq_msg_t routing_id;
59  TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0));
60  TEST_ASSERT_TRUE (zmq_msg_more (&routing_id));
61 
62  // Verify the existence of Peer-Address metadata
63  char const *peer_address = zmq_msg_gets (&routing_id, "Peer-Address");
64  TEST_ASSERT_NOT_NULL (peer_address);
65  TEST_ASSERT_EQUAL_STRING ("127.0.0.1", peer_address);
66 
67  // Second frame is zero
68  byte buffer[255];
71 
72  // Verify the existence of Peer-Address metadata
73  peer_address = zmq_msg_gets (&routing_id, "Peer-Address");
74  TEST_ASSERT_NOT_NULL (peer_address);
75  TEST_ASSERT_EQUAL_STRING ("127.0.0.1", peer_address);
76 
77  // Real data follows
78  // First frame is routing id
79  TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0));
80  TEST_ASSERT_TRUE (zmq_msg_more (&routing_id));
81 
82  // Verify the existence of Peer-Address metadata
83  peer_address = zmq_msg_gets (&routing_id, "Peer-Address");
84  TEST_ASSERT_NOT_NULL (peer_address);
85  TEST_ASSERT_EQUAL_STRING ("127.0.0.1", peer_address);
86 
87  // Second frame is greeting signature
89 
90  // Send our own protocol greeting
94  zmq_send (stream, &greeting, sizeof (greeting), 0)));
95 
96  // Now we expect the data from the DEALER socket
97  // We want the rest of greeting along with the Ready command
98  int bytes_read = 0;
99  while (bytes_read < 97) {
100  // First frame is the routing id of the connection (each time)
102  0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0)));
103  TEST_ASSERT_TRUE (zmq_msg_more (&routing_id));
104  // Second frame contains the next chunk of data
106  rc = zmq_recv (stream, buffer + bytes_read, 255 - bytes_read, 0));
107  bytes_read += rc;
108  }
109 
110  // First two bytes are major and minor version numbers.
111  TEST_ASSERT_EQUAL_INT (3, buffer[0]); // ZMTP/3.1
113 
114  // Mechanism is "NULL"
116  "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20);
117  TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 54, "\4\51\5READY", 8);
118  TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 62, "\13Socket-Type\0\0\0\6DEALER",
119  22);
120  TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 84, "\10Identity\0\0\0\0", 13);
121 
122  // Announce we are ready
123  memcpy (buffer, "\4\51\5READY", 8);
124  memcpy (buffer + 8, "\13Socket-Type\0\0\0\6ROUTER", 22);
125  memcpy (buffer + 30, "\10Identity\0\0\0\0", 13);
126 
127  // Send Ready command
129  &routing_id, stream, ZMQ_SNDMORE)));
132 
133  // Now we expect the data from the DEALER socket
134  // First frame is, again, the routing id of the connection
136  0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0)));
137  TEST_ASSERT_TRUE (zmq_msg_more (&routing_id));
138 
139  // Third frame contains Hello message from DEALER
141  zmq_recv (stream, buffer, sizeof buffer, 0)));
142 
143  // Then we have a 5-byte message "Hello"
144  TEST_ASSERT_EQUAL_INT (0, buffer[0]); // Flags = 0
145  TEST_ASSERT_EQUAL_INT (5, buffer[1]); // Size = 5
146  TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 2, "Hello", 5);
147 
148  // Send "World" back to DEALER
150  &routing_id, stream, ZMQ_SNDMORE)));
151  byte world[] = {0, 5, 'W', 'o', 'r', 'l', 'd'};
153  sizeof (world),
154  TEST_ASSERT_SUCCESS_ERRNO (zmq_send (stream, world, sizeof (world), 0)));
155 
156  // Expect response on DEALER socket
157  recv_string_expect_success (dealer, "World", 0);
158 
159  // Test large messages over STREAM socket
160 #define size 64000
161  uint8_t msgout[size];
162  memset (msgout, 0xAB, size);
163  zmq_send (dealer, msgout, size, 0);
164 
165  uint8_t msgin[9 + size];
166  memset (msgin, 0, 9 + size);
167  bytes_read = 0;
168  while (bytes_read < 9 + size) {
169  // Get routing id frame
172  // Get next chunk
174  0,
175  TEST_ASSERT_SUCCESS_ERRNO (rc = zmq_recv (stream, msgin + bytes_read,
176  9 + size - bytes_read, 0)));
177  bytes_read += rc;
178  }
179  for (int byte_nbr = 0; byte_nbr < size; byte_nbr++) {
180  TEST_ASSERT_EQUAL_UINT8 (0xAB, msgin[9 + byte_nbr]);
181  }
182  test_context_socket_close (dealer);
184 }
185 
186 
187 static void test_stream_to_stream ()
188 {
190  // Set-up our context and sockets
191 
193  int enabled = 1;
197 
202  uint8_t id[256];
203  uint8_t buffer[256];
204 
205  // Connecting sends a zero message
206  // Server: First frame is routing id, second frame is zero
208  0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, id, 256, 0)));
211  // Client: First frame is routing id, second frame is zero
213  0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, id, 256, 0)));
216 
217  // Sent HTTP request on client socket
218  // Get server routing id
219  size_t id_size = sizeof id;
221  zmq_getsockopt (client, ZMQ_ROUTING_ID, id, &id_size));
222  // First frame is server routing id
224  client, id, id_size, ZMQ_SNDMORE)));
225  // Second frame is HTTP GET request
227  7, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (client, "GET /\n\n", 7, 0)));
228 
229  // Get HTTP request; ID frame and then request
231  0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, id, 256, 0)));
233  TEST_ASSERT_EQUAL_INT8_ARRAY (buffer, "GET /\n\n", 7);
234 
235  // Send reply back to client
236  char http_response[] = "HTTP/1.0 200 OK\r\n"
237  "Content-Type: text/plain\r\n"
238  "\r\n"
239  "Hello, World!";
242  zmq_send (server, http_response, sizeof (http_response), ZMQ_SNDMORE));
243 
244  // Send zero to close connection to client
247 
248  // Get reply at client and check that it's complete
250  0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, id, 256, 0)));
252  sizeof http_response,
254  TEST_ASSERT_EQUAL_INT8_ARRAY (buffer, http_response,
255  sizeof (http_response));
256 
257  // // Get disconnection notification
258  // FIXME: why does this block? Bug in STREAM disconnect notification?
259  // id_size = zmq_recv (client, id, 256, 0);
260  // assert (id_size > 0);
261  // rc = zmq_recv (client, buffer, 256, 0);
262  // assert (rc == 0);
263 
266 }
267 
268 int main ()
269 {
271 
272  UNITY_BEGIN ();
275  return UNITY_END ();
276 }
recv_array_expect_success
void recv_array_expect_success(void *socket_, const uint8_t(&array_)[SIZE], int flags_)
Definition: testutil_unity.hpp:148
TEST_ASSERT_EQUAL_STRING
#define TEST_ASSERT_EQUAL_STRING(expected, actual)
Definition: unity.h:235
stream
GLuint GLuint stream
Definition: glcorearb.h:3946
NULL
NULL
Definition: test_security_zap.cpp:405
UNITY_END
return UNITY_END()
ZMQ_STREAM
#define ZMQ_STREAM
Definition: zmq.h:269
TEST_ASSERT_TRUE
#define TEST_ASSERT_TRUE(condition)
Definition: unity.h:121
zmq_msg_send
ZMQ_EXPORT int zmq_msg_send(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:609
RUN_TEST
#define RUN_TEST(func)
Definition: unity_internals.h:615
SETUP_TEARDOWN_TESTCONTEXT
#define SETUP_TEARDOWN_TESTCONTEXT
Definition: testutil_unity.hpp:172
ZMQ_STREAM_NOTIFY
#define ZMQ_STREAM_NOTIFY
Definition: zmq.h:334
bind_loopback_ipv4
void bind_loopback_ipv4(void *socket_, char *my_endpoint_, size_t len_)
Definition: testutil_unity.cpp:246
client
void client(int num)
Definition: test_multithread.cpp:134
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
TEST_ASSERT_EQUAL_INT8_ARRAY
#define TEST_ASSERT_EQUAL_INT8_ARRAY(expected, actual, num_elements)
Definition: unity.h:241
enabled
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition: glcorearb.h:4174
TEST_ASSERT_GREATER_THAN_INT
#define TEST_ASSERT_GREATER_THAN_INT(threshold, actual)
Definition: unity.h:153
main
int main()
Definition: test_stream.cpp:268
testutil_unity.hpp
zmq_msg_gets
const ZMQ_EXPORT char * zmq_msg_gets(const zmq_msg_t *msg_, const char *property_)
Definition: zmq.cpp:711
zmtp_greeting_t::as_server
byte as_server
Definition: test_stream.cpp:18
ZMQ_DEALER
#define ZMQ_DEALER
Definition: zmq.h:263
zmq_setsockopt
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
id
GLenum GLuint id
Definition: glcorearb.h:2695
testutil.hpp
zmq_msg_t
Definition: zmq.h:218
my_endpoint
char my_endpoint[MAX_SOCKET_STRING]
Definition: test_security_curve.cpp:31
MAX_SOCKET_STRING
#define MAX_SOCKET_STRING
Definition: libzmq/tests/testutil.hpp:35
size
#define size
Definition: glcorearb.h:2944
zmq_msg_recv
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:617
buffer
Definition: buffer_processor.h:43
byte
SETUP_TEARDOWN_TESTCONTEXT typedef uint8_t byte
Definition: test_stream.cpp:12
test_context_socket
void * test_context_socket(int type_)
Definition: testutil_unity.cpp:200
server
void * server
Definition: test_security_curve.cpp:29
zmtp_greeting_t::signature
byte signature[10]
Definition: test_stream.cpp:15
TEST_ASSERT_EQUAL_INT
#define TEST_ASSERT_EQUAL_INT(expected, actual)
Definition: unity.h:128
test_stream_to_stream
static void test_stream_to_stream()
Definition: test_stream.cpp:187
zmq_msg_init
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg_)
Definition: zmq.cpp:587
ZMQ_LINGER
#define ZMQ_LINGER
Definition: zmq.h:288
send_string_expect_success
void send_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:94
TEST_ASSERT_EQUAL_UINT8
#define TEST_ASSERT_EQUAL_UINT8(expected, actual)
Definition: unity.h:136
zmq_recv
ZMQ_EXPORT int zmq_recv(void *s_, void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:487
size
GLsizeiptr size
Definition: glcorearb.h:2943
recv_string_expect_success
void recv_string_expect_success(void *socket_, const char *str_, int flags_)
Definition: testutil_unity.cpp:101
test_stream_to_dealer
static void test_stream_to_dealer()
Definition: test_stream.cpp:30
setup_test_environment
void setup_test_environment(int timeout_seconds_)
Definition: testutil.cpp:201
UNITY_BEGIN
UNITY_BEGIN()
zmtp_greeting_t
Definition: test_stream.cpp:13
ZMQ_SNDMORE
#define ZMQ_SNDMORE
Definition: zmq.h:359
ZMQ_ROUTING_ID
#define ZMQ_ROUTING_ID
Definition: zmq.h:277
greeting
static zmtp_greeting_t greeting
Definition: test_stream.cpp:27
zmq_send
ZMQ_EXPORT int zmq_send(void *s_, const void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:377
version
static struct @0 version
zmq_msg_more
ZMQ_EXPORT int zmq_msg_more(const zmq_msg_t *msg_)
Definition: zmq.cpp:652
test_context_socket_close
void * test_context_socket_close(void *socket_)
Definition: testutil_unity.cpp:208
TEST_ASSERT_NOT_NULL
#define TEST_ASSERT_NOT_NULL(pointer)
Definition: unity.h:125
TEST_ASSERT_SUCCESS_ERRNO
#define TEST_ASSERT_SUCCESS_ERRNO(expr)
Definition: proxy_thr.cpp:47
zmq_getsockopt
ZMQ_EXPORT int zmq_getsockopt(void *s_, int option_, void *optval_, size_t *optvallen_)
Definition: zmq.cpp:261


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59