benchmark-pump.c
Go to the documentation of this file.
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "task.h"
23 #include "uv.h"
24 
25 #include <math.h>
26 #include <stdio.h>
27 
28 
/* Number of connections the client side aims for.
 * NOTE(review): no assignment or read of this variable survives in this
 * listing; presumably it is set from the `n` passed to tcp_pump()/pipe_pump()
 * -- confirm against the full file. */
29 static int TARGET_CONNECTIONS;
/* Size in bytes of the shared write_buffer; write_cb() counts one full
 * buffer (sizeof write_buffer) per completed write. */
30 #define WRITE_BUFFER_SIZE 8192
/* NOTE(review): presumably the cap on concurrently pending connect attempts
 * in maybe_connect_some(); the line that uses it is not visible here. */
31 #define MAX_SIMULTANEOUS_CONNECTS 100
32 
/* Set to nonzero to compile in the per-interval progress line that
 * show_stats() prints to stderr. */
33 #define PRINT_STATS 0
/* NOTE(review): the uses of the two macros below fall on lines missing from
 * this listing; presumably the period of the show-stats timer and the number
 * of intervals before the client exits -- confirm. */
34 #define STATS_INTERVAL 1000 /* msec */
35 #define STATS_COUNT 5
36 
37 
38 static void do_write(uv_stream_t*);
39 static void maybe_connect_some(void);
40 
41 static uv_req_t* req_alloc(void);
42 static void req_free(uv_req_t* uv_req);
43 
44 static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf);
45 static void buf_free(const uv_buf_t* buf);
46 
47 static uv_loop_t* loop;
48 
52 static struct sockaddr_in listen_addr;
53 static struct sockaddr_in connect_addr;
54 
56 
/* Socket bookkeeping.
 * NOTE(review): max_connect_socket and max_read_sockets are only touched on
 * lines missing from this listing; their exact meaning cannot be confirmed
 * here. */
57 static int max_connect_socket = 0;
58 static int max_read_sockets = 0;
/* Incremented in connection_cb() for every accepted stream and decremented
 * when a reading handle has been closed. */
59 static int read_sockets = 0;
/* Incremented in connect_cb() for every successfully connected client. */
60 static int write_sockets = 0;
61 
/* Byte counters. nrecv/nsent cover the current statistics interval and are
 * reset to 0 by show_stats(); the *_total variants accumulate over the whole
 * run and feed the final gbit() throughput figures. */
62 static int64_t nrecv = 0;
63 static int64_t nrecv_total = 0;
64 static int64_t nsent = 0;
65 static int64_t nsent_total = 0;
66 
67 static int stats_left = 0;
68 
70 
71 /* Make this as large as you need. */
72 #define MAX_WRITE_HANDLES 1000
73 
75 
78 
80 
81 
/*
 * Convert a transfer of `bytes` bytes over `passed_ms` milliseconds into a
 * throughput in gigabits per second, where one gigabit is counted as
 * 2^30 * 8 bits' worth of bytes (bytes / 2^30 * 8).
 *
 * Returns 0.0 when `passed_ms` is zero or negative: an empty or inverted
 * interval has no meaningful rate, and dividing by it would put inf, NaN or
 * a negative number into the printed statistics.
 */
static double gbit(int64_t bytes, int64_t passed_ms) {
  const double bytes_per_gib = 1024.0 * 1024.0 * 1024.0;
  double gbits;
  double seconds;

  if (passed_ms <= 0)
    return 0.0;

  gbits = ((double) bytes / bytes_per_gib) * 8;
  seconds = (double) passed_ms / 1000;
  return gbits / seconds;
}
86 
87 
/*
 * Periodic statistics callback; takes the uv_timer_t that fired.
 * NOTE(review): the registration call is not visible in this listing;
 * presumably this is the show-stats timer callback -- confirm.
 *
 * Every call decrements stats_left. When the counter reaches zero the
 * function computes the milliseconds elapsed since start_time, prints the
 * client throughput gbit(nsent_total, diff) to stderr, walks all
 * write_sockets handles and ends the process with exit(0). On every other
 * call it only resets the per-interval counters nrecv and nsent.
 *
 * When PRINT_STATS is nonzero a per-interval progress line is printed first.
 *
 * NOTE(review): original lines 94-95, 102, 107, 113 and 115 are missing from
 * this listing, so the remaining fprintf arguments and what is done to each
 * write socket before exit cannot be confirmed here.
 */
