test_multithread.cpp
Go to the documentation of this file.
1 /*
2 Server thread listen ZMQ_SERVER socket and transfer incoming message
3 to worker threads by ZMQ_PUSH-ZMQ_PULL
4 Worker thread receive message and send back to ZMQ_SERVER socket
5 
6 Each client thread open CLIENT_CONNECTION ZMQ_CLIENT sockets,
7 send random size message to each socket and check server answer
8 */
9 
10 #define ZMQ_BUILD_DRAFT_API
11 #include "../../../../include/zmq.h"
12 
13 #pragma comment(lib,"libzmq.lib")
14 
15 #include <assert.h>
16 #include <stdlib.h>
17 #include <thread>
18 #include <atomic>
19 
20 #define SERVER_ADDR "tcp://127.0.0.1:12345"
21 #define SERVER_WORKER_COUNT 3 // worker threads count
22 
23 #define CLIENT_COUNT 5 // client threads count
24 #define CLIENT_CONNECTION 100 // ZMQ_CLIENT sockets at each client
25 #define CLIENT_RECONNECT 1000 // reconnect one socket after messages
26 
27 #define MESSAGE_MAX_SIZE 1024
28 
29 //*******************************************************************
30 //****** MESSAGE ****************************************************
31 //*******************************************************************
32 
33 void message_fill(zmq_msg_t* msg, int val) {
34  assert(val > 0);
35  int size = sizeof(int) * 2 + val;
36  int rc = zmq_msg_init_size(msg, size); assert(rc == 0);
37  uint8_t* data = (uint8_t*)zmq_msg_data(msg);
38  memcpy(data, &val, sizeof(int));
39  data += sizeof(int);
40  memset(data, val & 0xFF, val);
41  int check_sum = val + (val & 0xFF) * val;
42  data += val;
43  memcpy(data, &check_sum, sizeof(int));
44 }
45 
47  uint8_t* data = (uint8_t*)zmq_msg_data(msg);
48  int size = zmq_msg_size(msg);
49  assert(size > sizeof(int) * 2);
50  // check size
51  int val;
52  memcpy(&val, data, sizeof(int));
53  if(size != sizeof(int) * 2 + val) {
54  fprintf(stderr, "wrong message: val = %d size = %d\n", val, size);
55  return -1;
56  }
57  // check sum
58  data += sizeof(int);
59  int cs = val;
60  for(int i = 0; i < val; i++) {
61  cs += data[i];
62  }
63  data += val;
64  int check_sum;
65  memcpy(&check_sum, data, sizeof(int));
66  if(check_sum != cs) {
67  fprintf(stderr, "wrong message: cs = %d check_sum = %d\n", cs, check_sum);
68  return -1;
69  }
70  return val;
71 }
72 
73 //*******************************************************************
74 //****** SERVER *****************************************************
75 //*******************************************************************
76 
77 void *server_ctx = NULL;
78 void *server_sock = NULL;
79 
80 std::atomic<int> worker_cnt[SERVER_WORKER_COUNT] = {0}; // statistic
81 
82 // worker thread
83 void worker(int num) {
84  printf("worker %d start\n", num);
85  void* queue = zmq_socket(server_ctx, ZMQ_PULL); assert(queue);
86  int rc = zmq_connect(queue, "inproc://queue"); assert(rc == 0);
87 
88  while (1) {
89  // receive messages from the queue
90  zmq_msg_t msg;
91  rc = zmq_msg_init(&msg); assert(rc == 0);
92  rc = zmq_msg_recv(&msg, queue, 0); assert(rc > 0);
93  // check message
94  //printf("worker %d recv %d bytes at %X from %X\n", num, zmq_msg_size(&msg), zmq_msg_data(&msg), zmq_msg_routing_id(&msg));
95  // send to client
96  rc = zmq_msg_send(&msg, server_sock, 0); assert(rc != -1);
97  worker_cnt[num]++;
98  }
100 }
101 
102 // server thread
103 void server() {
104  server_ctx = zmq_ctx_new(); assert(server_ctx);
105  // create queue
106  void* queue = zmq_socket(server_ctx, ZMQ_PUSH); assert(queue);
107  int rc = zmq_bind(queue, "inproc://queue"); assert(rc == 0);
108  // start workers
109  std::thread w[SERVER_WORKER_COUNT];
110  for (int i = 0; i < SERVER_WORKER_COUNT; i++) w[i] = std::thread(worker, i);
111  // ZMQ_SERVER for client messages
113  rc = zmq_bind(server_sock, SERVER_ADDR); assert(rc == 0);
114 
115  while (1) {
116  // wait client message
117  zmq_msg_t msg;
118  rc = zmq_msg_init(&msg); assert(rc == 0);
119  rc = zmq_msg_recv(&msg, server_sock, 0); assert(rc > 0);
120  //printf("recv %d bytes at %X from %X\n", zmq_msg_size(&msg), zmq_msg_data(&msg), zmq_msg_routing_id(&msg));
121  // send message to queue
122  rc = zmq_msg_send(&msg, queue, 0); assert(rc > 0);
123  }
124 }
125 
126 //*******************************************************************
127 //****** CLIENT *****************************************************
128 //*******************************************************************
129 
130 std::atomic<int> client_cnt[CLIENT_COUNT] = { 0 }; // statistic
131 std::atomic<int> client_ready = 0;
132 
133 // client thread
134 void client(int num)
135 {
136  //printf("client %d start. Open %d connections\n", num, CLIENT_CONNECTION);
137 
138  void *ctx = zmq_ctx_new(); assert(ctx);
139 
140  void *sock[CLIENT_CONNECTION];
141  int rc;
142  // open ZMQ_CLIENT connections
143  for (int i = 0; i < CLIENT_CONNECTION; i++) {
144  sock[i] = zmq_socket(ctx, ZMQ_CLIENT); assert(sock[i]);
145  rc = zmq_connect(sock[i], SERVER_ADDR); assert(rc == 0);
146  // test connection
147  zmq_msg_t msg;
148  int v = rand() % 256 + 1;
149  message_fill(&msg, v);
150  rc = zmq_msg_send(&msg, sock[i], 0); assert(rc > 0);
151  rc = zmq_msg_init(&msg); assert(rc == 0);
152  rc = zmq_msg_recv(&msg, sock[i], 0); assert(rc > 0);
153  rc = message_check(&msg); assert(rc == v);
154  zmq_msg_close(&msg);
155  }
156  printf("client %d open %d connections\n", num, CLIENT_CONNECTION);
157  client_ready++;
158  while (client_ready < CLIENT_COUNT) Sleep(10); // wait while all clients open sockets
159 
160  int reconnect = 0;
161  while(1) {
162  int val[CLIENT_CONNECTION];
163  zmq_msg_t msg;
164  // send messages
165  for(int i = 0; i < CLIENT_CONNECTION; i++) {
166  val[i] = rand() % MESSAGE_MAX_SIZE + 1;
167  message_fill(&msg, val[i]);
168  rc = zmq_msg_send(&msg, sock[i], 0); assert(rc > 0);
169  }
170  // recv and check
171  for (int i = 0; i < CLIENT_CONNECTION; i++) {
172  rc = zmq_msg_init(&msg); assert(rc == 0);
173  rc = zmq_msg_recv(&msg, sock[i], 0); assert(rc > 0);
174  rc = message_check(&msg);
175  if(rc != val[i] && rc > 0) {
176  fprintf(stderr, "wrong message: send %d recv %d \n", val[i], rc);
177  }
178  zmq_msg_close(&msg);
179  client_cnt[num]++;
180  }
181  // reconnect one
182  reconnect++;
183  if(reconnect == CLIENT_RECONNECT) {
184  int n = rand() % CLIENT_CONNECTION;
185  zmq_close(sock[n]);
186  sock[n] = zmq_socket(ctx, ZMQ_CLIENT); assert(sock[n]);
187  int rc = zmq_connect(sock[n], SERVER_ADDR); assert(rc == 0);
188  }
189  }
190 }
191 
192 //*******************************************************************
193 int main (void) {
194  int v1, v2, v3; zmq_version(&v1, &v2, &v3);
195  printf("ZMQ version %d.%d.%d. Compile %s %s\n", v1, v2, v3, __DATE__, __TIME__);
196 
197  std::thread ct[CLIENT_COUNT];
198  for (int i = 0; i < CLIENT_COUNT; i++) ct[i] = std::thread(client, i);
199 
200  std::thread st(server);
201 
202  int w[SERVER_WORKER_COUNT] = { 0 };
203  int c[CLIENT_COUNT] = { 0 };
204  int total = 0;
205 
206  while(1) {
207  Sleep(1000);
208  if (client_ready < CLIENT_COUNT) continue;
209  // check workers
210  for(int i = 0; i < SERVER_WORKER_COUNT; i++) {
211  if(w[i] == worker_cnt[i]) {
212  fprintf(stderr, "worker %d not work \n", i);
213  }
214  w[i] = worker_cnt[i];
215  }
216  // check clients
217  int t = 0;
218  for (int i = 0; i < CLIENT_COUNT; i++) {
219  if (c[i] == client_cnt[i]) {
220  fprintf(stderr, "client %d not work \n", i);
221  }
222  c[i] = client_cnt[i];
223  t += c[i];
224  }
225  printf("\rTotal %d messages. Speed %d per second ", t, t - total);
226  total = t;
227  }
228  return 0;
229 }
zmq_version
ZMQ_EXPORT void zmq_version(int *major_, int *minor_, int *patch_)
Definition: zmq.cpp:88
CLIENT_COUNT
#define CLIENT_COUNT
Definition: test_multithread.cpp:23
ZMQ_SERVER
#define ZMQ_SERVER
Definition: zmq_draft.h:14
NULL
NULL
Definition: test_security_zap.cpp:405
CLIENT_CONNECTION
#define CLIENT_CONNECTION
Definition: test_multithread.cpp:24
server_sock
void * server_sock
Definition: test_multithread.cpp:78
sock
void * sock
Definition: test_connect_resolve.cpp:9
zmq_msg_send
ZMQ_EXPORT int zmq_msg_send(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:609
ZMQ_CLIENT
#define ZMQ_CLIENT
Definition: zmq_draft.h:15
zmq_ctx_new
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:109
client_ready
std::atomic< int > client_ready
Definition: test_multithread.cpp:131
client
void client(int num)
Definition: test_multithread.cpp:134
zmq_msg_data
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg_)
Definition: zmq.cpp:642
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
client_cnt
std::atomic< int > client_cnt[CLIENT_COUNT]
Definition: test_multithread.cpp:130
zmq_msg_size
ZMQ_EXPORT size_t zmq_msg_size(const zmq_msg_t *msg_)
Definition: zmq.cpp:647
queue
Definition: wepoll.c:927
CLIENT_RECONNECT
#define CLIENT_RECONNECT
Definition: test_multithread.cpp:25
ZMQ_PUSH
#define ZMQ_PUSH
Definition: zmq.h:266
zmq_msg_t
Definition: zmq.h:218
worker
void worker(int num)
Definition: test_multithread.cpp:83
zmq_bind
ZMQ_EXPORT int zmq_bind(void *s_, const char *addr_)
Definition: zmq.cpp:299
zmq_msg_recv
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:617
zmq_socket
ZMQ_EXPORT void * zmq_socket(void *, int type_)
Definition: zmq.cpp:230
SERVER_ADDR
#define SERVER_ADDR
Definition: test_multithread.cpp:20
MESSAGE_MAX_SIZE
#define MESSAGE_MAX_SIZE
Definition: test_multithread.cpp:27
zmq_msg_init
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg_)
Definition: zmq.cpp:587
n
GLdouble n
Definition: glcorearb.h:4153
zmq_close
ZMQ_EXPORT int zmq_close(void *s_)
Definition: zmq.cpp:241
i
int i
Definition: gmock-matchers_test.cc:764
v1
GLfloat GLfloat v1
Definition: glcorearb.h:3086
main
int main(void)
Definition: test_multithread.cpp:193
SERVER_WORKER_COUNT
#define SERVER_WORKER_COUNT
Definition: test_multithread.cpp:21
v
const GLdouble * v
Definition: glcorearb.h:3106
size
GLsizeiptr size
Definition: glcorearb.h:2943
zmq_msg_init_size
ZMQ_EXPORT int zmq_msg_init_size(zmq_msg_t *msg_, size_t size_)
Definition: zmq.cpp:592
v3
GLfloat GLfloat GLfloat GLfloat v3
Definition: glcorearb.h:3088
message_fill
void message_fill(zmq_msg_t *msg, int val)
Definition: test_multithread.cpp:33
message_check
int message_check(zmq_msg_t *msg)
Definition: test_multithread.cpp:46
data
GLint GLenum GLsizei GLsizei GLsizei GLint GLsizei const GLvoid * data
Definition: glcorearb.h:2879
val
GLuint GLfloat * val
Definition: glcorearb.h:3604
assert.h
server_ctx
void * server_ctx
Definition: test_multithread.cpp:77
ZMQ_PULL
#define ZMQ_PULL
Definition: zmq.h:265
w
GLubyte GLubyte GLubyte GLubyte w
Definition: glcorearb.h:3126
worker_cnt
std::atomic< int > worker_cnt[SERVER_WORKER_COUNT]
Definition: test_multithread.cpp:80
zmq_msg_close
ZMQ_EXPORT int zmq_msg_close(zmq_msg_t *msg_)
Definition: zmq.cpp:625
v2
GLfloat GLfloat GLfloat v2
Definition: glcorearb.h:3087
server
void server()
Definition: test_multithread.cpp:103


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59