Go to the documentation of this file.
20 #include <forward_list>
39 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
48 class ServerContextType>
52 const ServerConfig&
config,
57 request_unary_function,
61 request_streaming_function,
65 request_streaming_from_client_function,
69 request_streaming_from_server_function,
73 request_streaming_both_ways_function,
80 auto port_num =
port();
95 "Sizing async server to %d threads. Defaults to number of cores "
96 "in machine or 64 threads if machine has more than 64 cores to "
103 for (
int i = 0;
i < num_cqs;
i++) {
119 auto process_rpc_bound =
120 std::bind(process_rpc,
config.payload_config(), std::placeholders::_1,
121 std::placeholders::_2);
123 for (
int i = 0;
i < 5000;
i++) {
124 for (
int j = 0; j < num_cqs; j++) {
125 if (request_unary_function) {
126 auto request_unary = std::bind(
128 std::placeholders::_2, std::placeholders::_3,
srv_cqs_[j].
get(),
133 if (request_streaming_function) {
134 auto request_streaming = std::bind(
136 std::placeholders::_1, std::placeholders::_2,
srv_cqs_[j].
get(),
139 request_streaming, process_rpc_bound));
141 if (request_streaming_from_client_function) {
142 auto request_streaming_from_client = std::bind(
144 std::placeholders::_1, std::placeholders::_2,
srv_cqs_[j].
get(),
147 request_streaming_from_client, process_rpc_bound));
149 if (request_streaming_from_server_function) {
150 auto request_streaming_from_server =
151 std::bind(request_streaming_from_server_function, &
async_service_,
152 std::placeholders::_1, std::placeholders::_2,
156 request_streaming_from_server, process_rpc_bound));
158 if (request_streaming_both_ways_function) {
171 std::lock_guard<std::mutex> lock((*ss)->mutex);
172 (*ss)->shutdown =
true;
187 while ((*cq)->Next(&got_tag, &
ok)) {
225 }
while (
srv_cqs_[
cq_[thread_idx]]->DoThenAsyncNext(
226 [&,
ctx,
ok, mu_ptr]() {
228 if (!
ctx->RunNextState(
ok)) {
244 virtual void Reset() = 0;
272 srv_ctx_.reset(
new ServerContextType);
327 srv_ctx_.reset(
new ServerContextType);
406 srv_ctx_.reset(
new ServerContextType);
475 srv_ctx_.reset(
new ServerContextType);
525 std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>
srv_cqs_;
528 std::vector<std::unique_ptr<ServerRpcContext>>
contexts_;
540 BenchmarkService::AsyncService*
service) {
550 if (
request->response_size() > 0) {
567 int resp_size = payload_config.bytebuf_params().resp_size();
568 std::unique_ptr<char[]>
buf(
new char[resp_size]);
569 memset(
buf.get(), 0,
static_cast<size_t>(resp_size));
576 return std::unique_ptr<Server>(
578 BenchmarkService::AsyncService,
581 &BenchmarkService::AsyncService::RequestUnaryCall,
582 &BenchmarkService::AsyncService::RequestStreamingCall,
583 &BenchmarkService::AsyncService::RequestStreamingFromClient,
584 &BenchmarkService::AsyncService::RequestStreamingFromServer,
585 &BenchmarkService::AsyncService::RequestStreamingBothWays,
589 return std::unique_ptr<Server>(
std::function< void(ServerContextType *, grpc::ServerAsyncReaderWriter< ResponseType, RequestType > *, void *)> request_method_
std::chrono::duration< std::int_fast64_t > seconds
~ServerRpcContextStreamingFromServerImpl() override
std::unique_ptr< ServerContextType > srv_ctx_
virtual bool RunNextState(bool)=0
static void RegisterBenchmarkService(ServerBuilder *builder, BenchmarkService::AsyncService *service)
~ServerRpcContextStreamingFromClientImpl() override
absl::string_view get(const Cont &c)
std::unique_ptr< grpc::Server > server_
bool RunNextState(bool ok) override
static Status ProcessGenericRPC(const PayloadConfig &payload_config, ByteBuffer *request, ByteBuffer *response)
return memset(p, 0, total)
virtual ~ServerRpcContext()
~AsyncQpsServerTest() override
ServerRpcContextUnaryImpl(std::function< void(ServerContextType *, RequestType *, grpc::ServerAsyncResponseWriter< ResponseType > *, void *)> request_method, std::function< grpc::Status(RequestType *, ResponseType *)> invoke_method)
std::unique_ptr< Server > CreateAsyncGenericServer(const ServerConfig &config)
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
static struct test_ctx ctx
static Status ProcessSimpleRPC(const PayloadConfig &, SimpleRequest *request, SimpleResponse *response)
static ServerRpcContext * detag(void *tag)
bool request_done(bool ok)
static AllocList * Next(int i, AllocList *prev, LowLevelAlloc::Arena *arena)
std::string server_address("0.0.0.0:10000")
static void * tag(ServerRpcContext *func)
std::function< grpc::Status(RequestType *, ResponseType *)> invoke_method_
ServerRpcContextStreamingFromServerImpl(std::function< void(ServerContextType *, RequestType *, grpc::ServerAsyncWriter< ResponseType > *, void *)> request_method, std::function< grpc::Status(RequestType *, ResponseType *)> invoke_method)
void Finish(const grpc::Status &status, void *tag) override
bool(ServerRpcContextStreamingImpl::* next_state_)(bool)
std::unique_ptr< ServerContextType > srv_ctx_
bool(ServerRpcContextUnaryImpl::* next_state_)(bool)
A builder class for the creation and startup of grpc::Server instances.
~ServerRpcContextUnaryImpl() override
ServiceType async_service_
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
ServerRpcContextStreamingImpl(std::function< void(ServerContextType *, grpc::ServerAsyncReaderWriter< ResponseType, RequestType > *, void *)> request_method, std::function< grpc::Status(RequestType *, ResponseType *)> invoke_method)
grpc::ServerAsyncResponseWriter< ResponseType > response_writer_
std::string JoinHostPort(absl::string_view host, int port)
bool RunNextState(bool ok) override
static void ApplyConfigToBuilder(const ServerConfig &config, ServerBuilder *builder)
bool RunNextState(bool ok) override
bool(ServerRpcContextStreamingFromClientImpl::* next_state_)(bool)
grpc::ServerAsyncReader< ResponseType, RequestType > stream_
static const Status & OK
An OK pre-defined instance.
void Write(const W &msg, void *tag) override
ServerRpcContextStreamingFromClientImpl(std::function< void(ServerContextType *, grpc::ServerAsyncReader< ResponseType, RequestType > *, void *)> request_method, std::function< grpc::Status(RequestType *, ResponseType *)> invoke_method)
void Read(R *msg, void *tag) override
static std::shared_ptr< ServerCredentials > CreateServerCredentials(const ServerConfig &config)
void Finish(const W &msg, const grpc::Status &status, void *tag) override
static bool SetPayload(PayloadType type, int size, Payload *payload)
int GetPollCount() override
std::shared_ptr< Channel > InProcessChannel(const ChannelArguments &args) override
std::function< void(ServerContextType *, grpc::ServerAsyncReader< ResponseType, RequestType > *, void *)> request_method_
void Finish(const W &msg, const grpc::Status &status, void *tag)
std::vector< std::unique_ptr< ServerRpcContext > > contexts_
std::function< void(ServerContextType *, RequestType *, grpc::ServerAsyncResponseWriter< ResponseType > *, void *)> request_method_
std::vector< std::unique_ptr< grpc::ServerCompletionQueue > > srv_cqs_
std::unique_ptr< ServerBuilder > CreateQpsServerBuilder()
bool request_done(bool ok)
std::unique_ptr< ServerContextType > srv_ctx_
void Write(const W &msg, void *tag) override
std::function< grpc::Status(RequestType *, ResponseType *)> invoke_method_
std::function< void(ServerContextType *, RequestType *, grpc::ServerAsyncWriter< ResponseType > *, void *)> request_method_
static volatile int num_threads
void Read(R *msg, void *tag) override
std::vector< std::unique_ptr< PerThreadShutdownState > > shutdown_state_
GRPC_CUSTOM_UTIL_STATUS Status
std::vector< std::thread > threads_
bool(ServerRpcContextStreamingFromServerImpl::* next_state_)(bool)
void RequestCall(GenericServerContext *ctx, GenericServerAsyncReaderWriter *reader_writer, grpc::CompletionQueue *call_cq, grpc::ServerCompletionQueue *notification_cq, void *tag)
void ThreadFunc(int thread_idx)
std::unique_ptr< ServerContextType > srv_ctx_
std::function< grpc::Status(RequestType *, ResponseType *)> invoke_method_
grpc::ServerAsyncWriter< ResponseType > stream_
grpc::ServerAsyncReaderWriter< ResponseType, RequestType > stream_
void Finish(const grpc::Status &status, void *tag) override
static void RegisterGenericService(ServerBuilder *builder, grpc::AsyncGenericService *service)
__attribute__((deprecated("Please use GRPCProtoMethod."))) @interface ProtoMethod NSString * service
AsyncQpsServerTest(const ServerConfig &config, std::function< void(ServerBuilder *, ServiceType *)> register_service, std::function< void(ServiceType *, ServerContextType *, RequestType *, ServerAsyncResponseWriter< ResponseType > *, CompletionQueue *, ServerCompletionQueue *, void *)> request_unary_function, std::function< void(ServiceType *, ServerContextType *, ServerAsyncReaderWriter< ResponseType, RequestType > *, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_function, std::function< void(ServiceType *, ServerContextType *, ServerAsyncReader< ResponseType, RequestType > *, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_from_client_function, std::function< void(ServiceType *, ServerContextType *, RequestType *, ServerAsyncWriter< ResponseType > *, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_from_server_function, std::function< void(ServiceType *, ServerContextType *, ServerAsyncReaderWriter< ResponseType, RequestType > *, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_both_ways_function, std::function< grpc::Status(const PayloadConfig &, RequestType *, ResponseType *)> process_rpc)
bool RunNextState(bool ok) override
int grpc_get_cq_poll_num(grpc_completion_queue *cq)
static grpc_completion_queue * cq
bool request_done(bool ok)
std::function< grpc::Status(RequestType *, ResponseType *)> invoke_method_
std::unique_ptr< Server > CreateAsyncServer(const ServerConfig &config)
~ServerRpcContextStreamingImpl() override
grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:16