test-ipc-send-recv.c
Go to the documentation of this file.
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "uv.h"
23 #include "task.h"
24 
25 #include <stdio.h>
26 #include <string.h>
27 
28 /* See test-ipc.c */
31  const char* helper);
32 
34 
35 union handles {
41 };
42 
43 struct test_ctx {
49  union handles send;
50  union handles send2;
51  union handles recv;
52  union handles recv2;
53 };
54 
55 struct echo_ctx {
61  union handles recv;
62  union handles recv2;
63 };
64 
/* State for the sending side of the test: used by recv_cb, connect_cb,
 * run_test and the run_ipc_send_recv_* drivers. */
65 static struct test_ctx ctx;
/* State for the echoing side: zeroed with memset in the helper setup code and
 * used by write2_cb and the helper's read callback. */
66 static struct echo_ctx ctx2;
67 
68 /* Used in write2_cb to decide if we need to clean up or not */
/* NOTE(review): no assignment to is_child_process is visible in this listing;
 * confirm where (or whether) it is set. */
69 static int is_child_process;
/* Copied from the `inprocess` argument by the helper setup code. */
70 static int is_in_process;
/* Incremented once per loop iteration in the helper's read callback. */
71 static int read_cb_count;
/* Incremented once per loop iteration in recv_cb; run_test asserts it is 2. */
72 static int recv_cb_count;
/* Incremented on every write2_cb invocation. */
73 static int write2_cb_called;
74 
75 
77  size_t suggested_size,
78  uv_buf_t* buf) {
79  /* we're not actually reading anything so a small buffer is okay */
80  static char slab[8];
81  buf->base = slab;
82  buf->len = sizeof(slab);
83 }
84 
85 
/* Read callback installed on ctx.channel by connect_cb.
 *
 * For each pass through the loop it picks the next receive slot (ctx.recv on
 * the first pass, ctx.recv2 afterwards), checks that the pending handle type
 * equals ctx.expected_type, initializes a matching pipe or TCP handle on the
 * channel's loop and accepts the pending handle into it. The loop repeats
 * while uv_pipe_pending_count() reports more pending handles. Once two
 * handles have been counted the channel is closed.
 *
 * NOTE(review): the declaration of `pending` and the statement that assigns
 * it are not visible in this listing (listing lines 89, 114 and 116 are
 * absent); presumably it comes from uv_pipe_pending_type() - confirm against
 * the real file.
 */
86 static void recv_cb(uv_stream_t* handle,
87  ssize_t nread,
88  const uv_buf_t* buf) {
90  uv_pipe_t* pipe;
91  int r;
92  union handles* recv;
93 
94  pipe = (uv_pipe_t*) handle;
   /* The only stream this callback is registered on is the IPC channel. */
95  ASSERT(pipe == &ctx.channel);
96 
97  do {
   /* First received handle goes into ctx.recv, every later one into
    * ctx.recv2. */
98  if (++recv_cb_count == 1) {
99  recv = &ctx.recv;
100  } else {
101  recv = &ctx.recv2;
102  }
103 
104  /* Depending on the OS, the final recv_cb can be called after
105  * the child process has terminated which can result in nread
106  * being UV_EOF instead of the number of bytes read. Since
107  * the other end of the pipe has closed this UV_EOF is an
108  * acceptable value. */
109  if (nread == UV_EOF) {
110  /* UV_EOF is only acceptable for the final recv_cb call */
111  ASSERT(recv_cb_count == 2);
112  } else {
113  ASSERT(nread >= 0);
115 
117  ASSERT(pending == ctx.expected_type);
118 
   /* Create a handle of the same kind as the one being passed so that
    * uv_accept() below has something to receive it into. */
119  if (pending == UV_NAMED_PIPE)
120  r = uv_pipe_init(ctx.channel.loop, &recv->pipe, 0);
121  else if (pending == UV_TCP)
122  r = uv_tcp_init(ctx.channel.loop, &recv->tcp);
123  else
124  abort();
125  ASSERT(r == 0);
126 
127  r = uv_accept(handle, &recv->stream);
128  ASSERT(r == 0);
129  }
130  } while (uv_pipe_pending_count(pipe) > 0);
131 
132  /* Close after two writes received */
133  if (recv_cb_count == 2) {
134  uv_close((uv_handle_t*)&ctx.channel, NULL);
135  }
136 }
137 
/* Sends both test handles over the IPC channel and starts reading replies.
 *
 * Registered as the uv_pipe_connect() callback in run_test's in-process
 * branch and called directly (with status 0) in its other branch.
 *
 * req    - must be &ctx.connect_req (asserted).
 * status - must be 0 (asserted).
 *
 * Each uv_write2() call carries a one-byte "." payload plus one handle
 * (ctx.send, then ctx.send2) and uses its own write request; no write
 * completion callback is installed (the last argument is NULL).
 */
