inproc_thr.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "../include/zmq.h"
4 
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <string.h>
8 
9 #include "platform.hpp"
10 
11 #if defined ZMQ_HAVE_WINDOWS
12 #include <windows.h>
13 #include <process.h>
14 #else
15 #include <pthread.h>
16 #endif
17 
18 static int message_count; /* Number of messages to transfer; set from argv[2] in main, read by worker and main. */
19 static size_t message_size; /* Size in bytes of each message; set from argv[1] in main, used to allocate and verify messages. */
20 
21 #if defined ZMQ_HAVE_WINDOWS
22 static unsigned int __stdcall worker (void *ctx_)
23 #else
24 static void *worker (void *ctx_)
25 #endif
26 {
27  void *s;
28  int rc;
29  int i;
30  zmq_msg_t msg;
31 
32  s = zmq_socket (ctx_, ZMQ_PUSH);
33  if (!s) {
34  printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
35  exit (1);
36  }
37 
38  rc = zmq_connect (s, "inproc://thr_test");
39  if (rc != 0) {
40  printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
41  exit (1);
42  }
43 
44  for (i = 0; i != message_count; i++) {
45  rc = zmq_msg_init_size (&msg, message_size);
46  if (rc != 0) {
47  printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno));
48  exit (1);
49  }
50 #if defined ZMQ_MAKE_VALGRIND_HAPPY
51  memset (zmq_msg_data (&msg), 0, message_size);
52 #endif
53 
54  rc = zmq_sendmsg (s, &msg, 0);
55  if (rc < 0) {
56  printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
57  exit (1);
58  }
59  rc = zmq_msg_close (&msg);
60  if (rc != 0) {
61  printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
62  exit (1);
63  }
64  }
65 
66  rc = zmq_close (s);
67  if (rc != 0) {
68  printf ("error in zmq_close: %s\n", zmq_strerror (errno));
69  exit (1);
70  }
71 
72 #if defined ZMQ_HAVE_WINDOWS
73  return 0;
74 #else
75  return NULL;
76 #endif
77 }
78 
79 int main (int argc, char *argv[])
80 {
81 #if defined ZMQ_HAVE_WINDOWS
82  HANDLE local_thread;
83 #else
84  pthread_t local_thread;
85 #endif
86  void *ctx;
87  void *s;
88  int rc;
89  int i;
90  zmq_msg_t msg;
91  void *watch;
92  unsigned long elapsed;
93  unsigned long throughput;
94  double megabits;
95 
96  if (argc != 3) {
97  printf ("usage: inproc_thr <message-size> <message-count>\n");
98  return 1;
99  }
100 
101  message_size = atoi (argv[1]);
102  message_count = atoi (argv[2]);
103 
104  ctx = zmq_init (1);
105  if (!ctx) {
106  printf ("error in zmq_init: %s\n", zmq_strerror (errno));
107  return -1;
108  }
109 
110  s = zmq_socket (ctx, ZMQ_PULL);
111  if (!s) {
112  printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
113  return -1;
114  }
115 
116  rc = zmq_bind (s, "inproc://thr_test");
117  if (rc != 0) {
118  printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
119  return -1;
120  }
121 
122 #if defined ZMQ_HAVE_WINDOWS
123  local_thread = (HANDLE) _beginthreadex (NULL, 0, worker, ctx, 0, NULL);
124  if (local_thread == 0) {
125  printf ("error in _beginthreadex\n");
126  return -1;
127  }
128 #else
129  rc = pthread_create (&local_thread, NULL, worker, ctx);
130  if (rc != 0) {
131  printf ("error in pthread_create: %s\n", zmq_strerror (rc));
132  return -1;
133  }
134 #endif
135 
136  rc = zmq_msg_init (&msg);
137  if (rc != 0) {
138  printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
139  return -1;
140  }
141 
142  printf ("message size: %d [B]\n", (int) message_size);
143  printf ("message count: %d\n", (int) message_count);
144 
145  rc = zmq_recvmsg (s, &msg, 0);
146  if (rc < 0) {
147  printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
148  return -1;
149  }
150  if (zmq_msg_size (&msg) != message_size) {
151  printf ("message of incorrect size received\n");
152  return -1;
153  }
154 
155  watch = zmq_stopwatch_start ();
156 
157  for (i = 0; i != message_count - 1; i++) {
158  rc = zmq_recvmsg (s, &msg, 0);
159  if (rc < 0) {
160  printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
161  return -1;
162  }
163  if (zmq_msg_size (&msg) != message_size) {
164  printf ("message of incorrect size received\n");
165  return -1;
166  }
167  }
168 
169  elapsed = zmq_stopwatch_stop (watch);
170  if (elapsed == 0)
171  elapsed = 1;
172 
173  rc = zmq_msg_close (&msg);
174  if (rc != 0) {
175  printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
176  return -1;
177  }
178 
179 #if defined ZMQ_HAVE_WINDOWS
180  DWORD rc2 = WaitForSingleObject (local_thread, INFINITE);
181  if (rc2 == WAIT_FAILED) {
182  printf ("error in WaitForSingleObject\n");
183  return -1;
184  }
185  BOOL rc3 = CloseHandle (local_thread);
186  if (rc3 == 0) {
187  printf ("error in CloseHandle\n");
188  return -1;
189  }
190 #else
191  rc = pthread_join (local_thread, NULL);
192  if (rc != 0) {
193  printf ("error in pthread_join: %s\n", zmq_strerror (rc));
194  return -1;
195  }
196 #endif
197 
198  rc = zmq_close (s);
199  if (rc != 0) {
200  printf ("error in zmq_close: %s\n", zmq_strerror (errno));
201  return -1;
202  }
203 
204  rc = zmq_ctx_term (ctx);
205  if (rc != 0) {
206  printf ("error in zmq_ctx_term: %s\n", zmq_strerror (errno));
207  return -1;
208  }
209 
210  throughput =
211  (unsigned long) ((double) message_count / (double) elapsed * 1000000);
212  megabits = (double) (throughput * message_size * 8) / 1000000;
213 
214  printf ("mean throughput: %d [msg/s]\n", (int) throughput);
215  printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
216 
217  return 0;
218 }
zmq_strerror
const ZMQ_EXPORT char * zmq_strerror(int errnum_)
Definition: zmq.cpp:96
NULL
NULL
Definition: test_security_zap.cpp:405
s
XmlRpcServer s
message_size
static size_t message_size
Definition: inproc_thr.cpp:19
zmq_sendmsg
ZMQ_EXPORT int zmq_sendmsg(void *s_, zmq_msg_t *msg_, int flags_)
Definition: zmq.cpp:372
errno
int errno
worker
static void * worker(void *ctx_)
Definition: inproc_thr.cpp:24
zmq_msg_data
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg_)
Definition: zmq.cpp:642
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
zmq_msg_size
ZMQ_EXPORT size_t zmq_msg_size(const zmq_msg_t *msg_)
Definition: zmq.cpp:647
ZMQ_PUSH
#define ZMQ_PUSH
Definition: zmq.h:266
zmq_stopwatch_start
ZMQ_EXPORT void * zmq_stopwatch_start(void)
Definition: zmq_utils.cpp:32
zmq_msg_t
Definition: zmq.h:218
zmq_bind
ZMQ_EXPORT int zmq_bind(void *s_, const char *addr_)
Definition: zmq.cpp:299
zmq_socket
ZMQ_EXPORT void * zmq_socket(void *, int type_)
Definition: zmq.cpp:230
main
int main(int argc, char *argv[])
Definition: inproc_thr.cpp:79
zmq_msg_init
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg_)
Definition: zmq.cpp:587
zmq_close
ZMQ_EXPORT int zmq_close(void *s_)
Definition: zmq.cpp:241
i
int i
Definition: gmock-matchers_test.cc:764
zmq_recvmsg
ZMQ_EXPORT int zmq_recvmsg(void *s_, zmq_msg_t *msg_, int flags_)
Definition: zmq.cpp:481
zmq_msg_init_size
ZMQ_EXPORT int zmq_msg_init_size(zmq_msg_t *msg_, size_t size_)
Definition: zmq.cpp:592
zmq_init
ZMQ_EXPORT void * zmq_init(int io_threads_)
Definition: zmq.cpp:196
HANDLE
void * HANDLE
Definition: wepoll.c:70
zmq_ctx_term
ZMQ_EXPORT int zmq_ctx_term(void *context_)
Definition: zmq.cpp:128
ZMQ_PULL
#define ZMQ_PULL
Definition: zmq.h:265
message_count
static int message_count
Definition: inproc_thr.cpp:18
zmq_msg_close
ZMQ_EXPORT int zmq_msg_close(zmq_msg_t *msg_)
Definition: zmq.cpp:625
zmq_stopwatch_stop
ZMQ_EXPORT unsigned long zmq_stopwatch_stop(void *watch_)
Definition: zmq_utils.cpp:47
google::protobuf.internal.decoder.long
long
Definition: decoder.py:89


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:54