local_thr.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
#include "../include/zmq.h"
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
6 
7 // keys are arbitrary but must match remote_thr.cpp
8 const char server_prvkey[] = "{X}#>t#jRGaQ}gMhv=30r(Mw+87YGs+5%kh=i@f8";
9 
10 int main (int argc, char *argv[])
11 {
12  const char *bind_to;
13  int message_count;
14  size_t message_size;
15  void *ctx;
16  void *s;
17  int rc;
18  int i;
19  zmq_msg_t msg;
20  void *watch;
21  unsigned long elapsed;
22  double throughput;
23  double megabits;
24  int curve = 0;
25 
26  if (argc != 4 && argc != 5) {
27  printf ("usage: local_thr <bind-to> <message-size> <message-count> "
28  "[<enable_curve>]\n");
29  return 1;
30  }
31  bind_to = argv[1];
32  message_size = atoi (argv[2]);
33  message_count = atoi (argv[3]);
34  if (argc >= 5 && atoi (argv[4])) {
35  curve = 1;
36  }
37 
38  ctx = zmq_init (1);
39  if (!ctx) {
40  printf ("error in zmq_init: %s\n", zmq_strerror (errno));
41  return -1;
42  }
43 
44  s = zmq_socket (ctx, ZMQ_PULL);
45  if (!s) {
46  printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
47  return -1;
48  }
49 
50  // Add your socket options here.
51  // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
52  if (curve) {
54  sizeof (server_prvkey));
55  if (rc != 0) {
56  printf ("error in zmq_setsockoopt: %s\n", zmq_strerror (errno));
57  return -1;
58  }
59  int server = 1;
60  rc = zmq_setsockopt (s, ZMQ_CURVE_SERVER, &server, sizeof (int));
61  if (rc != 0) {
62  printf ("error in zmq_setsockoopt: %s\n", zmq_strerror (errno));
63  return -1;
64  }
65  }
66 
67  rc = zmq_bind (s, bind_to);
68  if (rc != 0) {
69  printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
70  return -1;
71  }
72 
73  rc = zmq_msg_init (&msg);
74  if (rc != 0) {
75  printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
76  return -1;
77  }
78 
79  rc = zmq_recvmsg (s, &msg, 0);
80  if (rc < 0) {
81  printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
82  return -1;
83  }
84  if (zmq_msg_size (&msg) != message_size) {
85  printf ("message of incorrect size received\n");
86  return -1;
87  }
88 
89  watch = zmq_stopwatch_start ();
90 
91  for (i = 0; i != message_count - 1; i++) {
92  rc = zmq_recvmsg (s, &msg, 0);
93  if (rc < 0) {
94  printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
95  return -1;
96  }
97  if (zmq_msg_size (&msg) != message_size) {
98  printf ("message of incorrect size received\n");
99  return -1;
100  }
101  }
102 
103  elapsed = zmq_stopwatch_stop (watch);
104  if (elapsed == 0)
105  elapsed = 1;
106 
107  rc = zmq_msg_close (&msg);
108  if (rc != 0) {
109  printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
110  return -1;
111  }
112 
113  throughput = ((double) message_count / (double) elapsed * 1000000);
114  megabits = ((double) throughput * message_size * 8) / 1000000;
115 
116  printf ("message size: %d [B]\n", (int) message_size);
117  printf ("message count: %d\n", (int) message_count);
118  printf ("mean throughput: %d [msg/s]\n", (int) throughput);
119  printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
120 
121  rc = zmq_close (s);
122  if (rc != 0) {
123  printf ("error in zmq_close: %s\n", zmq_strerror (errno));
124  return -1;
125  }
126 
127  rc = zmq_ctx_term (ctx);
128  if (rc != 0) {
129  printf ("error in zmq_ctx_term: %s\n", zmq_strerror (errno));
130  return -1;
131  }
132 
133  return 0;
134 }
zmq_strerror
const ZMQ_EXPORT char * zmq_strerror(int errnum_)
Definition: zmq.cpp:96
s
XmlRpcServer s
ZMQ_CURVE_SECRETKEY
#define ZMQ_CURVE_SECRETKEY
Definition: zmq.h:314
errno
int errno
zmq_msg_size
ZMQ_EXPORT size_t zmq_msg_size(const zmq_msg_t *msg_)
Definition: zmq.cpp:647
message_size
static size_t message_size
Definition: inproc_lat.cpp:18
zmq_setsockopt
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
zmq_stopwatch_start
ZMQ_EXPORT void * zmq_stopwatch_start(void)
Definition: zmq_utils.cpp:32
zmq_msg_t
Definition: zmq.h:218
zmq_bind
ZMQ_EXPORT int zmq_bind(void *s_, const char *addr_)
Definition: zmq.cpp:299
zmq_socket
ZMQ_EXPORT void * zmq_socket(void *, int type_)
Definition: zmq.cpp:230
server
void * server
Definition: test_security_curve.cpp:29
zmq_msg_init
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg_)
Definition: zmq.cpp:587
zmq_close
ZMQ_EXPORT int zmq_close(void *s_)
Definition: zmq.cpp:241
i
int i
Definition: gmock-matchers_test.cc:764
zmq_recvmsg
ZMQ_EXPORT int zmq_recvmsg(void *s_, zmq_msg_t *msg_, int flags_)
Definition: zmq.cpp:481
ZMQ_CURVE_SERVER
#define ZMQ_CURVE_SERVER
Definition: zmq.h:312
main
int main(int argc, char *argv[])
Definition: local_thr.cpp:10
zmq_init
ZMQ_EXPORT void * zmq_init(int io_threads_)
Definition: zmq.cpp:196
server_prvkey
const char server_prvkey[]
Definition: local_thr.cpp:8
zmq_ctx_term
ZMQ_EXPORT int zmq_ctx_term(void *context_)
Definition: zmq.cpp:128
ZMQ_PULL
#define ZMQ_PULL
Definition: zmq.h:265
message_count
static int message_count
Definition: inproc_thr.cpp:18
zmq_msg_close
ZMQ_EXPORT int zmq_msg_close(zmq_msg_t *msg_)
Definition: zmq.cpp:625
zmq_stopwatch_stop
ZMQ_EXPORT unsigned long zmq_stopwatch_stop(void *watch_)
Definition: zmq_utils.cpp:47


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:55