138 static void connect_cb(uv_connect_t* req, int status) {
139  int r;
140  uv_buf_t buf;
141 
142  ASSERT(req == &ctx.connect_req);
143  ASSERT(status == 0);
144 
   /* First write: pass ctx.send across the channel. */
145  buf = uv_buf_init(".", 1);
146  r = uv_write2(&ctx.write_req,
147  (uv_stream_t*)&ctx.channel,
148  &buf, 1,
149  &ctx.send.stream,
150  NULL);
151  ASSERT(r == 0);
152 
153  /* Perform two writes to the same pipe to make sure that on Windows we are
154  * not running into issue 505:
155  * https://github.com/libuv/libuv/issues/505 */
156  buf = uv_buf_init(".", 1);
157  r = uv_write2(&ctx.write_req2,
158  (uv_stream_t*)&ctx.channel,
159  &buf, 1,
160  &ctx.send2.stream,
161  NULL);
162  ASSERT(r == 0);
163 
   /* recv_cb handles whatever comes back over the channel. */
164  r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, recv_cb);
165  ASSERT(r == 0);
166 }
167 
/* Drives one send/receive round trip and checks that two handles came back.
 *
 * inprocess - non-zero: connect ctx.channel to TEST_PIPENAME_3 and let
 *             connect_cb run as the connect callback, then join a thread at
 *             the end; zero: obtain the channel through spawn_helper() with
 *             the "ipc_send_recv_helper" helper and call connect_cb directly.
 *
 * Returns 0; failures are reported through ASSERT.
 *
 * NOTE(review): listing lines 169-170, 174 and 189 are absent here, so the
 * declarations of `process` and `tid`, the call whose result the first
 * ASSERT in the in-process branch checks, and the call whose result is
 * checked right before the recv_cb_count assertion are not visible.
 */
168 static int run_test(int inprocess) {
171  int r;
172 
173  if (inprocess) {
175  ASSERT(r == 0);
176 
   /* NOTE(review): fixed one-second wait; presumably gives the other side
    * time to start listening before the connect below - confirm. */
177  uv_sleep(1000);
178 
   /* Third argument 1 is uv_pipe_init's `ipc` flag. */
179  r = uv_pipe_init(uv_default_loop(), &ctx.channel, 1);
180  ASSERT(r == 0);
181 
182  uv_pipe_connect(&ctx.connect_req, &ctx.channel, TEST_PIPENAME_3, connect_cb);
183  } else {
184  spawn_helper(&ctx.channel, &process, "ipc_send_recv_helper");
185 
   /* No connect step in this branch, so invoke the callback by hand. */
186  connect_cb(&ctx.connect_req, 0);
187  }
188 
190  ASSERT(r == 0);
191 
   /* recv_cb must have counted exactly two received handles. */
192  ASSERT(recv_cb_count == 2);
193 
194  if (inprocess) {
195  r = uv_thread_join(&tid);
196  ASSERT(r == 0);
197  }
198 
199  return 0;
200 }
201 
/* Prepares two named-pipe handles to be sent and runs the round-trip test.
 *
 * Sets ctx.expected_type to UV_NAMED_PIPE, initializes ctx.send and ctx.send2
 * as pipes (with uv_pipe_init's `ipc` argument set to 1), binds them to
 * TEST_PIPENAME and TEST_PIPENAME_2, then hands over to run_test().
 *
 * inprocess - forwarded unchanged to run_test().
 *
 * Returns 0; failures are reported through ASSERT.
 */