88 static void show_stats(uv_timer_t* handle) {
89  int64_t diff;
90  int i;
91 
92 #if PRINT_STATS
93  fprintf(stderr, "connections: %d, write: %.1f gbit/s\n",
96  fflush(stderr);
97 #endif
98 
99  /* Exit if the show is over */
100  if (!--stats_left) {
101 
103  diff = uv_now(loop) - start_time;
104 
105  fprintf(stderr, "%s_pump%d_client: %.1f gbit/s\n",
106  type == TCP ? "tcp" : "pipe",
108  gbit(nsent_total, diff));
109  fflush(stderr);
110 
111  for (i = 0; i < write_sockets; i++) {
112  if (type == TCP)
114  else
116  }
117 
118  exit(0);
119  }
120 
121  /* Reset read and write counters */
122  nrecv = 0;
123  nsent = 0;
124 }
125 
126 
/*
 * Print the server-side summary line to stderr: the transport name ("tcp"
 * or "pipe", chosen from `type`) and the receive throughput computed by
 * gbit(nrecv_total, diff), where diff is the number of milliseconds between
 * start_time and uv_now(loop).
 *
 * NOTE(review): original lines 130 and 135 are missing from this listing
 * (one statement before the diff computation and the argument for "%d").
 */
127 static void read_show_stats(void) {
128  int64_t diff;
129 
131  diff = uv_now(loop) - start_time;
132 
133  fprintf(stderr, "%s_pump%d_server: %.1f gbit/s\n",
134  type == TCP ? "tcp" : "pipe",
136  gbit(nrecv_total, diff));
137  fflush(stderr);
138 }
139 
140 
141 
143  free(handle);
144  read_sockets--;
145 
146  /* If it's past the first second and everyone has closed their connection
147  * Then print stats.
148  */
149  if (uv_now(loop) - start_time > 1000 && read_sockets == 0) {
150  read_show_stats();
151  uv_close((uv_handle_t*)server, NULL);
152  }
153 }
154 
155 
/*
 * Set up statistics collection. Two calls are made whose integer results
 * are asserted to be 0.
 *
 * NOTE(review): every statement that does real work (original lines
 * 160-161, 163, 166-167) is missing from this listing. Presumably the
 * show-stats timer is initialised and started and the start time is
 * recorded -- confirm against the full file.
 */
156 static void start_stats_collection(void) {
157  int r;
158 
159  /* Show-stats timer */
162  ASSERT(r == 0);
164  ASSERT(r == 0);
165 
168 }
169 
170 
172  if (nrecv_total == 0) {
173  ASSERT(start_time == 0);
176  }
177 
178  if (bytes < 0) {
180  return;
181  }
182 
183  buf_free(buf);
184 
185  nrecv += bytes;
186  nrecv_total += bytes;
187 }
188 
189 
/*
 * Write-completion callback; do_write() passes it to uv_write().
 *
 * Asserts that the write succeeded (status == 0), hands the request back to
 * the request freelist with req_free(), and credits one full write_buffer
 * to both the interval counter nsent and the running total nsent_total.
 *
 * NOTE(review): original line 198 is missing from this listing; presumably
 * it queues the next write on the same stream -- confirm.
 */
190 static void write_cb(uv_write_t* req, int status) {
191  ASSERT(status == 0);
192 
193  req_free((uv_req_t*) req);
194 
195  nsent += sizeof write_buffer;
196  nsent_total += sizeof write_buffer;
197 
199 }
200 
201 
202 static void do_write(uv_stream_t* stream) {
203  uv_write_t* req;
204  uv_buf_t buf;
205  int r;
206 
207  buf.base = (char*) &write_buffer;
208  buf.len = sizeof write_buffer;
209 
210  req = (uv_write_t*) req_alloc();
211  r = uv_write(req, stream, &buf, 1, write_cb);
212  ASSERT(r == 0);
213 }
214 
215 
/*
 * Connect-completion callback; maybe_connect_some() passes it to
 * uv_tcp_connect().
 *
 * A nonzero status is reported on stderr through uv_strerror() and then
 * fails the assertion. On success the callback counts the new connection in
 * write_sockets and returns the connect request to the freelist with
 * req_free(). The loop at the end visits every write socket to start
 * writing.
 *
 * NOTE(review): original lines 228, 230-231, 236 and 238 are missing from
 * this listing. The extra closing brace suggests the loop sits inside a
 * condition on one of the missing lines, and the per-socket call in each
 * branch is not visible -- confirm against the full file.
 */
