22 #ifdef GRPC_POSIX_SOCKET_EV
27 #include <netinet/in.h>
32 #include <sys/socket.h>
57 static void create_test_socket(
int port,
int* socket_fd,
58 struct sockaddr_in* sin) {
64 fd =
socket(AF_INET, SOCK_STREAM, 0);
65 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one,
sizeof(one));
71 flags = fcntl(fd, F_GETFL, 0);
76 sin->sin_family = AF_INET;
77 sin->sin_addr.s_addr = htonl(0x7f000001);
83 void no_op_cb(
void* ,
int ) {}
96 static void server_init(
server* sv) {
97 sv->read_bytes_total = 0;
112 static void session_shutdown_cb(
void*
arg,
114 session* se =
static_cast<session*
>(
arg);
124 static void session_read_cb(
void*
arg,
126 session* se =
static_cast<session*
>(
arg);
133 session_shutdown_cb(
arg,
true);
139 if (read_once > 0) read_total += read_once;
140 }
while (read_once > 0);
141 se->sv->read_bytes_total += read_total;
147 if (read_once == 0) {
148 session_shutdown_cb(
arg,
true);
149 }
else if (read_once == -1) {
150 if (errno == EAGAIN) {
169 static void listen_shutdown_cb(
void*
arg ,
int ) {
188 struct sockaddr_storage ss;
189 socklen_t slen =
sizeof(ss);
190 grpc_fd* listen_em_fd = sv->em_fd;
193 listen_shutdown_cb(
arg, 1);
198 reinterpret_cast<struct sockaddr*
>(&ss), &slen);
201 flags = fcntl(fd, F_GETFL, 0);
202 fcntl(fd, F_SETFL,
flags | O_NONBLOCK);
203 se =
static_cast<session*
>(
gpr_malloc(
sizeof(*se)));
208 grpc_schedule_on_exec_ctx);
215 #define MAX_NUM_FD 1024
221 static int server_start(
server* sv) {
224 struct sockaddr_in sin;
227 create_test_socket(
port, &fd, &sin);
228 addr_len =
sizeof(sin);
229 GPR_ASSERT(bind(fd, (
struct sockaddr*)&sin, addr_len) == 0);
230 GPR_ASSERT(getsockname(fd, (
struct sockaddr*)&sin, &addr_len) == 0);
231 port = ntohs(sin.sin_port);
238 grpc_schedule_on_exec_ctx);
245 static void server_wait_and_shutdown(
server* sv) {
263 #define CLIENT_WRITE_BUF_SIZE 10
265 #define CLIENT_TOTAL_WRITE_CNT 3
270 char write_buf[CLIENT_WRITE_BUF_SIZE];
274 int client_write_cnt;
280 static void client_init(
client* cl) {
281 memset(cl->write_buf, 0,
sizeof(cl->write_buf));
282 cl->write_bytes_total = 0;
283 cl->client_write_cnt = 0;
288 static void client_session_shutdown_cb(
void*
arg ,
int ) {
297 static void client_session_write(
void*
arg,
305 client_session_shutdown_cb(
arg, 1);
311 write_once =
write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
312 if (write_once > 0) cl->write_bytes_total += write_once;
313 }
while (write_once > 0);
315 if (errno == EAGAIN) {
317 if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
319 grpc_schedule_on_exec_ctx);
321 cl->client_write_cnt++;
323 client_session_shutdown_cb(
arg, 1);
333 static void client_start(
client* cl,
int port) {
335 struct sockaddr_in sin;
336 create_test_socket(
port, &fd, &sin);
337 if (connect(fd,
reinterpret_cast<struct sockaddr*
>(&sin),
sizeof(sin)) ==
339 if (errno == EINPROGRESS) {
342 pfd.events = POLLOUT;
344 if (poll(&pfd, 1, -1) == -1) {
361 static void client_wait_and_shutdown(
client* cl) {
379 static void test_grpc_fd(
void) {
386 port = server_start(&sv);
388 client_start(&cl,
port);
390 client_wait_and_shutdown(&cl);
391 server_wait_and_shutdown(&sv);
392 GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total);
396 typedef struct fd_change_data {
400 void init_change_data(fd_change_data* fdc) { fdc->cb_that_ran =
nullptr; }
402 void destroy_change_data(fd_change_data* ) {}
404 static void first_read_callback(
void*
arg ,
406 fd_change_data* fdc =
static_cast<fd_change_data*
>(
arg);
409 fdc->cb_that_ran = first_read_callback;
415 static void second_read_callback(
void*
arg ,
417 fd_change_data* fdc =
static_cast<fd_change_data*
>(
arg);
420 fdc->cb_that_ran = second_read_callback;
430 static void test_grpc_fd_change(
void) {
442 grpc_schedule_on_exec_ctx);
444 grpc_schedule_on_exec_ctx);
446 init_change_data(&
a);
447 init_change_data(&
b);
449 GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
450 flags = fcntl(sv[0], F_GETFL, 0);
452 flags = fcntl(sv[1], F_GETFL, 0);
466 while (
a.cb_that_ran ==
nullptr) {
490 while (
b.cb_that_ran ==
nullptr) {
505 destroy_change_data(&
a);
506 destroy_change_data(&
b);
514 int main(
int argc,
char** argv) {
523 test_grpc_fd_change();
525 grpc_schedule_on_exec_ctx);
536 int main(
int argc,
char** argv) {
return 1; }