202 static int run_ipc_send_recv_pipe(int inprocess) {
203  int r;
204 
   /* recv_cb checks every received handle against this type. */
205  ctx.expected_type = UV_NAMED_PIPE;
206 
207  r = uv_pipe_init(uv_default_loop(), &ctx.send.pipe, 1);
208  ASSERT(r == 0);
209 
210  r = uv_pipe_bind(&ctx.send.pipe, TEST_PIPENAME);
211  ASSERT(r == 0);
212 
213  r = uv_pipe_init(uv_default_loop(), &ctx.send2.pipe, 1);
214  ASSERT(r == 0);
215 
216  r = uv_pipe_bind(&ctx.send2.pipe, TEST_PIPENAME_2);
217  ASSERT(r == 0);
218 
219  r = run_test(inprocess);
220  ASSERT(r == 0);
221 
223  return 0;
224 }
225 
/* Named-pipe variant using the spawned-helper path (inprocess = 0).
 * Skipped when NO_SEND_HANDLE_ON_PIPE is defined. */
226 TEST_IMPL(ipc_send_recv_pipe) {
227 #if defined(NO_SEND_HANDLE_ON_PIPE)
228  RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
229 #endif
230  return run_ipc_send_recv_pipe(0);
231 }
232 
/* Named-pipe variant using the in-process path (inprocess = 1), where
 * run_test connects to TEST_PIPENAME_3 and joins a thread before returning.
 * Skipped when NO_SEND_HANDLE_ON_PIPE is defined. */
233 TEST_IMPL(ipc_send_recv_pipe_inprocess) {
234 #if defined(NO_SEND_HANDLE_ON_PIPE)
235  RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
236 #endif
237  return run_ipc_send_recv_pipe(1);
238 }
239 
/* Prepares two TCP handles to be sent and runs the round-trip test.
 *
 * Sets ctx.expected_type to UV_TCP, initializes ctx.send and ctx.send2 as TCP
 * handles, binds both to 127.0.0.1:TEST_PORT, then hands over to run_test().
 *
 * inprocess - forwarded unchanged to run_test().
 *
 * Returns 0; failures are reported through ASSERT.
 */
240 static int run_ipc_send_recv_tcp(int inprocess) {
241  struct sockaddr_in addr;
242  int r;
243 
244  ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
245 
   /* recv_cb checks every received handle against this type. */
246  ctx.expected_type = UV_TCP;
247 
248  r = uv_tcp_init(uv_default_loop(), &ctx.send.tcp);
249  ASSERT(r == 0);
250 
251  r = uv_tcp_init(uv_default_loop(), &ctx.send2.tcp);
252  ASSERT(r == 0);
253 
254  r = uv_tcp_bind(&ctx.send.tcp, (const struct sockaddr*) &addr, 0);
255  ASSERT(r == 0);
256 
   /* NOTE(review): second bind to the same address as ctx.send; the test
    * expects it to return 0. Presumably uv_tcp_bind() does not report an
    * address-in-use condition at bind time - confirm against libuv docs. */
257  r = uv_tcp_bind(&ctx.send2.tcp, (const struct sockaddr*) &addr, 0);
258  ASSERT(r == 0);
259 
260  r = run_test(inprocess);
261  ASSERT(r == 0);
262 
264  return 0;
265 }
266 
/* TCP variant using the spawned-helper path (inprocess = 0).
 * Skipped when NO_SEND_HANDLE_ON_PIPE is defined. */