216 static void connect_cb(uv_connect_t* req, int status) {
217  int i;
218 
219  if (status) {
220  fprintf(stderr, "%s", uv_strerror(status));
221  fflush(stderr);
222  }
223  ASSERT(status == 0);
224 
225  write_sockets++;
226  req_free((uv_req_t*) req);
227 
229 
232 
233  /* Yay! start writing */
234  for (i = 0; i < write_sockets; i++) {
235  if (type == TCP)
237  else
239  }
240  }
241 }
242 
243 
/*
 * Start outgoing connection attempts for the current transport `type`.
 *
 * TCP branch: initialises the handle `tcp` on `loop`, takes a connect
 * request from req_alloc() and calls uv_tcp_connect() towards connect_addr
 * with connect_cb as the completion callback. Pipe branch: initialises
 * `pipe` with ipc = 0 and takes a connect request. Every libuv return code
 * is asserted to be 0.
 *
 * NOTE(review): original lines 250-251, 253, 265 and 271 are missing from
 * this listing. That hides the enclosing loop and its limit, how `tcp` and
 * `pipe` are pointed at a handle before being initialised, and the pipe
 * connect call itself -- confirm against the full file.
 */
244 static void maybe_connect_some(void) {
245  uv_connect_t* req;
246  uv_tcp_t* tcp;
247  uv_pipe_t* pipe;
248  int r;
249 
252  if (type == TCP) {
254 
255  r = uv_tcp_init(loop, tcp);
256  ASSERT(r == 0);
257 
258  req = (uv_connect_t*) req_alloc();
259  r = uv_tcp_connect(req,
260  tcp,
261  (const struct sockaddr*) &connect_addr,
262  connect_cb);
263  ASSERT(r == 0);
264  } else {
266 
267  r = uv_pipe_init(loop, pipe, 0);
268  ASSERT(r == 0);
269 
270  req = (uv_connect_t*) req_alloc();
272  }
273  }
274 }
275 
276 
/*
 * Server-side callback for an incoming connection on `s`.
 *
 * Asserts that `s` is the global `server` handle and that status is 0,
 * heap-allocates a uv_tcp_t or uv_pipe_t depending on `type`, accepts the
 * connection into it with uv_accept() and counts it in read_sockets. Every
 * libuv return code is asserted to be 0.
 *
 * NOTE(review): original lines 278, 286, 290, 297 and 301 are missing from
 * this listing: the declaration of `stream`, the handle init calls, the
 * call whose result is checked after uv_accept() (presumably starting to
 * read), and one trailing statement. The malloc() results are used without
 * a NULL check in the visible code.
 */
277 static void connection_cb(uv_stream_t* s, int status) {
279  int r;
280 
281  ASSERT(server == s);
282  ASSERT(status == 0);
283 
284  if (type == TCP) {
285  stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
287  ASSERT(r == 0);
288  } else {
289  stream = (uv_stream_t*)malloc(sizeof(uv_pipe_t));
291  ASSERT(r == 0);
292  }
293 
294  r = uv_accept(s, stream);
295  ASSERT(r == 0);
296 
298  ASSERT(r == 0);
299 
300  read_sockets++;
302 }
303 
304 
305 /*
306  * Request allocator
307  */
308 
/*
 * Node type of the request freelist used by req_alloc()/req_free(); `next`
 * links the recycled nodes.
 *
 * NOTE(review): original line 310 (the first member) is missing from this
 * listing. req_alloc() casts a node pointer directly to uv_req_t*, so the
 * missing member is presumably the request storage placed at offset 0 --
 * confirm against the full file.
 */
