server_async.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <algorithm>
20 #include <forward_list>
21 #include <functional>
22 #include <memory>
23 #include <mutex>
24 #include <thread>
25 
26 #include <grpc/grpc.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
30 #include <grpcpp/resource_quota.h>
32 #include <grpcpp/server.h>
33 #include <grpcpp/server_builder.h>
34 #include <grpcpp/server_context.h>
35 #include <grpcpp/support/config.h>
36 
39 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
42 #include "test/cpp/qps/server.h"
43 
44 namespace grpc {
45 namespace testing {
46 
47 template <class RequestType, class ResponseType, class ServiceType,
48  class ServerContextType>
50  public:
52  const ServerConfig& config,
53  std::function<void(ServerBuilder*, ServiceType*)> register_service,
54  std::function<void(ServiceType*, ServerContextType*, RequestType*,
57  request_unary_function,
58  std::function<void(ServiceType*, ServerContextType*,
61  request_streaming_function,
62  std::function<void(ServiceType*, ServerContextType*,
65  request_streaming_from_client_function,
66  std::function<void(ServiceType*, ServerContextType*, RequestType*,
68  ServerCompletionQueue*, void*)>
69  request_streaming_from_server_function,
70  std::function<void(ServiceType*, ServerContextType*,
73  request_streaming_both_ways_function,
74  std::function<grpc::Status(const PayloadConfig&, RequestType*,
75  ResponseType*)>
76  process_rpc)
77  : Server(config) {
78  std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
79 
80  auto port_num = port();
81  // Negative port number means inproc server, so no listen port needed
82  if (port_num >= 0) {
84  builder->AddListeningPort(server_address.c_str(),
86  &port_num);
87  }
88 
89  register_service(builder.get(), &async_service_);
90 
91  int num_threads = config.async_server_threads();
92  if (num_threads <= 0) { // dynamic sizing
93  num_threads = std::min(64, cores());
95  "Sizing async server to %d threads. Defaults to number of cores "
96  "in machine or 64 threads if machine has more than 64 cores to "
97  "avoid OOMs.",
98  num_threads);
99  }
100 
101  int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified
102  int num_cqs = (num_threads + tpc - 1) / tpc; // ceiling operator
103  for (int i = 0; i < num_cqs; i++) {
104  srv_cqs_.emplace_back(builder->AddCompletionQueue());
105  }
106  for (int i = 0; i < num_threads; i++) {
107  cq_.emplace_back(i % srv_cqs_.size());
108  }
109 
111 
112  server_ = builder->BuildAndStart();
113  if (server_ == nullptr) {
114  gpr_log(GPR_ERROR, "Server: Fail to BuildAndStart(port=%d)", port_num);
115  } else {
116  gpr_log(GPR_INFO, "Server: BuildAndStart(port=%d)", port_num);
117  }
118 
119  auto process_rpc_bound =
120  std::bind(process_rpc, config.payload_config(), std::placeholders::_1,
121  std::placeholders::_2);
122 
123  for (int i = 0; i < 5000; i++) {
124  for (int j = 0; j < num_cqs; j++) {
125  if (request_unary_function) {
126  auto request_unary = std::bind(
127  request_unary_function, &async_service_, std::placeholders::_1,
128  std::placeholders::_2, std::placeholders::_3, srv_cqs_[j].get(),
129  srv_cqs_[j].get(), std::placeholders::_4);
130  contexts_.emplace_back(
131  new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound));
132  }
133  if (request_streaming_function) {
134  auto request_streaming = std::bind(
135  request_streaming_function, &async_service_,
136  std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
137  srv_cqs_[j].get(), std::placeholders::_3);
138  contexts_.emplace_back(new ServerRpcContextStreamingImpl(
139  request_streaming, process_rpc_bound));
140  }
141  if (request_streaming_from_client_function) {
142  auto request_streaming_from_client = std::bind(
143  request_streaming_from_client_function, &async_service_,
144  std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
145  srv_cqs_[j].get(), std::placeholders::_3);
147  request_streaming_from_client, process_rpc_bound));
148  }
149  if (request_streaming_from_server_function) {
150  auto request_streaming_from_server =
151  std::bind(request_streaming_from_server_function, &async_service_,
152  std::placeholders::_1, std::placeholders::_2,
153  std::placeholders::_3, srv_cqs_[j].get(),
154  srv_cqs_[j].get(), std::placeholders::_4);
156  request_streaming_from_server, process_rpc_bound));
157  }
158  if (request_streaming_both_ways_function) {
159  // TODO(vjpai): Add this code
160  }
161  }
162  }
163 
164  for (int i = 0; i < num_threads; i++) {
165  shutdown_state_.emplace_back(new PerThreadShutdownState());
166  threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
167  }
168  }
// Destructor: flags every worker thread for shutdown, shuts down the
// completion queues, joins the workers, then drains whatever is left on
// each queue.
// NOTE(review): listing lines 176-177 (immediately after the TODO below) are
// absent from this extract, so any step performed there is not described.
169  ~AsyncQpsServerTest() override {
     // Set each per-thread shutdown flag while holding that thread's mutex;
     // ThreadFunc reads the flag under the same mutex.
170  for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
171  std::lock_guard<std::mutex> lock((*ss)->mutex);
172  (*ss)->shutdown = true;
173  }
174  // TODO(vjpai): Remove the following deadline and allow full proper
175  // shutdown.
     // Shut down every server completion queue.
