inproc_lat.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "../include/zmq.h"
4 
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
8 
9 #include "platform.hpp"
10 
11 #if defined ZMQ_HAVE_WINDOWS
12 #include <windows.h>
13 #include <process.h>
14 #else
15 #include <pthread.h>
16 #endif
17 
/* Size in bytes of each test message; set by main from the first
   command-line argument. */
18 static size_t message_size;
/* Number of request/reply round trips to perform; set by main from the
   second command-line argument and read by both main and worker. */
19 static int roundtrip_count;
20 
21 #if defined ZMQ_HAVE_WINDOWS
22 static unsigned int __stdcall worker (void *ctx_)
23 #else
24 static void *worker (void *ctx_)
25 #endif
26 {
27  void *s;
28  int rc;
29  int i;
30  zmq_msg_t msg;
31 
32  s = zmq_socket (ctx_, ZMQ_REP);
33  if (!s) {
34  printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
35  exit (1);
36  }
37 
38  rc = zmq_connect (s, "inproc://lat_test");
39  if (rc != 0) {
40  printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
41  exit (1);
42  }
43 
44  rc = zmq_msg_init (&msg);
45  if (rc != 0) {
46  printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
47  exit (1);
48  }
49 
50  for (i = 0; i != roundtrip_count; i++) {
51  rc = zmq_recvmsg (s, &msg, 0);
52  if (rc < 0) {
53  printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
54  exit (1);
55  }
56  rc = zmq_sendmsg (s, &msg, 0);
57  if (rc < 0) {
58  printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
59  exit (1);
60  }
61  }
62 
63  rc = zmq_msg_close (&msg);
64  if (rc != 0) {
65  printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
66  exit (1);
67  }
68 
69  rc = zmq_close (s);
70  if (rc != 0) {
71  printf ("error in zmq_close: %s\n", zmq_strerror (errno));
72  exit (1);
73  }
74 
75 #if defined ZMQ_HAVE_WINDOWS
76  return 0;
77 #else
78  return NULL;
79 #endif
80 }
81 
82 int main (int argc, char *argv[])
83 {
84 #if defined ZMQ_HAVE_WINDOWS
85  HANDLE local_thread;
86 #else
87  pthread_t local_thread;
88 #endif
89  void *ctx;
90  void *s;
91  int rc;
92  int i;
93  zmq_msg_t msg;
94  void *watch;
95  unsigned long elapsed;
96  double latency;
97 
98  if (argc != 3) {
99  printf ("usage: inproc_lat <message-size> <roundtrip-count>\n");
100  return 1;
101  }
102 
103  message_size = atoi (argv[1]);
104  roundtrip_count = atoi (argv[2]);
105 
106  ctx = zmq_init (1);
107  if (!ctx) {
108  printf ("error in zmq_init: %s\n", zmq_strerror (errno));
109  return -1;
110  }
111 
112  s = zmq_socket (ctx, ZMQ_REQ);
113  if (!s) {
114  printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
115  return -1;
116  }
117 
118  rc = zmq_bind (s, "inproc://lat_test");
119  if (rc != 0) {
120  printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
121  return -1;
122  }
123 
124 #if defined ZMQ_HAVE_WINDOWS
125  local_thread = (HANDLE) _beginthreadex (NULL, 0, worker, ctx, 0, NULL);
126  if (local_thread == 0) {
127  printf ("error in _beginthreadex\n");
128  return -1;
129  }
130 #else
131  rc = pthread_create (&local_thread, NULL, worker, ctx);
132  if (rc != 0) {
133  printf ("error in pthread_create: %s\n", zmq_strerror (rc));
134  return -1;
135  }
136 #endif
137 
138  rc = zmq_msg_init_size (&msg, message_size);
139  if (rc != 0) {
140  printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno));
141  return -1;
142  }
143  memset (zmq_msg_data (&msg), 0, message_size);
144 
145  printf ("message size: %d [B]\n", (int) message_size);
146  printf ("roundtrip count: %d\n", (int) roundtrip_count);
147 
148  watch = zmq_stopwatch_start ();
149 
150  for (i = 0; i != roundtrip_count; i++) {
151  rc = zmq_sendmsg (s, &msg, 0);
152  if (rc < 0) {
153  printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
154  return -1;
155  }
156  rc = zmq_recvmsg (s, &msg, 0);
157  if (rc < 0) {
158  printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
159  return -1;
160  }
161  if (zmq_msg_size (&msg) != message_size) {
162  printf ("message of incorrect size received\n");
163  return -1;
164  }
165  }
166 
167  elapsed = zmq_stopwatch_stop (watch);
168 
169  rc = zmq_msg_close (&msg);
170  if (rc != 0) {
171  printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
172  return -1;
173  }
174 
175  latency = (double) elapsed / (roundtrip_count * 2);
176 
177 #if defined ZMQ_HAVE_WINDOWS
178  DWORD rc2 = WaitForSingleObject (local_thread, INFINITE);
179  if (rc2 == WAIT_FAILED) {
180  printf ("error in WaitForSingleObject\n");
181  return -1;
182  }
183  BOOL rc3 = CloseHandle (local_thread);
184  if (rc3 == 0) {
185  printf ("error in CloseHandle\n");
186  return -1;
187  }
188 #else
189  rc = pthread_join (local_thread, NULL);
190  if (rc != 0) {
191  printf ("error in pthread_join: %s\n", zmq_strerror (rc));
192  return -1;
193  }
194 #endif
195 
196  printf ("average latency: %.3f [us]\n", (double) latency);
197 
198  rc = zmq_close (s);
199  if (rc != 0) {
200  printf ("error in zmq_close: %s\n", zmq_strerror (errno));
201  return -1;
202  }
203 
204  rc = zmq_ctx_term (ctx);
205  if (rc != 0) {
206  printf ("error in zmq_ctx_term: %s\n", zmq_strerror (errno));
207  return -1;
208  }
209 
210  return 0;
211 }
zmq_strerror
const ZMQ_EXPORT char * zmq_strerror(int errnum_)
Definition: zmq.cpp:96
main
int main(int argc, char *argv[])
Definition: inproc_lat.cpp:82
NULL
NULL
Definition: test_security_zap.cpp:405
s
XmlRpcServer s
zmq_sendmsg
ZMQ_EXPORT int zmq_sendmsg(void *s_, zmq_msg_t *msg_, int flags_)
Definition: zmq.cpp:372
errno
int errno
ZMQ_REQ
#define ZMQ_REQ
Definition: zmq.h:261
zmq_msg_data
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg_)
Definition: zmq.cpp:642
zmq_connect
ZMQ_EXPORT int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
zmq_msg_size
ZMQ_EXPORT size_t zmq_msg_size(const zmq_msg_t *msg_)
Definition: zmq.cpp:647
message_size
static size_t message_size
Definition: inproc_lat.cpp:18
roundtrip_count
static int roundtrip_count
Definition: inproc_lat.cpp:19
zmq_stopwatch_start
ZMQ_EXPORT void * zmq_stopwatch_start(void)
Definition: zmq_utils.cpp:32
zmq_msg_t
Definition: zmq.h:218
ZMQ_REP
#define ZMQ_REP
Definition: zmq.h:262
zmq_bind
ZMQ_EXPORT int zmq_bind(void *s_, const char *addr_)
Definition: zmq.cpp:299
zmq_socket
ZMQ_EXPORT void * zmq_socket(void *, int type_)
Definition: zmq.cpp:230
zmq_msg_init
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg_)
Definition: zmq.cpp:587
zmq_close
ZMQ_EXPORT int zmq_close(void *s_)
Definition: zmq.cpp:241
i
int i
Definition: gmock-matchers_test.cc:764
zmq_recvmsg
ZMQ_EXPORT int zmq_recvmsg(void *s_, zmq_msg_t *msg_, int flags_)
Definition: zmq.cpp:481
zmq_msg_init_size
ZMQ_EXPORT int zmq_msg_init_size(zmq_msg_t *msg_, size_t size_)
Definition: zmq.cpp:592
zmq_init
ZMQ_EXPORT void * zmq_init(int io_threads_)
Definition: zmq.cpp:196
HANDLE
void * HANDLE
Definition: wepoll.c:70
zmq_ctx_term
ZMQ_EXPORT int zmq_ctx_term(void *context_)
Definition: zmq.cpp:128
zmq_msg_close
ZMQ_EXPORT int zmq_msg_close(zmq_msg_t *msg_)
Definition: zmq.cpp:625
worker
static void * worker(void *ctx_)
Definition: inproc_lat.cpp:24
zmq_stopwatch_stop
ZMQ_EXPORT unsigned long zmq_stopwatch_stop(void *watch_)
Definition: zmq_utils.cpp:47


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:54