test/core/fling/server.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <signal.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <time.h>
24 
25 #include <grpc/grpc.h>
26 #include <grpc/grpc_security.h>
27 #ifndef _WIN32
28 /* This is for _exit() below, which is temporary. */
29 #include <unistd.h>
30 #endif
31 
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/time.h>
35 
39 #include "test/core/util/cmdline.h"
41 #include "test/core/util/port.h"
43 
46 static grpc_call* call;
50 static grpc_byte_buffer* payload_buffer = nullptr;
51 /* Used to drain the terminal read in unary calls. */
53 
57 static grpc_op status_op[2];
58 static int was_cancelled = 2;
59 static grpc_op unary_ops[6];
60 static int got_sigint = 0;
61 
62 static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }  /* Wraps an integer tag id as the opaque void* tag passed to grpc_call_start_batch; main() casts ev.tag back to intptr_t to dispatch on it. */
63 
64 typedef enum {
73 
74 typedef struct {
77 } call_state;
78 
79 static void request_call(void) {
85 }
86 
87 static void handle_unary_method(void) {
88  grpc_op* op;
90 
92 
93  op = unary_ops;
96  op++;
99  op++;
101  if (payload_buffer == nullptr) {
102  gpr_log(GPR_INFO, "NULL payload buffer !!!");
103  }
105  op++;
110  op++;
113  op++;
114 
116  static_cast<size_t>(op - unary_ops),
119 }
120 
121 static void send_initial_metadata(void) {
127  error = grpc_call_start_batch(call, &metadata_send_op, 1, tagarg, nullptr);
128 
130 }
131 
132 static void start_read_op(int t) {
134  /* Starting read at server */
137  error = grpc_call_start_batch(call, &read_op, 1, tag(t), nullptr);
139 }
140 
141 static void start_write_op(void) {
143  void* tagarg = tag(FLING_SERVER_WRITE_FOR_STREAMING);
144  /* Starting write at server */
146  if (payload_buffer == nullptr) {
147  gpr_log(GPR_INFO, "NULL payload buffer !!!");
148  }
150  error = grpc_call_start_batch(call, &write_op, 1, tagarg, nullptr);
152 }
153 
154 static void start_send_status(void) {
163 
164  error = grpc_call_start_batch(call, status_op, 2, tagarg, nullptr);
166 }
167 
168 /* SIGINT handler (installed in main() via signal()). We have some sort of deadlock, so let's not exit gracefully for now:
169  the handler ends the process immediately with _exit(0) and does not set got_sigint. When that is resolved, please remove the #include <unistd.h> above. */
170 static void sigint_handler(int /*x*/) { _exit(0); }
171 
172 int main(int argc, char** argv) {
173  grpc_event ev;
174  call_state* s;
175  std::string addr_buf;
176  gpr_cmdline* cl;
177  grpc_completion_queue* shutdown_cq;
178  int shutdown_started = 0;
179  int shutdown_finished = 0;
180 
181  int secure = 0;
182  const char* addr = nullptr;
183 
184  char* fake_argv[1];
185 
186  gpr_timers_set_log_filename("latency_trace.fling_server.txt");
187 
188  GPR_ASSERT(argc >= 1);
189  argc = 1;
190  fake_argv[0] = argv[0];
191  grpc_test_init(&argc, fake_argv);
192 
193  grpc_init();
194  srand(static_cast<unsigned>(clock()));
195 
196  cl = gpr_cmdline_create("fling server");
197  gpr_cmdline_add_string(cl, "bind", "Bind host:port", &addr);
198  gpr_cmdline_add_flag(cl, "secure", "Run with security?", &secure);
199  gpr_cmdline_parse(cl, argc, argv);
201 
202  if (addr == nullptr) {
204  addr = addr_buf.c_str();
205  }
206  gpr_log(GPR_INFO, "creating server on: %s", addr);
207 
210  if (secure) {
211  grpc_ssl_pem_key_cert_pair pem_key_cert_pair = {test_server1_key,
213  creds = grpc_ssl_server_credentials_create(nullptr, &pem_key_cert_pair, 1,
214  0, nullptr);
215  } else {
217  }
218  server = grpc_server_create(nullptr, nullptr);
223 
224  addr = nullptr;
225  addr_buf.clear();
226 
228 
229  request_call();
230 
231  grpc_profiler_start("server.prof");
232  signal(SIGINT, sigint_handler);
233  while (!shutdown_finished) {
234  if (got_sigint && !shutdown_started) {
235  gpr_log(GPR_INFO, "Shutting down due to SIGINT");
236 
237  shutdown_cq = grpc_completion_queue_create_for_pluck(nullptr);
238  grpc_server_shutdown_and_notify(server, shutdown_cq, tag(1000));
239 
241  shutdown_cq, tag(1000),
243  .type == GRPC_OP_COMPLETE);
244  grpc_completion_queue_destroy(shutdown_cq);
245 
247  shutdown_started = 1;
248  }
250  cq,
253  nullptr);
254  s = static_cast<call_state*>(ev.tag);
255  switch (ev.type) {
256  case GRPC_OP_COMPLETE:
257  switch (reinterpret_cast<intptr_t>(s)) {
259  if (call != nullptr) {
261  "/Reflector/reflectStream")) {
262  /* Received streaming call. Send metadata here. */
265  } else {
266  /* Received unary call. Can do all ops in one batch. */
268  }
269  } else {
270  GPR_ASSERT(shutdown_started);
271  }
272  /* request_call();
273  */
274  break;
276  if (payload_buffer != nullptr) {
277  /* Received payload from client. */
278  start_write_op();
279  } else {
280  /* Received end of stream from client. */
282  }
283  break;
285  /* Write completed at server */
287  payload_buffer = nullptr;
289  break;
291  /* Metadata send completed at server */
292  break;
294  /* Send status and close completed at server */
296  if (!shutdown_started) request_call();
297  break;
299  /* Finished payload read for unary. Start all remaining
300  * unary ops in a batch.
301  */
303  break;
305  /* Finished unary call. */
307  payload_buffer = nullptr;
309  if (!shutdown_started) request_call();
310  break;
311  }
312  break;
313  case GRPC_QUEUE_SHUTDOWN:
314  GPR_ASSERT(shutdown_started);
315  shutdown_finished = 1;
316  break;
317  case GRPC_QUEUE_TIMEOUT:
318  break;
319  }
320  }
323 
326  grpc_shutdown();
327  return 0;
328 }
got_sigint
static int got_sigint
Definition: test/core/fling/server.cc:60
FLING_SERVER_BATCH_OPS_FOR_UNARY
@ FLING_SERVER_BATCH_OPS_FOR_UNARY
Definition: test/core/fling/server.cc:67
GPR_TIMESPAN
@ GPR_TIMESPAN
Definition: gpr_types.h:45
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc_op::grpc_op_data::grpc_op_send_message::send_message
struct grpc_byte_buffer * send_message
Definition: grpc_types.h:668
grpc_call_error
grpc_call_error
Definition: grpc_types.h:464
call_state::flags
uint32_t flags
Definition: test/core/fling/server.cc:76
grpc_call_details_destroy
GRPCAPI void grpc_call_details_destroy(grpc_call_details *details)
Definition: call_details.cc:36
grpc_call_details_init
GRPCAPI void grpc_call_details_init(grpc_call_details *details)
Definition: call_details.cc:30
grpc_timeout_seconds_to_deadline
gpr_timespec grpc_timeout_seconds_to_deadline(int64_t time_s)
Definition: test/core/util/test_config.cc:81
log.h
port.h
call_state
Definition: test/core/fling/server.cc:74
grpc_completion_queue_create_for_pluck
GRPCAPI grpc_completion_queue * grpc_completion_queue_create_for_pluck(void *reserved)
Definition: completion_queue_factory.cc:69
initial_metadata_send
static grpc_metadata_array initial_metadata_send
Definition: test/core/fling/server.cc:49
timers.h
grpc_op::grpc_op_data::send_initial_metadata
struct grpc_op::grpc_op_data::grpc_op_send_initial_metadata send_initial_metadata
FLING_SERVER_READ_FOR_UNARY
@ FLING_SERVER_READ_FOR_UNARY
Definition: test/core/fling/server.cc:66
FLING_SERVER_WRITE_FOR_STREAMING
@ FLING_SERVER_WRITE_FOR_STREAMING
Definition: test/core/fling/server.cc:70
grpc_metadata_array
Definition: grpc_types.h:579
grpc_call_details
Definition: grpc_types.h:585
string.h
request_call
static void request_call(void)
Definition: test/core/fling/server.cc:79
gpr_cmdline_destroy
void gpr_cmdline_destroy(gpr_cmdline *cl)
Definition: cmdline.cc:79
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
error
grpc_error_handle error
Definition: retry_filter.cc:499
GRPC_QUEUE_SHUTDOWN
@ GRPC_QUEUE_SHUTDOWN
Definition: grpc_types.h:554
GRPC_OP_COMPLETE
@ GRPC_OP_COMPLETE
Definition: grpc_types.h:558
grpc_server_create
GRPCAPI grpc_server * grpc_server_create(const grpc_channel_args *args, void *reserved)
Definition: src/core/lib/surface/server.cc:1456
GRPC_CALL_OK
@ GRPC_CALL_OK
Definition: grpc_types.h:466
grpc_server_register_completion_queue
GRPCAPI void grpc_server_register_completion_queue(grpc_server *server, grpc_completion_queue *cq, void *reserved)
Definition: src/core/lib/surface/server.cc:1466
time.h
test_server1_cert
const char test_server1_cert[]
grpc_security.h
gpr_cmdline_add_flag
void gpr_cmdline_add_flag(gpr_cmdline *cl, const char *name, const char *help, int *value)
Definition: cmdline.cc:110
grpc_call_details::method
grpc_slice method
Definition: grpc_types.h:586
gpr_refcount
Definition: impl/codegen/sync_generic.h:39
grpc_op::grpc_op_data::recv_message
struct grpc_op::grpc_op_data::grpc_op_recv_message recv_message
grpc_test_init
void grpc_test_init(int *argc, char **argv)
Definition: test/core/util/test_config.cc:135
grpc_profiler_start
void grpc_profiler_start(const char *filename)
Definition: grpc_profiler.cc:30
grpc_op::data
union grpc_op::grpc_op_data data
grpc_insecure_server_credentials_create
GRPCAPI grpc_server_credentials * grpc_insecure_server_credentials_create()
Definition: core/lib/security/credentials/insecure/insecure_credentials.cc:71
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
grpc_op::grpc_op_data::grpc_op_recv_message::recv_message
struct grpc_byte_buffer ** recv_message
Definition: grpc_types.h:693
grpc_server_request_call
GRPCAPI grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag_new)
Definition: src/core/lib/surface/server.cc:1526
gpr_timers_set_log_filename
void gpr_timers_set_log_filename(const char *)
Definition: basic_timers.cc:292
GRPC_STATUS_OK
@ GRPC_STATUS_OK
Definition: include/grpc/impl/codegen/status.h:30
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
unary_ops
static grpc_op unary_ops[6]
Definition: test/core/fling/server.cc:59
GRPC_OP_SEND_STATUS_FROM_SERVER
@ GRPC_OP_SEND_STATUS_FROM_SERVER
Definition: grpc_types.h:612
grpc_ssl_server_credentials_create
GRPCAPI grpc_server_credentials * grpc_ssl_server_credentials_create(const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, size_t num_key_cert_pairs, int force_client_auth, void *reserved)
Definition: ssl_credentials.cc:319
payload_buffer
static grpc_byte_buffer * payload_buffer
Definition: test/core/fling/server.cc:50
grpc_server_credentials_release
GRPCAPI void grpc_server_credentials_release(grpc_server_credentials *creds)
Definition: credentials.cc:95
signal
static void signal(notification *n)
Definition: alts_tsi_handshaker_test.cc:107
grpc_server_add_http2_port
GRPCAPI int grpc_server_add_http2_port(grpc_server *server, const char *addr, grpc_server_credentials *creds)
Definition: chttp2_server.cc:1029
grpc_call_unref
GRPCAPI void grpc_call_unref(grpc_call *call)
Definition: call.cc:1770
grpc_op::grpc_op_data::grpc_op_send_status_from_server::status
grpc_status_code status
Definition: grpc_types.h:673
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc_event
Definition: grpc_types.h:564
grpc_completion_queue
Definition: completion_queue.cc:347
gpr_cmdline_add_string
void gpr_cmdline_add_string(gpr_cmdline *cl, const char *name, const char *help, const char **value)
Definition: cmdline.cc:115
grpc.h
grpc_call
struct grpc_call grpc_call
Definition: grpc_types.h:70
grpc_byte_buffer
Definition: grpc_types.h:43
grpc_core::JoinHostPort
std::string JoinHostPort(absl::string_view host, int port)
Definition: host_port.cc:32
grpc_op
Definition: grpc_types.h:640
GRPC_OP_SEND_MESSAGE
@ GRPC_OP_SEND_MESSAGE
Definition: grpc_types.h:602
cq
static grpc_completion_queue * cq
Definition: test/core/fling/server.cc:44
was_cancelled
static int was_cancelled
Definition: test/core/fling/server.cc:58
grpc_server
struct grpc_server grpc_server
Definition: grpc_types.h:65
intptr_t
_W64 signed int intptr_t
Definition: stdint-msvc2008.h:118
write_op
static grpc_op write_op
Definition: test/core/fling/server.cc:56
status_op
static grpc_op status_op[2]
Definition: test/core/fling/server.cc:57
request_metadata_recv
static grpc_metadata_array request_metadata_recv
Definition: test/core/fling/server.cc:48
grpc_server_destroy
GRPCAPI void grpc_server_destroy(grpc_server *server)
Definition: src/core/lib/surface/server.cc:1519
grpc_profiler.h
FLING_SERVER_SEND_STATUS_FOR_STREAMING
@ FLING_SERVER_SEND_STATUS_FOR_STREAMING
Definition: test/core/fling/server.cc:71
host_port.h
sigint_handler
static void sigint_handler(int)
Definition: test/core/fling/server.cc:170
grpc_pick_unused_port_or_die
int grpc_pick_unused_port_or_die(void)
gpr_now
GPRAPI gpr_timespec gpr_now(gpr_clock_type clock)
call
static grpc_call * call
Definition: test/core/fling/server.cc:46
fling_server_tags
fling_server_tags
Definition: test/core/fling/server.cc:64
terminal_buffer
static grpc_byte_buffer * terminal_buffer
Definition: test/core/fling/server.cc:52
call_state::pending_ops
gpr_refcount pending_ops
Definition: test/core/fling/server.cc:75
grpc_op::op
grpc_op_type op
Definition: grpc_types.h:642
grpc_op::grpc_op_data::grpc_op_send_initial_metadata::count
size_t count
Definition: grpc_types.h:653
test_config.h
grpc_op::grpc_op_data::recv_close_on_server
struct grpc_op::grpc_op_data::grpc_op_recv_close_on_server recv_close_on_server
GRPC_OP_RECV_MESSAGE
@ GRPC_OP_RECV_MESSAGE
Definition: grpc_types.h:621
grpc_server_credentials
Definition: src/core/lib/security/credentials/credentials.h:259
grpc_completion_queue_pluck
GRPCAPI grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved)
Definition: completion_queue.cc:1328
start_read_op
static void start_read_op(int t)
Definition: test/core/fling/server.cc:132
read_op
static grpc_op read_op
Definition: test/core/fling/server.cc:54
grpc_op::grpc_op_data::send_status_from_server
struct grpc_op::grpc_op_data::grpc_op_send_status_from_server send_status_from_server
gpr_time_add
GPRAPI gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:135
server
Definition: examples/python/async_streaming/server.py:1
grpc_completion_queue_destroy
GRPCAPI void grpc_completion_queue_destroy(grpc_completion_queue *cq)
Definition: completion_queue.cc:1424
GRPC_OP_SEND_INITIAL_METADATA
@ GRPC_OP_SEND_INITIAL_METADATA
Definition: grpc_types.h:598
grpc_op::grpc_op_data::send_message
struct grpc_op::grpc_op_data::grpc_op_send_message send_message
tag
static void * tag(intptr_t t)
Definition: test/core/fling/server.cc:62
server
static grpc_server * server
Definition: test/core/fling/server.cc:45
handle_unary_method
static void handle_unary_method(void)
Definition: test/core/fling/server.cc:87
gpr_time_from_micros
GPRAPI gpr_timespec gpr_time_from_micros(int64_t us, gpr_clock_type clock_type)
Definition: src/core/lib/gpr/time.cc:115
alloc.h
cmdline.h
grpc_op::grpc_op_data::grpc_op_send_status_from_server::trailing_metadata_count
size_t trailing_metadata_count
Definition: grpc_types.h:671
grpc_server_shutdown_and_notify
GRPCAPI void grpc_server_shutdown_and_notify(grpc_server *server, grpc_completion_queue *cq, void *tag)
Definition: src/core/lib/surface/server.cc:1503
grpc_byte_buffer_destroy
GRPCAPI void grpc_byte_buffer_destroy(grpc_byte_buffer *bb)
Definition: byte_buffer.cc:81
grpc_completion_queue_next
GRPCAPI grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved)
Definition: completion_queue.cc:1133
grpc_profiler_stop
void grpc_profiler_stop(void)
Definition: grpc_profiler.cc:44
grpc_completion_queue_shutdown
GRPCAPI void grpc_completion_queue_shutdown(grpc_completion_queue *cq)
Definition: completion_queue.cc:1416
gpr_cmdline_create
gpr_cmdline * gpr_cmdline_create(const char *description)
Definition: cmdline.cc:66
GRPC_OP_RECV_CLOSE_ON_SERVER
@ GRPC_OP_RECV_CLOSE_ON_SERVER
Definition: grpc_types.h:633
start_send_status
static void start_send_status(void)
Definition: test/core/fling/server.cc:154
metadata_send_op
static grpc_op metadata_send_op
Definition: test/core/fling/server.cc:55
start_write_op
static void start_write_op(void)
Definition: test/core/fling/server.cc:141
gpr_cmdline
Definition: cmdline.cc:48
FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING
@ FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING
Definition: test/core/fling/server.cc:68
main
int main(int argc, char **argv)
Definition: test/core/fling/server.cc:172
grpc_completion_queue_create_for_next
GRPCAPI grpc_completion_queue * grpc_completion_queue_create_for_next(void *reserved)
Definition: completion_queue_factory.cc:62
grpc_op::grpc_op_data::grpc_op_send_status_from_server::status_details
grpc_slice * status_details
Definition: grpc_types.h:677
ssl_test_data.h
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
grpc_event::type
grpc_completion_type type
Definition: grpc_types.h:566
grpc_init
GRPCAPI void grpc_init(void)
Definition: init.cc:146
grpc_server_start
GRPCAPI void grpc_server_start(grpc_server *server)
Definition: src/core/lib/surface/server.cc:1497
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
grpc_slice_str_cmp
GPRAPI int grpc_slice_str_cmp(grpc_slice a, const char *b)
Definition: slice/slice.cc:426
test_server1_key
const char test_server1_key[]
FLING_SERVER_READ_FOR_STREAMING
@ FLING_SERVER_READ_FOR_STREAMING
Definition: test/core/fling/server.cc:69
op
static grpc_op * op
Definition: test/core/fling/client.cc:47
GRPC_QUEUE_TIMEOUT
@ GRPC_QUEUE_TIMEOUT
Definition: grpc_types.h:556
grpc_call_start_batch
GRPCAPI grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t nops, void *tag, void *reserved)
Definition: call.cc:1831
send_initial_metadata
static void send_initial_metadata(void)
Definition: test/core/fling/server.cc:121
grpc_shutdown
GRPCAPI void grpc_shutdown(void)
Definition: init.cc:209
grpc_ssl_pem_key_cert_pair
Definition: grpc_security.h:173
addr
struct sockaddr_in addr
Definition: libuv/docs/code/tcp-echo-server/main.c:10
grpc_event::tag
void * tag
Definition: grpc_types.h:576
grpc_op::grpc_op_data::grpc_op_recv_close_on_server::cancelled
int * cancelled
Definition: grpc_types.h:714
call_details
static grpc_call_details call_details
Definition: test/core/fling/server.cc:47
grpc_metadata_array_init
GRPCAPI void grpc_metadata_array_init(grpc_metadata_array *array)
Definition: metadata_array.cc:30
gpr_cmdline_parse
int gpr_cmdline_parse(gpr_cmdline *cl, int argc, char **argv)
Definition: cmdline.cc:309
FLING_SERVER_NEW_REQUEST
@ FLING_SERVER_NEW_REQUEST
Definition: test/core/fling/server.cc:65


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:10