267 TEST_IMPL(ipc_send_recv_tcp) {
268 #if defined(NO_SEND_HANDLE_ON_PIPE)
269  RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
270 #endif
271  return run_ipc_send_recv_tcp(0);
272 }
273 
/* TCP variant using the in-process path (inprocess = 1), where run_test
 * connects to TEST_PIPENAME_3 and joins a thread before returning.
 * Skipped when NO_SEND_HANDLE_ON_PIPE is defined. */
274 TEST_IMPL(ipc_send_recv_tcp_inprocess) {
275 #if defined(NO_SEND_HANDLE_ON_PIPE)
276  RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
277 #endif
278  return run_ipc_send_recv_tcp(1);
279 }
280 
281 
282 /* Everything here runs in a child process or second thread. */
283 
/* Write-completion callback passed to the echo side's write call.
 *
 * req    - the completed write request (not inspected).
 * status - must be 0 (asserted).
 *
 * Counts completions in write2_cb_called. On the second completion, and only
 * when is_child_process or is_in_process is set, it closes both received
 * handles, the channel and the listening pipe in ctx2, each without a close
 * callback.
 */
284 static void write2_cb(uv_write_t* req, int status) {
285  ASSERT(status == 0);
286 
287  /* After two successful writes in the child process, allow the child
288  * process to be closed. */
289  if (++write2_cb_called == 2 && (is_child_process || is_in_process)) {
290  uv_close(&ctx2.recv.handle, NULL);
291  uv_close(&ctx2.recv2.handle, NULL);
292  uv_close((uv_handle_t*)&ctx2.channel, NULL);
293  uv_close((uv_handle_t*)&ctx2.listen, NULL);
294  }
295 }
296 
298  ssize_t nread,
299  const uv_buf_t* rdbuf) {
300  uv_buf_t wrbuf;
301  uv_pipe_t* pipe;
303  int r;
304  union handles* recv;
306 
307  if (nread == UV_EOF || nread == UV_ECONNABORTED) {
308  return;
309  }
310 
311  pipe = (uv_pipe_t*) handle;
312  do {
313  if (++read_cb_count == 2) {
314  recv = &ctx2.recv;
316  } else {
317  recv = &ctx2.recv2;
319  }
320 
321  ASSERT(pipe == &ctx2.channel);
322  ASSERT(nread >= 0);
324 
326  ASSERT(pending == UV_NAMED_PIPE || pending == UV_TCP);
327 
328  if (pending == UV_NAMED_PIPE)
329  r = uv_pipe_init(ctx2.channel.loop, &recv->pipe, 0);
330  else if (pending == UV_TCP)
331  r = uv_tcp_init(ctx2.channel.loop, &recv->tcp);
332  else
333  abort();
334  ASSERT(r == 0);
335 
336  r = uv_accept(handle, &recv->stream);
337  ASSERT(r == 0);
338 
339  wrbuf = uv_buf_init(".", 1);
342  &wrbuf,
343  1,
344  &recv->stream,
345  write2_cb);
346  ASSERT(r == 0);
347  } while (uv_pipe_pending_count(pipe) > 0);
348 }
349 
350 static void send_recv_start(void) {
351  int r;
355 
357  ASSERT(r == 0);
358 }
359 
360 static void listen_cb(uv_stream_t* handle, int status) {
361  int r;
363  ASSERT(status == 0);
364 
366  ASSERT(r == 0);
367 
368  send_recv_start();
369 }
370 
372  int r;
373 
374  is_in_process = inprocess;
375 
376  memset(&ctx2, 0, sizeof(ctx2));
377 
378  r = uv_pipe_init(loop, &ctx2.listen, 0);
379  ASSERT(r == 0);
380 
381  r = uv_pipe_init(loop, &ctx2.channel, 1);
382  ASSERT(r == 0);
383 
384  if (inprocess) {
386  ASSERT(r == 0);
387 
388  r = uv_listen((uv_stream_t*)&ctx2.listen, SOMAXCONN, listen_cb);
389  ASSERT(r == 0);
390  } else {
391  r = uv_pipe_open(&ctx2.channel, 0);
392  ASSERT(r == 0);
393 
394  send_recv_start();
395  }
396 
399  ASSERT(r == 0);
400 
401  return 0;
402 }
403 
404 /* stdin is a duplex channel over which a handle is sent.
405  * We receive it and send it back where it came from.
406  */
408  int r;
409 
411  ASSERT(r == 0);
412 
414  return 0;
415 }
416 
418  int r;
419  uv_loop_t loop;
420 
421  r = uv_loop_init(&loop);
422  ASSERT(r == 0);
423 
425  ASSERT(r == 0);
426 
427  r = uv_loop_close(&loop);
428  ASSERT(r == 0);
429 }
uv_pipe_pending_count
UV_EXTERN int uv_pipe_pending_count(uv_pipe_t *handle)
Definition: unix/pipe.c:298
async_greeter_server_with_graceful_shutdown.loop
loop
Definition: async_greeter_server_with_graceful_shutdown.py:59
TEST_PIPENAME_2
#define TEST_PIPENAME_2
Definition: task.h:62
slab
static char slab[1]
Definition: test-watcher-cross-stop.c:37
handles::stream
uv_stream_t stream
Definition: test-ipc-send-recv.c:37
uv_process_s
Definition: uv.h:1037
task.h
ctx
Definition: benchmark-async.c:30
echo_ctx::recv
union handles recv
Definition: test-ipc-send-recv.c:61
uv_pipe_connect
UV_EXTERN void uv_pipe_connect(uv_connect_t *req, uv_pipe_t *handle, const char *name, uv_connect_cb cb)
Definition: unix/pipe.c:173
memset
return memset(p, 0, total)
uv_pipe_init
UV_EXTERN int uv_pipe_init(uv_loop_t *, uv_pipe_t *handle, int ipc)
Definition: unix/pipe.c:33
echo_ctx::channel
uv_pipe_t channel
Definition: test-ipc-send-recv.c:57
uv_handle_type
uv_handle_type
Definition: uv.h:189
uv_connect_s
Definition: uv.h:580
string.h
uv_tty_s
Definition: uv.h:704
buf
voidpf void * buf
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
uv_listen
UV_EXTERN int uv_listen(uv_stream_t *stream, int backlog, uv_connection_cb cb)
Definition: unix/stream.c:656
test_ctx::send2
union handles send2
Definition: test-ipc-send-recv.c:50
handles
Definition: test-ipc-send-recv.c:35
test_ctx::write_req
uv_write_t write_req
Definition: test-ipc-send-recv.c:46
ASSERT
#define ASSERT(expr)
Definition: task.h:102
handles::handle
uv_handle_t handle
Definition: test-ipc-send-recv.c:36
read_cb_count
static int read_cb_count
Definition: test-ipc-send-recv.c:71
status
absl::Status status
Definition: rls.cc:251
uv_thread_join
UV_EXTERN int uv_thread_join(uv_thread_t *tid)
Definition: libuv/src/unix/thread.c:271
write_req
Definition: benchmark-tcp-write-batch.c:31
uv_run
UV_EXTERN int uv_run(uv_loop_t *, uv_run_mode mode)
Definition: unix/core.c:361
read_cb
static void read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *rdbuf)
Definition: test-ipc-send-recv.c:297
uv_is_closing
UV_EXTERN int uv_is_closing(const uv_handle_t *handle)
Definition: unix/core.c:319
ipc_send_recv_helper_threadproc
void ipc_send_recv_helper_threadproc(void *arg)
Definition: test-ipc-send-recv.c:417
grpc_core::pending
P< T > pending()
Definition: try_join_test.cc:50
ipc_send_recv_helper
int ipc_send_recv_helper(void)
Definition: test-ipc-send-recv.c:407
run_test
static int run_test(int inprocess)
Definition: test-ipc-send-recv.c:168
is_in_process
static int is_in_process
Definition: test-ipc-send-recv.c:70
uv_tcp_bind
UV_EXTERN int uv_tcp_bind(uv_tcp_t *handle, const struct sockaddr *addr, unsigned int flags)
Definition: uv-common.c:277
TEST_PORT
#define TEST_PORT
Definition: task.h:53
uv_close
UV_EXTERN void uv_close(uv_handle_t *handle, uv_close_cb close_cb)
Definition: unix/core.c:112
write2_cb
static void write2_cb(uv_write_t *req, int status)
Definition: test-ipc-send-recv.c:284
uv_stream_s
Definition: uv.h:491
tid
int tid
Definition: fake_binder_test.cc:236
handles::tcp
uv_tcp_t tcp
Definition: test-ipc-send-recv.c:39
uv_ip4_addr
UV_EXTERN int uv_ip4_addr(const char *ip, int port, struct sockaddr_in *addr)
Definition: uv-common.c:221
uv_loop_close
UV_EXTERN int uv_loop_close(uv_loop_t *loop)
Definition: uv-common.c:761
uv_default_loop
UV_EXTERN uv_loop_t * uv_default_loop(void)
Definition: uv-common.c:733
channel
wrapped_grpc_channel * channel
Definition: src/php/ext/grpc/call.h:33
ssize_t
intptr_t ssize_t
Definition: win.h:27
process
static uv_process_t process
Definition: benchmark-spawn.c:32
test_ctx::write_req2
uv_write_t write_req2
Definition: test-ipc-send-recv.c:47
run_ipc_send_recv_tcp
static int run_ipc_send_recv_tcp(int inprocess)
Definition: test-ipc-send-recv.c:240
uv_is_readable
UV_EXTERN int uv_is_readable(const uv_stream_t *handle)
Definition: unix/stream.c:1606
test_ctx::recv2
union handles recv2
Definition: test-ipc-send-recv.c:52
req
static uv_connect_t req
Definition: test-connection-fail.c:30
echo_ctx
Definition: test-ipc-send-recv.c:55
UV_RUN_DEFAULT
@ UV_RUN_DEFAULT
Definition: uv.h:254
test_ctx::recv
union handles recv
Definition: test-ipc-send-recv.c:51
test_ctx
Definition: test-ipc-send-recv.c:43
uv_read_start
UV_EXTERN int uv_read_start(uv_stream_t *, uv_alloc_cb alloc_cb, uv_read_cb read_cb)
Definition: unix/stream.c:1555
echo_ctx::write_req
uv_write_t write_req
Definition: test-ipc-send-recv.c:58
arg
Definition: cmdline.cc:40
connect_cb
static void connect_cb(uv_connect_t *req, int status)
Definition: test-ipc-send-recv.c:138
echo_ctx::write_req2
uv_write_t write_req2
Definition: test-ipc-send-recv.c:59
listen_cb
static void listen_cb(uv_stream_t *handle, int status)
Definition: test-ipc-send-recv.c:360
uv_tcp_init
UV_EXTERN int uv_tcp_init(uv_loop_t *, uv_tcp_t *handle)
Definition: unix/tcp.c:143
uv_accept
UV_EXTERN int uv_accept(uv_stream_t *server, uv_stream_t *client)
Definition: unix/stream.c:591
uv_loop_init
UV_EXTERN int uv_loop_init(uv_loop_t *loop)
Definition: loop.c:30
uv_thread_create
UV_EXTERN int uv_thread_create(uv_thread_t *tid, uv_thread_cb entry, void *arg)
Definition: libuv/src/unix/thread.c:209
echo_ctx::recv2
union handles recv2
Definition: test-ipc-send-recv.c:62
uv_sleep
UV_EXTERN void uv_sleep(unsigned int msec)
Definition: unix/core.c:1521
run_ipc_send_recv_pipe
static int run_ipc_send_recv_pipe(int inprocess)
Definition: test-ipc-send-recv.c:202
test_ctx::channel
uv_pipe_t channel
Definition: test-ipc-send-recv.c:44
uv_tcp_s
Definition: uv.h:544
ctx2
static struct echo_ctx ctx2
Definition: test-ipc-send-recv.c:66
notify_parent_process
void notify_parent_process(void)
Definition: runner-unix.c:54
uv.h
ctx::loop
uv_loop_t loop
Definition: benchmark-async.c:31
send_recv_start
static void send_recv_start(void)
Definition: test-ipc-send-recv.c:350
MAKE_VALGRIND_HAPPY
#define MAKE_VALGRIND_HAPPY()
Definition: task.h:229
uv_buf_t
Definition: unix.h:121
recv_cb
static void recv_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
Definition: test-ipc-send-recv.c:86
run_ipc_send_recv_helper
int run_ipc_send_recv_helper(uv_loop_t *loop, int inprocess)
Definition: test-ipc-send-recv.c:371
test_ctx::connect_req
uv_connect_t connect_req
Definition: test-ipc-send-recv.c:45
uv_pipe_pending_type
UV_EXTERN uv_handle_type uv_pipe_pending_type(uv_pipe_t *handle)
Definition: unix/pipe.c:315
test_ctx::send
union handles send
Definition: test-ipc-send-recv.c:49
fix_build_deps.r
r
Definition: fix_build_deps.py:491
write2_cb_called
static int write2_cb_called
Definition: test-ipc-send-recv.c:73
recv_cb_count
static int recv_cb_count
Definition: test-ipc-send-recv.c:72
uv_pipe_s
Definition: uv.h:757
uv_buf_init
UV_EXTERN uv_buf_t uv_buf_init(char *base, unsigned int len)
Definition: uv-common.c:157
handles::pipe
uv_pipe_t pipe
Definition: test-ipc-send-recv.c:38
handles::tty
uv_tty_t tty
Definition: test-ipc-send-recv.c:40
test_ctx::expected_type
uv_handle_type expected_type
Definition: test-ipc-send-recv.c:48
uv_write_s
Definition: uv.h:522
RETURN_SKIP
#define RETURN_SKIP(explanation)
Definition: task.h:262
handle
static csh handle
Definition: test_arm_regression.c:16
alloc_cb
static void alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
Definition: test-ipc-send-recv.c:76
echo_ctx::listen
uv_pipe_t listen
Definition: test-ipc-send-recv.c:56
uv_handle_s
Definition: uv.h:441
TEST_PIPENAME
#define TEST_PIPENAME
Definition: task.h:61
spawn_helper
void spawn_helper(uv_pipe_t *channel, uv_process_t *process, const char *helper)
Definition: test-ipc.c:291
uv_thread_t
pthread_t uv_thread_t
Definition: unix.h:134
uv_loop_s
Definition: uv.h:1767
TEST_IMPL
TEST_IMPL(ipc_send_recv_pipe)
Definition: test-ipc-send-recv.c:226
uv_write2
UV_EXTERN int uv_write2(uv_write_t *req, uv_stream_t *handle, const uv_buf_t bufs[], unsigned int nbufs, uv_stream_t *send_handle, uv_write_cb cb)
Definition: unix/stream.c:1393
echo_ctx::expected_type
uv_handle_type expected_type
Definition: test-ipc-send-recv.c:60
write_req
uv_write_t write_req
Definition: libuv/docs/code/tty-gravity/main.c:9
uv_is_writable
UV_EXTERN int uv_is_writable(const uv_stream_t *handle)
Definition: unix/stream.c:1611
uv_pipe_bind
UV_EXTERN int uv_pipe_bind(uv_pipe_t *handle, const char *name)
Definition: unix/pipe.c:43
TEST_PIPENAME_3
#define TEST_PIPENAME_3
Definition: task.h:63
uv_pipe_open
UV_EXTERN int uv_pipe_open(uv_pipe_t *, uv_file file)
Definition: unix/pipe.c:137
addr
struct sockaddr_in addr
Definition: libuv/docs/code/tcp-echo-server/main.c:10
is_child_process
static int is_child_process
Definition: test-ipc-send-recv.c:69


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:30