309 typedef struct req_list_s {
311  struct req_list_s* next;
312 } req_list_t;
313 
314 
315 static req_list_t* req_freelist = NULL;
316 
317 
318 static uv_req_t* req_alloc(void) {
319  req_list_t* req;
320 
321  req = req_freelist;
322  if (req != NULL) {
323  req_freelist = req->next;
324  return (uv_req_t*) req;
325  }
326 
327  req = (req_list_t*) malloc(sizeof *req);
328  return (uv_req_t*) req;
329 }
330 
331 
/*
 * Return a request obtained from req_alloc() to the allocator. The node is
 * pushed onto the head of req_freelist for reuse; the visible code never
 * releases the memory with free().
 *
 * NOTE(review): original line 333 is missing from this listing; presumably
 * it declares `req` as `uv_req` cast back to req_list_t* -- confirm.
 */
332 static void req_free(uv_req_t* uv_req) {
334 
335  req->next = req_freelist;
336  req_freelist = req;
337 }
338 
339 
340 /*
341  * Buffer allocator
342  */
343 
/*
 * Header placed in front of every read buffer handed out by buf_alloc();
 * `next` links recycled buffers on buf_freelist.
 *
 * NOTE(review): original line 345 (the first member) is missing from this
 * listing. buf_alloc() accesses a member named `uv_buf_t` that holds the
 * base/len descriptor of the payload following the header.
 */
344 typedef struct buf_list_s {
346  struct buf_list_s* next;
347 } buf_list_t;
348 
349 
350 static buf_list_t* buf_freelist = NULL;
351 
352 
353 static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
354  buf_list_t* ab;
355 
356  ab = buf_freelist;
357  if (ab != NULL)
358  buf_freelist = ab->next;
359  else {
360  ab = malloc(size + sizeof(*ab));
361  ab->uv_buf_t.len = size;
362  ab->uv_buf_t.base = (char*) (ab + 1);
363  }
364 
365  *buf = ab->uv_buf_t;
366 }
367 
368 
369 static void buf_free(const uv_buf_t* buf) {
370  buf_list_t* ab = (buf_list_t*) buf->base - 1;
371  ab->next = buf_freelist;
372  buf_freelist = ab;
373 }
374 
375 
/*
 * Helper process entry point for the TCP pump server.
 *
 * Selects the TCP transport, uses the default loop, fills listen_addr with
 * 0.0.0.0:TEST_PORT and binds tcpServer to it. Every libuv return code is
 * asserted to be 0. Returns 0.
 *
 * NOTE(review): original lines 385-386, 390 and 393 are missing from this
 * listing; presumably they initialise the server handle, start listening
 * with connection_cb and run the loop -- confirm against the full file.
 */
376 HELPER_IMPL(tcp_pump_server) {
377  int r;
378 
379  type = TCP;
380  loop = uv_default_loop();
381 
382  ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &listen_addr));
383 
384  /* Server */
387  ASSERT(r == 0);
388  r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &listen_addr, 0);
389  ASSERT(r == 0);
391  ASSERT(r == 0);
392 
394 
395  return 0;
396 }
397 
398 
/*
 * Helper process entry point for the pipe pump server.
 *
 * Selects the PIPE transport, uses the default loop and initialises
 * pipeServer as a non-IPC pipe (ipc = 0). Every libuv return code is
 * asserted to be 0. Returns 0.
 *
 * NOTE(review): original lines 406, 409, 411, 414 and 416 are missing from
 * this listing; presumably they bind the pipe name, start listening with
 * connection_cb and run the loop -- confirm against the full file.
 */
399 HELPER_IMPL(pipe_pump_server) {
400  int r;
401  type = PIPE;
402 
403  loop = uv_default_loop();
404 
405  /* Server */
407  r = uv_pipe_init(loop, &pipeServer, 0);
408  ASSERT(r == 0);
410  ASSERT(r == 0);
412  ASSERT(r == 0);
413 
415 
417  return 0;
418 }
419 
420 
/*
 * Client-side driver for the TCP pump benchmarks; the tcp_pump*_client
 * benchmarks call it with n = 100 and n = 1.
 *
 * Selects the TCP transport, uses the default loop and fills connect_addr
 * with 127.0.0.1:TEST_PORT.
 *
 * NOTE(review): original lines 422-423, 431, 433 and 435 are missing from
 * this listing, so how `n` is used and how the connections and the event
 * loop are started cannot be confirmed here.
 */