178  for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
179  (*cq)->Shutdown();
180  }
     // Wait for every worker thread to finish.
181  for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
182  thr->join();
183  }
     // Pull and discard events from each queue until Next() returns false.
184  for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
185  bool ok;
186  void* got_tag;
187  while ((*cq)->Next(&got_tag, &ok)) {
188  }
189  }
190  }
191 
// Returns the sum of grpc_get_cq_poll_num() over every completion queue held
// in srv_cqs_.
192  int GetPollCount() override {
193  int count = 0;
194  for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); cq++) {
     // (*cq)->cq() is passed to the core grpc_get_cq_poll_num() call.
195  count += grpc_get_cq_poll_num((*cq)->cq());
196  }
197  return count;
198  }
199 
// Returns the channel produced by server_->InProcessChannel(args); the
// arguments are forwarded unchanged.
// NOTE(review): server_ is dereferenced without a null check, while the
// constructor only logs an error when BuildAndStart() returns nullptr —
// confirm callers never reach this after a failed start.
200  std::shared_ptr<Channel> InProcessChannel(
201  const ChannelArguments& args) override {
202  return server_->InProcessChannel(args);
203  }
204 
205  private:
// Worker loop for thread `thread_idx`. It services the completion queue
// srv_cqs_[cq_[thread_idx]]: each tag pulled from the queue is converted back
// to an RPC context with detag(), and that context's next state is run.
// The function returns when the first Next() call returns false, when this
// thread's shutdown flag is observed set, or when DoThenAsyncNext() yields a
// value that ends the do/while loop.
// NOTE(review): the declaration of `ctx` (listing line 213) is absent from
// this extract.
206  void ThreadFunc(int thread_idx) {
207  // Wait until work is available or we are shutting down
208  bool ok;
209  void* got_tag;
210  if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
211  return;
212  }
     // Pointer to this thread's shutdown mutex. It is locked in the loop body
     // below and unlocked either on the early-return path or at the end of
     // the lambda handed to DoThenAsyncNext().
214  std::mutex* mu_ptr = &shutdown_state_[thread_idx]->mutex;
215  do {
216  ctx = detag(got_tag);
217  // The tag is a pointer to an RPC context to invoke
218  // Proceed while holding a lock to make sure that
219  // this thread isn't supposed to shut down
220  mu_ptr->lock();
221  if (shutdown_state_[thread_idx]->shutdown) {
222  mu_ptr->unlock();
223  return;
224  }
225  } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
     // The lambda captures ctx, ok and mu_ptr by value. Under the context's
     // own lock it runs the next state and calls Reset() when RunNextState()
     // returns false, then releases the per-thread mutex taken above.
226  [&, ctx, ok, mu_ptr]() {
227  ctx->lock();
228  if (!ctx->RunNextState(ok)) {
229  ctx->Reset();
230  }
231  ctx->unlock();
232  mu_ptr->unlock();
233  },
     // Outputs for the next event; the deadline passed is
     // gpr_inf_future(GPR_CLOCK_REALTIME).
234  &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
235  }
236 
238  public:
240  void lock() { mu_.lock(); }
241  void unlock() { mu_.unlock(); }
242  virtual ~ServerRpcContext(){};
243  virtual bool RunNextState(bool) = 0; // next state, return false if done
244  virtual void Reset() = 0; // start this back at a clean state
245  private:
247  };
// Converts an RPC context pointer to the void* form used as a completion
// queue tag; the inverse conversion is detag().
248  static void* tag(ServerRpcContext* func) { return static_cast<void*>(func); }
// Converts a void* tag back to a ServerRpcContext* with static_cast. No
// validation is performed, so the pointer must have originated from a
// ServerRpcContext* (see tag()).
249  static ServerRpcContext* detag(void* tag) {
250  return static_cast<ServerRpcContext*>(tag);
251  }
252 
254  public:
256  std::function<void(ServerContextType*, RequestType*,
258  void*)>
259  request_method,
261  : srv_ctx_(new ServerContextType),
263  request_method_(request_method),
264  invoke_method_(invoke_method),
268  }
// Calls the member function currently stored in next_state_ with `ok` and
// returns its result.
270  bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
271  void Reset() override {
272  srv_ctx_.reset(new ServerContextType);
273  req_ = RequestType();
276 
277  // Then request the method
281  }
282 
283  private:
// State handler that ignores its argument and always returns false.
284  bool finisher(bool) { return false; }
285  bool invoker(bool ok) {
286  if (!ok) {
287  return false;
288  }
289 
290  // Call the RPC processing function
292 
293  // Have the response writer work and invoke on_finish when done
296  return true;
297  }
298  std::unique_ptr<ServerContextType> srv_ctx_;
302  std::function<void(ServerContextType*, RequestType*,
307  };
308 
310  public:
312  std::function<void(
313  ServerContextType*,
315  request_method,
317  : srv_ctx_(new ServerContextType),
319  request_method_(request_method),
320  invoke_method_(invoke_method),
321  stream_(srv_ctx_.get()) {
323  }
// Calls the member function currently stored in next_state_ with `ok` and
// returns its result.
325  bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
326  void Reset() override {
327  srv_ctx_.reset(new ServerContextType);
328  req_ = RequestType();
330  srv_ctx_.get());
331 
332  // Then request the method
335  }
336 
337  private:
338  bool request_done(bool ok) {
339  if (!ok) {
340  return false;
341  }
344  return true;
345  }
346 
347  bool read_done(bool ok) {
348  if (ok) {
349  // invoke the method
350  // Call the RPC processing function
352  // initiate the write
355  } else { // client has sent writes done
356  // finish the stream
359  }
360  return true;
361  }
362  bool write_done(bool ok) {
363  // now go back and get another streaming read!
364  if (ok) {
367  } else {
370  }
371  return true;
372  }
// State handler that ignores `ok` and always returns false.
373  bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
374 
375  std::unique_ptr<ServerContextType> srv_ctx_;
379  std::function<void(
380  ServerContextType*,
385  };
386 
388  : public ServerRpcContext {
389  public:
391  std::function<void(ServerContextType*,
393  void*)>
394  request_method,
396  : srv_ctx_(new ServerContextType),
398  request_method_(request_method),
399  invoke_method_(invoke_method),
400  stream_(srv_ctx_.get()) {
402  }
// Calls the member function currently stored in next_state_ with `ok` and
// returns its result.
404  bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
405  void Reset() override {
406  srv_ctx_.reset(new ServerContextType);
407  req_ = RequestType();
408  stream_ =
410 
411  // Then request the method
414  }
415 
416  private:
417  bool request_done(bool ok) {
418  if (!ok) {
419  return false;
420  }
423  return true;
424  }
425 
426  bool read_done(bool ok) {
427  if (ok) {
428  // In this case, just do another read
429  // next_state_ is unchanged
431  return true;
432  } else { // client has sent writes done
433  // invoke the method
434  // Call the RPC processing function
436  // finish the stream
439  }
440  return true;
441  }
// State handler that ignores `ok` and always returns false.
442  bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
443 
444  std::unique_ptr<ServerContextType> srv_ctx_;
448  std::function<void(ServerContextType*,
450  void*)>
454  };
455 
457  : public ServerRpcContext {
458  public:
460  std::function<void(ServerContextType*, RequestType*,
462  request_method,
464  : srv_ctx_(new ServerContextType),
466  request_method_(request_method),
467  invoke_method_(invoke_method),
468  stream_(srv_ctx_.get()) {
471  }
// Calls the member function currently stored in next_state_ with `ok` and
// returns its result.
473  bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
474  void Reset() override {
475  srv_ctx_.reset(new ServerContextType);
476  req_ = RequestType();
478 
479  // Then request the method
483  }
484 
485  private:
486  bool request_done(bool ok) {
487  if (!ok) {
488  return false;
489  }
490  // invoke the method
491  // Call the RPC processing function
493 
496  return true;
497  }
498 
499  bool write_done(bool ok) {
500  if (ok) {
501  // Do another write!
502  // next_state_ is unchanged
504  } else { // must be done so let's finish
507  }
508  return true;
509  }
// State handler that ignores `ok` and always returns false.
510  bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
511 
512  std::unique_ptr<ServerContextType> srv_ctx_;
516  std::function<void(ServerContextType*, RequestType*,
521  };
522 
523  std::vector<std::thread> threads_;
524  std::unique_ptr<grpc::Server> server_;
525  std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
526  std::vector<int> cq_;
527  ServiceType async_service_;
528  std::vector<std::unique_ptr<ServerRpcContext>> contexts_;
529 
531  mutable std::mutex mutex;
532  bool shutdown;
534  };
535 
536  std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
537 };
538 
540  BenchmarkService::AsyncService* service) {
541  builder->RegisterService(service);
542 }
545  builder->RegisterAsyncGenericService(service);
546 }
547 
// Handles one SimpleRequest. The PayloadConfig argument is unnamed and
// unused. When the request asks for a response_size() greater than zero, the
// response payload is filled via Server::SetPayload(); if that call fails the
// function returns an INTERNAL status. Otherwise the request is cleared and
// Status::OK is returned.
// NOTE(review): the second signature line (listing line 549), which declares
// `response`, is absent from this extract.
548 static Status ProcessSimpleRPC(const PayloadConfig&, SimpleRequest* request,
     // A response_size() of zero or less leaves the response payload untouched.
550  if (request->response_size() > 0) {
551  if (!Server::SetPayload(request->response_type(), request->response_size(),
552  response->mutable_payload())) {
553  return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
554  }
555  }
556  // We are done using the request. Clear it to reduce working memory.
557  // This proves to reduce cache misses in large message size cases.
558  request->Clear();
559  return Status::OK;
560 }
561 
// Handles one generic (ByteBuffer) request. The request is cleared
// immediately, then *response is assigned a ByteBuffer built from a single
// Slice over a zero-filled buffer of
// payload_config.bytebuf_params().resp_size() bytes. Always returns
// Status::OK.
// NOTE(review): the second signature line (listing line 563), which declares
// `request` and `response`, is absent from this extract.
562 static Status ProcessGenericRPC(const PayloadConfig& payload_config,
564  // We are done using the request. Clear it to reduce working memory.
565  // This proves to reduce cache misses in large message size cases.
566  request->Clear();
     // NOTE(review): resp_size is a signed int read from the config and is not
     // range-checked before it sizes the allocation below — confirm it can
     // never be negative.
567  int resp_size = payload_config.bytebuf_params().resp_size();
568  std::unique_ptr<char[]> buf(new char[resp_size]);
569  memset(buf.get(), 0, static_cast<size_t>(resp_size));
570  Slice slice(buf.get(), resp_size);
571  *response = ByteBuffer(&slice, 1);
572  return Status::OK;
573 }
574 
575 std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config) {
576  return std::unique_ptr<Server>(
578  BenchmarkService::AsyncService,
581  &BenchmarkService::AsyncService::RequestUnaryCall,
582  &BenchmarkService::AsyncService::RequestStreamingCall,
583  &BenchmarkService::AsyncService::RequestStreamingFromClient,
584  &BenchmarkService::AsyncService::RequestStreamingFromServer,
585  &BenchmarkService::AsyncService::RequestStreamingBothWays,
587 }
588 std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig& config) {
589  return std::unique_ptr<Server>(
592  config, RegisterGenericService, nullptr,
593  &grpc::AsyncGenericService::RequestCall, nullptr, nullptr, nullptr,
595 }
596 
597 } // namespace testing
598 } // namespace grpc
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingImpl::request_method_
std::function< void(ServerContextType *, grpc::ServerAsyncReaderWriter< ResponseType, RequestType > *, void *)> request_method_
Definition: server_async.cc:382
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingImpl
Definition: server_async.cc:309
grpc::testing::AsyncQpsServerTest::ServerRpcContextUnaryImpl::finisher
bool finisher(bool)
Definition: server_async.cc:284
messages_pb2.SimpleRequest
SimpleRequest
Definition: messages_pb2.py:597
absl::time_internal::cctz::seconds
std::chrono::duration< std::int_fast64_t > seconds
Definition: abseil-cpp/absl/time/internal/cctz/include/cctz/time_zone.h:40
grpc::testing::AsyncQpsServerTest::ServerRpcContext
Definition: server_async.cc:237
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromServerImpl::~ServerRpcContextStreamingFromServerImpl
~ServerRpcContextStreamingFromServerImpl() override
Definition: server_async.cc:472
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromClientImpl::srv_ctx_
std::unique_ptr< ServerContextType > srv_ctx_
Definition: server_async.cc:444
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
testing
Definition: aws_request_signer_test.cc:25
grpc::status
auto status
Definition: cpp/client/credentials_test.cc:200
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromServerImpl::req_
RequestType req_
Definition: server_async.cc:513
grpc::ServerCompletionQueue
Definition: include/grpcpp/impl/codegen/completion_queue.h:436
grpc::testing::AsyncQpsServerTest::ServerRpcContext::RunNextState
virtual bool RunNextState(bool)=0
now
static double now(void)
Definition: test/core/fling/client.cc:130
grpc::testing::RegisterBenchmarkService
static void RegisterBenchmarkService(ServerBuilder *builder, BenchmarkService::AsyncService *service)
Definition: server_async.cc:539
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromClientImpl::~ServerRpcContextStreamingFromClientImpl
~ServerRpcContextStreamingFromClientImpl() override
Definition: server_async.cc:403
grpc::AsyncGenericService
Definition: grpcpp/impl/codegen/async_generic_service.h:70
grpc._simple_stubs.RequestType
RequestType
Definition: _simple_stubs.py:27
grpc._simple_stubs.ResponseType
ResponseType
Definition: _simple_stubs.py:28
grpc::ServerContext
Definition: grpcpp/impl/codegen/server_context.h:566
log.h
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromServerImpl::write_done
bool write_done(bool ok)
Definition: server_async.cc:499
get
absl::string_view get(const Cont &c)
Definition: abseil-cpp/absl/strings/str_replace_test.cc:185
grpc::testing::AsyncQpsServerTest::server_
std::unique_ptr< grpc::Server > server_
Definition: server_async.cc:524
ctx
Definition: benchmark-async.c:30
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingImpl::RunNextState
bool RunNextState(bool ok) override
Definition: server_async.cc:325
bool
bool
Definition: setup_once.h:312
grpc::testing::ProcessGenericRPC
static Status ProcessGenericRPC(const PayloadConfig &payload_config, ByteBuffer *request, ByteBuffer *response)
Definition: server_async.cc:562
grpc::testing::AsyncQpsServerTest::ServerRpcContextUnaryImpl::Reset
void Reset() override
Definition: server_async.cc:271
memset
return memset(p, 0, total)
grpc
Definition: grpcpp/alarm.h:33
false
#define false
Definition: setup_once.h:323
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromClientImpl::finish_done
bool finish_done(bool)
Definition: server_async.cc:442
grpc::ServerAsyncReader< ResponseType, RequestType >
mutex
static uv_mutex_t mutex
Definition: threadpool.c:34
grpc::testing::AsyncQpsServerTest::ServerRpcContext::~ServerRpcContext
virtual ~ServerRpcContext()
Definition: server_async.cc:242
grpc::testing::AsyncQpsServerTest::~AsyncQpsServerTest
~AsyncQpsServerTest() override
Definition: server_async.cc:169
grpc::testing::AsyncQpsServerTest::PerThreadShutdownState::shutdown
bool shutdown
Definition: server_async.cc:532
grpc::testing::AsyncQpsServerTest::ServerRpcContext::lock
void lock()
Definition: server_async.cc:240
benchmark.request
request
Definition: benchmark.py:77
buf
voidpf void * buf
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc::testing::AsyncQpsServerTest::ServerRpcContextUnaryImpl::ServerRpcContextUnaryImpl
ServerRpcContextUnaryImpl(std::function< void(ServerContextType *, RequestType *, grpc::ServerAsyncResponseWriter< ResponseType > *, void *)> request_method, std::function< grpc::Status(RequestType *, ResponseType *)> invoke_method)
Definition: server_async.cc:255
grpc::testing::CreateAsyncGenericServer
std::unique_ptr< Server > CreateAsyncGenericServer(const ServerConfig &config)
Definition: server_async.cc:588
grpc::testing::AsyncQpsServerTest::PerThreadShutdownState::mutex
std::mutex mutex
Definition: server_async.cc:531
completion_queue.h
grpc::GenericServerContext
Definition: grpcpp/impl/codegen/async_generic_service.h:41
gpr_inf_future
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:55
grpc::ServerAsyncReaderWriter< ResponseType, RequestType >
ctx
static struct test_ctx ctx
Definition: test-ipc-send-recv.c:65
async_generic_service.h
grpc::testing::ProcessSimpleRPC
static Status ProcessSimpleRPC(const PayloadConfig &, SimpleRequest *request, SimpleResponse *response)
Definition: server_async.cc:548
grpc::testing::AsyncQpsServerTest::ServerRpcContext::unlock
void unlock()
Definition: server_async.cc:241
grpc::testing::AsyncQpsServerTest::detag
static ServerRpcContext * detag(void *tag)
Definition: server_async.cc:249
env.new
def new
Definition: env.py:51
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingImpl::finish_done
bool finish_done(bool)
Definition: server_async.cc:373
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromServerImpl::request_done
bool request_done(bool ok)
Definition: server_async.cc:486
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromServerImpl::finish_done
bool finish_done(bool)
Definition: server_async.cc:510
absl::base_internal::Next
static AllocList * Next(int i, AllocList *prev, LowLevelAlloc::Arena *arena)
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:453
server_address
std::string server_address("0.0.0.0:10000")
grpc::testing::AsyncQpsServerTest::PerThreadShutdownState
Definition: server_async.cc:530
grpc::testing::AsyncQpsServerTest::PerThreadShutdownState::PerThreadShutdownState
PerThreadShutdownState()
Definition: server_async.cc:533
qps_server_builder.h
grpc::testing::AsyncQpsServerTest::tag
static void * tag(ServerRpcContext *func)
Definition: server_async.cc:248
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromServerImpl::response_
ResponseType response_
Definition: server_async.cc:514
grpc::testing::AsyncQpsServerTest::cq_
std::vector< int > cq_
Definition: server_async.cc:526
grpc::testing::AsyncQpsServerTest::ServerRpcContextUnaryImpl::invoke_method_
std::function< grpc::Status(RequestType *, ResponseType *)> invoke_method_
Definition: server_async.cc:305
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromServerImpl::ServerRpcContextStreamingFromServerImpl
ServerRpcContextStreamingFromServerImpl(std::function< void(ServerContextType *, RequestType *, grpc::ServerAsyncWriter< ResponseType > *, void *)> request_method, std::function< grpc::Status(RequestType *, ResponseType *)> invoke_method)
Definition: server_async.cc:459
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromClientImpl::Reset
void Reset() override
Definition: server_async.cc:405
grpc::ServerAsyncWriter::Finish
void Finish(const grpc::Status &status, void *tag) override
Definition: grpcpp/impl/codegen/async_stream.h:921
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingImpl::next_state_
bool(ServerRpcContextStreamingImpl::* next_state_)(bool)
Definition: server_async.cc:378
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingImpl::read_done
bool read_done(bool ok)
Definition: server_async.cc:347
profile_analyzer.builder
builder
Definition: profile_analyzer.py:159
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromClientImpl
Definition: server_async.cc:387
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromServerImpl::srv_ctx_
std::unique_ptr< ServerContextType > srv_ctx_
Definition: server_async.cc:512
grpc::testing::AsyncQpsServerTest::ServerRpcContextUnaryImpl::next_state_
bool(ServerRpcContextUnaryImpl::* next_state_)(bool)
Definition: server_async.cc:301
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromServerImpl::Reset
void Reset() override
Definition: server_async.cc:474
grpc::ServerBuilder
A builder class for the creation and startup of grpc::Server instances.
Definition: grpcpp/server_builder.h:86
grpc::testing::AsyncQpsServerTest::ServerRpcContextUnaryImpl::~ServerRpcContextUnaryImpl
~ServerRpcContextUnaryImpl() override
Definition: server_async.cc:269
grpc::testing::AsyncQpsServerTest::async_service_
ServiceType async_service_
Definition: server_async.cc:527
slice
grpc_slice slice
Definition: src/core/lib/surface/server.cc:467
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc::testing::AsyncQpsServerTest::ServerRpcContextUnaryImpl::req_
RequestType req_
Definition: server_async.cc:299
grpc::testing::AsyncQpsServerTest::ServerRpcContextUnaryImpl::response_
ResponseType response_
Definition: server_async.cc:300
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingImpl::ServerRpcContextStreamingImpl
ServerRpcContextStreamingImpl(std::function< void(ServerContextType *, grpc::ServerAsyncReaderWriter< ResponseType, RequestType > *, void *)> request_method, std::function< grpc::Status(RequestType *, ResponseType *)> invoke_method)
Definition: server_async.cc:311
grpc::testing::AsyncQpsServerTest::ServerRpcContextUnaryImpl::response_writer_
grpc::ServerAsyncResponseWriter< ResponseType > response_writer_
Definition: server_async.cc:306
grpc.h
grpc_core::JoinHostPort
std::string JoinHostPort(absl::string_view host, int port)
Definition: host_port.cc:32
grpc::testing::AsyncQpsServerTest::ServerRpcContextUnaryImpl::RunNextState
bool RunNextState(bool ok) override
Definition: server_async.cc:270
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingImpl::write_done
bool write_done(bool ok)
Definition: server_async.cc:362
grpc::testing::Server::ApplyConfigToBuilder
static void ApplyConfigToBuilder(const ServerConfig &config, ServerBuilder *builder)
Definition: test/cpp/qps/server.h:123
grpc::testing::AsyncQpsServerTest::ServerRpcContext::Reset
virtual void Reset()=0
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromClientImpl::RunNextState
bool RunNextState(bool ok) override
Definition: server_async.cc:404
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromClientImpl::next_state_
bool(ServerRpcContextStreamingFromClientImpl::* next_state_)(bool)
Definition: server_async.cc:447
grpc::ByteBuffer
A sequence of bytes.
Definition: include/grpcpp/impl/codegen/byte_buffer.h:61
grpc::testing::AsyncQpsServerTest::ServerRpcContextUnaryImpl::invoker
bool invoker(bool ok)
Definition: server_async.cc:285
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromClientImpl::response_
ResponseType response_
Definition: server_async.cc:446
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromClientImpl::stream_
grpc::ServerAsyncReader< ResponseType, RequestType > stream_
Definition: server_async.cc:453
grpc::Status::OK
static const Status & OK
An OK pre-defined instance.
Definition: include/grpcpp/impl/codegen/status.h:113
grpc::ServerAsyncReaderWriter::Write
void Write(const W &msg, void *tag) override
Definition: grpcpp/impl/codegen/async_stream.h:1042
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingImpl::Reset
void Reset() override
Definition: server_async.cc:326
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromClientImpl::ServerRpcContextStreamingFromClientImpl
ServerRpcContextStreamingFromClientImpl(std::function< void(ServerContextType *, grpc::ServerAsyncReader< ResponseType, RequestType > *, void *)> request_method, std::function< grpc::Status(RequestType *, ResponseType *)> invoke_method)
Definition: server_async.cc:390
grpc::ServerAsyncReader::Read
void Read(R *msg, void *tag) override
Definition: grpcpp/impl/codegen/async_stream.h:721
host_port.h
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
grpc::testing::Server::CreateServerCredentials
static std::shared_ptr< ServerCredentials > CreateServerCredentials(const ServerConfig &config)
Definition: test/cpp/qps/server.h:98
min
#define min(a, b)
Definition: qsort.h:83
grpc::ServerAsyncReader::Finish
void Finish(const W &msg, const grpc::Status &status, void *tag) override
Definition: grpcpp/impl/codegen/async_stream.h:738
config.h
grpc::testing::Server::SetPayload
static bool SetPayload(PayloadType type, int size, Payload *payload)
Definition: test/cpp/qps/server.h:82
grpc::testing::AsyncQpsServerTest::GetPollCount
int GetPollCount() override
Definition: server_async.cc:192
grpc::testing::AsyncQpsServerTest::InProcessChannel
std::shared_ptr< Channel > InProcessChannel(const ChannelArguments &args) override
Definition: server_async.cc:200
grpc::testing::Server
Definition: test/cpp/qps/server.h:42
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromClientImpl::request_method_
std::function< void(ServerContextType *, grpc::ServerAsyncReader< ResponseType, RequestType > *, void *)> request_method_
Definition: server_async.cc:451
server_credentials.h
grpc::ServerAsyncResponseWriter::Finish
void Finish(const W &msg, const grpc::Status &status, void *tag)
Definition: grpcpp/impl/codegen/async_unary_call.h:340
test_config.h
grpc::testing::AsyncQpsServerTest::contexts_
std::vector< std::unique_ptr< ServerRpcContext > > contexts_
Definition: server_async.cc:528
grpc::testing::AsyncQpsServerTest::ServerRpcContextUnaryImpl::request_method_
std::function< void(ServerContextType *, RequestType *, grpc::ServerAsyncResponseWriter< ResponseType > *, void *)> request_method_
Definition: server_async.cc:304
grpc::testing::AsyncQpsServerTest::srv_cqs_
std::vector< std::unique_ptr< grpc::ServerCompletionQueue > > srv_cqs_
Definition: server_async.cc:525
grpc::ChannelArguments
Definition: grpcpp/support/channel_arguments.h:39
grpc::testing::CreateQpsServerBuilder
std::unique_ptr< ServerBuilder > CreateQpsServerBuilder()
Definition: qps_server_builder.cc:37
func
const EVP_CIPHER *(* func)(void)
Definition: cipher_extra.c:73
grpc::testing::AsyncQpsServerTest::ServerRpcContext::mu_
std::mutex mu_
Definition: server_async.cc:246
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingImpl::request_done
bool request_done(bool ok)
Definition: server_async.cc:338
grpc::testing::AsyncQpsServerTest::ServerRpcContextUnaryImpl::srv_ctx_
std::unique_ptr< ServerContextType > srv_ctx_
Definition: server_async.cc:298
count
int * count
Definition: bloaty/third_party/googletest/googlemock/test/gmock_stress_test.cc:96
grpc::ServerAsyncWriter::Write
void Write(const W &msg, void *tag) override
Definition: grpcpp/impl/codegen/async_stream.h:870
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromClientImpl::invoke_method_
std::function< grpc::Status(RequestType *, ResponseType *)> invoke_method_
Definition: server_async.cc:452
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromServerImpl::request_method_
std::function< void(ServerContextType *, RequestType *, grpc::ServerAsyncWriter< ResponseType > *, void *)> request_method_
Definition: server_async.cc:518
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingImpl::req_
RequestType req_
Definition: server_async.cc:376
alloc.h
server_context.h
asyncio_get_stats.response
response
Definition: asyncio_get_stats.py:28
num_threads
static volatile int num_threads
Definition: benchmark-thread.c:30
grpc::testing::Server::port
int port() const
Definition: test/cpp/qps/server.h:96
grpc::ServerAsyncReaderWriter::Read
void Read(R *msg, void *tag) override
Definition: grpcpp/impl/codegen/async_stream.h:1036
grpc::testing::AsyncQpsServerTest::shutdown_state_
std::vector< std::unique_ptr< PerThreadShutdownState > > shutdown_state_
Definition: server_async.cc:536
grpc::protobuf::util::Status
GRPC_CUSTOM_UTIL_STATUS Status
Definition: include/grpcpp/impl/codegen/config_protobuf.h:93
grpc::testing::AsyncQpsServerTest
Definition: server_async.cc:49
grpc::Status
Definition: include/grpcpp/impl/codegen/status.h:35
ok
bool ok
Definition: async_end2end_test.cc:197
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingImpl::response_
ResponseType response_
Definition: server_async.cc:377
grpc::testing::Server::cores
int cores() const
Definition: test/cpp/qps/server.h:97
grpc::testing::AsyncQpsServerTest::ServerRpcContext::ServerRpcContext
ServerRpcContext()
Definition: server_async.cc:239
grpc::testing::AsyncQpsServerTest::threads_
std::vector< std::thread > threads_
Definition: server_async.cc:523
config_s
Definition: bloaty/third_party/zlib/deflate.c:120
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromServerImpl::next_state_
bool(ServerRpcContextStreamingFromServerImpl::* next_state_)(bool)
Definition: server_async.cc:515
grpc::AsyncGenericService::RequestCall
void RequestCall(GenericServerContext *ctx, GenericServerAsyncReaderWriter *reader_writer, grpc::CompletionQueue *call_cq, grpc::ServerCompletionQueue *notification_cq, void *tag)
Definition: async_generic_service.cc:25
grpc::testing::AsyncQpsServerTest::ThreadFunc
void ThreadFunc(int thread_idx)
Definition: server_async.cc:206
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromServerImpl
Definition: server_async.cc:456
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingImpl::srv_ctx_
std::unique_ptr< ServerContextType > srv_ctx_
Definition: server_async.cc:375
resource_quota.h
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromServerImpl::invoke_method_
std::function< grpc::Status(RequestType *, ResponseType *)> invoke_method_
Definition: server_async.cc:519
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromServerImpl::stream_
grpc::ServerAsyncWriter< ResponseType > stream_
Definition: server_async.cc:520
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromClientImpl::read_done
bool read_done(bool ok)
Definition: server_async.cc:426
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromClientImpl::req_
RequestType req_
Definition: server_async.cc:445
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingImpl::stream_
grpc::ServerAsyncReaderWriter< ResponseType, RequestType > stream_
Definition: server_async.cc:384
grpc::CompletionQueue
Definition: include/grpcpp/impl/codegen/completion_queue.h:104
server.h
grpc::ServerAsyncResponseWriter< ResponseType >
grpc::Slice
Definition: include/grpcpp/impl/codegen/slice.h:36
grpc::ServerAsyncReaderWriter::Finish
void Finish(const grpc::Status &status, void *tag) override
Definition: grpcpp/impl/codegen/async_stream.h:1092
grpc::testing::RegisterGenericService
static void RegisterGenericService(ServerBuilder *builder, grpc::AsyncGenericService *service)
Definition: server_async.cc:543
service
__attribute__((deprecated("Please use GRPCProtoMethod."))) @interface ProtoMethod NSString * service
Definition: ProtoMethod.h:25
grpc.StatusCode.INTERNAL
tuple INTERNAL
Definition: src/python/grpcio/grpc/__init__.py:277
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
messages_pb2.SimpleResponse
SimpleResponse
Definition: messages_pb2.py:604
grpc::testing::AsyncQpsServerTest::AsyncQpsServerTest
AsyncQpsServerTest(const ServerConfig &config, std::function< void(ServerBuilder *, ServiceType *)> register_service, std::function< void(ServiceType *, ServerContextType *, RequestType *, ServerAsyncResponseWriter< ResponseType > *, CompletionQueue *, ServerCompletionQueue *, void *)> request_unary_function, std::function< void(ServiceType *, ServerContextType *, ServerAsyncReaderWriter< ResponseType, RequestType > *, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_function, std::function< void(ServiceType *, ServerContextType *, ServerAsyncReader< ResponseType, RequestType > *, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_from_client_function, std::function< void(ServiceType *, ServerContextType *, RequestType *, ServerAsyncWriter< ResponseType > *, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_from_server_function, std::function< void(ServiceType *, ServerContextType *, ServerAsyncReaderWriter< ResponseType, RequestType > *, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_both_ways_function, std::function< grpc::Status(const PayloadConfig &, RequestType *, ResponseType *)> process_rpc)
Definition: server_async.cc:51
grpc::ServerAsyncWriter< ResponseType >
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromServerImpl::RunNextState
bool RunNextState(bool ok) override
Definition: server_async.cc:473
grpc::testing::AsyncQpsServerTest::ServerRpcContextUnaryImpl
Definition: server_async.cc:253
grpc_get_cq_poll_num
int grpc_get_cq_poll_num(grpc_completion_queue *cq)
Definition: completion_queue.cc:585
server_builder.h
cq
static grpc_completion_queue * cq
Definition: test/core/fling/client.cc:37
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingFromClientImpl::request_done
bool request_done(bool ok)
Definition: server_async.cc:417
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
server.h
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingImpl::invoke_method_
std::function< grpc::Status(RequestType *, ResponseType *)> invoke_method_
Definition: server_async.cc:383
grpc::testing::CreateAsyncServer
std::unique_ptr< Server > CreateAsyncServer(const ServerConfig &config)
Definition: server_async.cc:575
grpc::testing::AsyncQpsServerTest::ServerRpcContextStreamingImpl::~ServerRpcContextStreamingImpl
~ServerRpcContextStreamingImpl() override
Definition: server_async.cc:324


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:16