421 static void tcp_pump(int n) {
424  type = TCP;
425 
426  loop = uv_default_loop();
427 
428  ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &connect_addr));
429 
430  /* Start making connections */
432 
434 
436 }
437 
438 
/*
 * Client-side driver for the pipe pump benchmarks; the pipe_pump*_client
 * benchmarks call it with n = 100 and n = 1.
 *
 * Selects the PIPE transport and uses the default loop.
 *
 * NOTE(review): original lines 440-441, 447, 449 and 451 are missing from
 * this listing, so how `n` is used and how the connections and the event
 * loop are started cannot be confirmed here.
 */
439 static void pipe_pump(int n) {
442  type = PIPE;
443 
444  loop = uv_default_loop();
445 
446  /* Start making connections */
448 
450 
452 }
453 
454 
/* Benchmark entry point: runs the TCP pump client via tcp_pump(100), then
 * returns 0. */
455 BENCHMARK_IMPL(tcp_pump100_client) {
456  tcp_pump(100);
457  return 0;
458 }
459 
460 
/* Benchmark entry point: runs the TCP pump client via tcp_pump(1), then
 * returns 0. */
461 BENCHMARK_IMPL(tcp_pump1_client) {
462  tcp_pump(1);
463  return 0;
464 }
465 
466 
/* Benchmark entry point: runs the pipe pump client via pipe_pump(100), then
 * returns 0. */
467 BENCHMARK_IMPL(pipe_pump100_client) {
468  pipe_pump(100);
469  return 0;
470 }
471 
472 
/* Benchmark entry point: runs the pipe pump client via pipe_pump(1), then
 * returns 0. */
473 BENCHMARK_IMPL(pipe_pump1_client) {
474  pipe_pump(1);
475  return 0;
476 }
task.h
pipeServer
static uv_pipe_t pipeServer
Definition: benchmark-pump.c:50
buf_freelist
static buf_list_t * buf_freelist
Definition: benchmark-pump.c:350
pipe_pump
static void pipe_pump(int n)
Definition: benchmark-pump.c:439
uv_pipe_connect
UV_EXTERN void uv_pipe_connect(uv_connect_t *req, uv_pipe_t *handle, const char *name, uv_connect_cb cb)
Definition: unix/pipe.c:173
uv_pipe_init
UV_EXTERN int uv_pipe_init(uv_loop_t *, uv_pipe_t *handle, int ipc)
Definition: unix/pipe.c:33
uv_connect_s
Definition: uv.h:580
read_sockets
static int read_sockets
Definition: benchmark-pump.c:59
nsent_total
static int64_t nsent_total
Definition: benchmark-pump.c:65
uv_now
UV_EXTERN uint64_t uv_now(const uv_loop_t *)
Definition: uv-common.c:537
tcpServer
static uv_tcp_t tcpServer
Definition: benchmark-pump.c:49
buf
voidpf void * buf
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
uv_listen
UV_EXTERN int uv_listen(uv_stream_t *stream, int backlog, uv_connection_cb cb)
Definition: unix/stream.c:656
listen_addr
static struct sockaddr_in listen_addr
Definition: benchmark-pump.c:52
uv_connect_s::handle
uv_stream_t * handle
Definition: uv.h:583
ASSERT
#define ASSERT(expr)
Definition: task.h:102
nrecv
static int64_t nrecv
Definition: benchmark-pump.c:62
tcp
static uv_tcp_t tcp
Definition: test-connection-fail.c:29
status
absl::Status status
Definition: rls.cc:251
uv_strerror
const UV_EXTERN char * uv_strerror(int err)
Definition: uv-common.c:212
MAX_SIMULTANEOUS_CONNECTS
#define MAX_SIMULTANEOUS_CONNECTS
Definition: benchmark-pump.c:31
uv_run
UV_EXTERN int uv_run(uv_loop_t *, uv_run_mode mode)
Definition: unix/core.c:361
write_buffer
static char write_buffer[WRITE_BUFFER_SIZE]
Definition: benchmark-pump.c:69
req_free
static void req_free(uv_req_t *uv_req)
Definition: benchmark-pump.c:332
pipe_write_handles
static uv_pipe_t pipe_write_handles[MAX_WRITE_HANDLES]
Definition: benchmark-pump.c:77
BENCHMARK_IMPL
BENCHMARK_IMPL(tcp_pump100_client)
Definition: benchmark-pump.c:455
uv_tcp_bind
UV_EXTERN int uv_tcp_bind(uv_tcp_t *handle, const struct sockaddr *addr, unsigned int flags)
Definition: uv-common.c:277
TEST_PORT
#define TEST_PORT
Definition: task.h:53
python_utils.port_server.stderr
stderr
Definition: port_server.py:51
uv_close
UV_EXTERN void uv_close(uv_handle_t *handle, uv_close_cb close_cb)
Definition: unix/core.c:112
tcp_pump
static void tcp_pump(int n)
Definition: benchmark-pump.c:421
uv_stream_s
Definition: uv.h:491
uv_tcp_connect
UV_EXTERN int uv_tcp_connect(uv_connect_t *req, uv_tcp_t *handle, const struct sockaddr *addr, uv_connect_cb cb)
Definition: uv-common.c:315
uv_ip4_addr
UV_EXTERN int uv_ip4_addr(const char *ip, int port, struct sockaddr_in *addr)
Definition: uv-common.c:221
start_stats_collection
static void start_stats_collection(void)
Definition: benchmark-pump.c:156
STATS_COUNT
#define STATS_COUNT
Definition: benchmark-pump.c:35
uv_update_time
UV_EXTERN void uv_update_time(uv_loop_t *)
Definition: unix/core.c:413
uv_default_loop
UV_EXTERN uv_loop_t * uv_default_loop(void)
Definition: uv-common.c:733
uv_any_req
Definition: uv.h:1761
max_read_sockets
static int max_read_sockets
Definition: benchmark-pump.c:58
ssize_t
intptr_t ssize_t
Definition: win.h:27
do_write
static void do_write(uv_stream_t *)
Definition: benchmark-pump.c:202
int64_t
signed __int64 int64_t
Definition: stdint-msvc2008.h:89
uv_write
UV_EXTERN int uv_write(uv_write_t *req, uv_stream_t *handle, const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb)
Definition: unix/stream.c:1492
req
static uv_connect_t req
Definition: test-connection-fail.c:30
req_list_t
struct req_list_s req_list_t
UV_RUN_DEFAULT
@ UV_RUN_DEFAULT
Definition: uv.h:254
req_list_s
Definition: benchmark-pump.c:309
uv_read_start
UV_EXTERN int uv_read_start(uv_stream_t *, uv_alloc_cb alloc_cb, uv_read_cb read_cb)
Definition: unix/stream.c:1555
bm_diff.diff
diff
Definition: bm_diff.py:274
buf_alloc
static void buf_alloc(uv_handle_t *handle, size_t size, uv_buf_t *buf)
Definition: benchmark-pump.c:353
uv_tcp_init
UV_EXTERN int uv_tcp_init(uv_loop_t *, uv_tcp_t *handle)
Definition: unix/tcp.c:143
uv_accept
UV_EXTERN int uv_accept(uv_stream_t *server, uv_stream_t *client)
Definition: unix/stream.c:591
TCP
@ TCP
Definition: task.h:82
req_list_s::uv_req
union uv_any_req uv_req
Definition: benchmark-pump.c:310
tcp_write_handles
static uv_tcp_t tcp_write_handles[MAX_WRITE_HANDLES]
Definition: benchmark-pump.c:76
connection_cb
static void connection_cb(uv_stream_t *s, int status)
Definition: benchmark-pump.c:277
buf_list_s::next
struct buf_list_s * next
Definition: benchmark-pump.c:346
HELPER_IMPL
HELPER_IMPL(tcp_pump_server)
Definition: benchmark-pump.c:376
uv_timer_s
Definition: uv.h:850
n
int n
Definition: abseil-cpp/absl/container/btree_test.cc:1080
uv_tcp_s
Definition: uv.h:544
req_alloc
static uv_req_t * req_alloc(void)
Definition: benchmark-pump.c:318
uv_buf_t::base
char * base
Definition: unix.h:122
req_list_s::next
struct req_list_s * next
Definition: benchmark-pump.c:311
uv.h
connect_addr
static struct sockaddr_in connect_addr
Definition: benchmark-pump.c:53
MAKE_VALGRIND_HAPPY
#define MAKE_VALGRIND_HAPPY()
Definition: task.h:229
buf_list_t
struct buf_list_s buf_list_t
read_cb
static void read_cb(uv_stream_t *stream, ssize_t bytes, const uv_buf_t *buf)
Definition: benchmark-pump.c:171
uv_buf_t
Definition: unix.h:121
bytes
uint8 bytes[10]
Definition: bloaty/third_party/protobuf/src/google/protobuf/io/coded_stream_unittest.cc:153
server
Definition: examples/python/async_streaming/server.py:1
TARGET_CONNECTIONS
static int TARGET_CONNECTIONS
Definition: benchmark-pump.c:29
max_connect_socket
static int max_connect_socket
Definition: benchmark-pump.c:57
type
static stream_type type
Definition: benchmark-pump.c:74
read_show_stats
static void read_show_stats(void)
Definition: benchmark-pump.c:127
fix_build_deps.r
r
Definition: fix_build_deps.py:491
show_stats
static void show_stats(uv_timer_t *handle)
Definition: benchmark-pump.c:88
PIPE
@ PIPE
Definition: task.h:84
buf_free
static void buf_free(const uv_buf_t *buf)
Definition: benchmark-pump.c:369
uv_pipe_s
Definition: uv.h:757
MAX_WRITE_HANDLES
#define MAX_WRITE_HANDLES
Definition: benchmark-pump.c:72
loop
static uv_loop_t * loop
Definition: benchmark-pump.c:47
write_sockets
static int write_sockets
Definition: benchmark-pump.c:60
read_sockets_close_cb
static void read_sockets_close_cb(uv_handle_t *handle)
Definition: benchmark-pump.c:142
uv_buf_t::len
size_t len
Definition: unix.h:123
stats_left
static int stats_left
Definition: benchmark-pump.c:67
connect_cb
static void connect_cb(uv_connect_t *req, int status)
Definition: benchmark-pump.c:216
uv_write_s
Definition: uv.h:522
req_freelist
static req_list_t * req_freelist
Definition: benchmark-pump.c:315
timer_handle
static uv_timer_t timer_handle
Definition: benchmark-pump.c:79
stream_type
stream_type
Definition: task.h:81
handle
static csh handle
Definition: test_arm_regression.c:16
uv_handle_s
Definition: uv.h:441
TEST_PIPENAME
#define TEST_PIPENAME
Definition: task.h:61
uv_timer_start
UV_EXTERN int uv_timer_start(uv_timer_t *handle, uv_timer_cb cb, uint64_t timeout, uint64_t repeat)
Definition: timer.c:66
buf_list_s::uv_buf_t
uv_buf_t uv_buf_t
Definition: benchmark-pump.c:345
uv_loop_s
Definition: uv.h:1767
uv_timer_init
UV_EXTERN int uv_timer_init(uv_loop_t *, uv_timer_t *handle)
Definition: timer.c:58
nsent
static int64_t nsent
Definition: benchmark-pump.c:64
uv_pipe_bind
UV_EXTERN int uv_pipe_bind(uv_pipe_t *handle, const char *name)
Definition: unix/pipe.c:43
size
voidpf void uLong size
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
buf_list_s
Definition: benchmark-pump.c:344
write_cb
static void write_cb(uv_write_t *req, int status)
Definition: benchmark-pump.c:190
nrecv_total
static int64_t nrecv_total
Definition: benchmark-pump.c:63
server
static uv_stream_t * server
Definition: benchmark-pump.c:51
start_time
static int64_t start_time
Definition: benchmark-pump.c:55
STATS_INTERVAL
#define STATS_INTERVAL
Definition: benchmark-pump.c:34
uv_req_s
Definition: uv.h:404
gbit
static double gbit(int64_t bytes, int64_t passed_ms)
Definition: benchmark-pump.c:82
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
WRITE_BUFFER_SIZE
#define WRITE_BUFFER_SIZE
Definition: benchmark-pump.c:30
maybe_connect_some
static void maybe_connect_some(void)
Definition: benchmark-pump.c:244
stream
voidpf stream
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:45