fullstack_streaming_pump.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /* Benchmark gRPC end2end in various configurations */
20 
21 #ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
22 #define TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
23 
24 #include <sstream>
25 
26 #include <benchmark/benchmark.h>
27 
29 #include "src/proto/grpc/testing/echo.grpc.pb.h"
32 
33 namespace grpc {
34 namespace testing {
35 
36 /*******************************************************************************
37  * BENCHMARKING KERNELS
38  */
39 
/* Packs an integer id into the opaque void* form that the completion-queue
 * calls in this file accept as an event tag. */
static void* tag(intptr_t x) {
  void* const as_pointer = reinterpret_cast<void*>(x);
  return as_pointer;
}
41 
42 template <class Fixture>
44  EchoTestService::AsyncService service;
45  std::unique_ptr<Fixture> fixture(new Fixture(&service));
46  {
47  EchoRequest send_request;
48  EchoRequest recv_request;
49  if (state.range(0) > 0) {
50  send_request.set_message(std::string(state.range(0), 'a'));
51  }
52  Status recv_status;
53  ServerContext svr_ctx;
55  service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
56  fixture->cq(), tag(0));
57  std::unique_ptr<EchoTestService::Stub> stub(
58  EchoTestService::NewStub(fixture->channel()));
59  ClientContext cli_ctx;
60  auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
61  int need_tags = (1 << 0) | (1 << 1);
62  void* t;
63  bool ok;
64  while (need_tags) {
65  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
66  GPR_ASSERT(ok);
67  int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
68  GPR_ASSERT(need_tags & (1 << i));
69  need_tags &= ~(1 << i);
70  }
71  response_rw.Read(&recv_request, tag(0));
72  for (auto _ : state) {
73  GPR_TIMER_SCOPE("BenchmarkCycle", 0);
74  request_rw->Write(send_request, tag(1));
75  while (true) {
76  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
77  if (t == tag(0)) {
78  response_rw.Read(&recv_request, tag(0));
79  } else if (t == tag(1)) {
80  break;
81  } else {
82  GPR_ASSERT(false);
83  }
84  }
85  }
86  request_rw->WritesDone(tag(1));
87  need_tags = (1 << 0) | (1 << 1);
88  while (need_tags) {
89  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
90  int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
91  GPR_ASSERT(need_tags & (1 << i));
92  need_tags &= ~(1 << i);
93  }
94  response_rw.Finish(Status::OK, tag(0));
95  Status final_status;
96  request_rw->Finish(&final_status, tag(1));
97  need_tags = (1 << 0) | (1 << 1);
98  while (need_tags) {
99  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
100  int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
101  GPR_ASSERT(need_tags & (1 << i));
102  need_tags &= ~(1 << i);
103  }
104  GPR_ASSERT(final_status.ok());
105  }
106  fixture->Finish(state);
107  fixture.reset();
108  state.SetBytesProcessed(state.range(0) * state.iterations());
109 }
110 
111 template <class Fixture>
113  EchoTestService::AsyncService service;
114  std::unique_ptr<Fixture> fixture(new Fixture(&service));
115  {
116  EchoResponse send_response;
117  EchoResponse recv_response;
118  if (state.range(0) > 0) {
119  send_response.set_message(std::string(state.range(0), 'a'));
120  }
121  Status recv_status;
122  ServerContext svr_ctx;
124  service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
125  fixture->cq(), tag(0));
126  std::unique_ptr<EchoTestService::Stub> stub(
127  EchoTestService::NewStub(fixture->channel()));
128  ClientContext cli_ctx;
129  auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
130  int need_tags = (1 << 0) | (1 << 1);
131  void* t;
132  bool ok;
133  while (need_tags) {
134  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
135  GPR_ASSERT(ok);
136  int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
137  GPR_ASSERT(need_tags & (1 << i));
138  need_tags &= ~(1 << i);
139  }
140  request_rw->Read(&recv_response, tag(0));
141  for (auto _ : state) {
142  GPR_TIMER_SCOPE("BenchmarkCycle", 0);
143  response_rw.Write(send_response, tag(1));
144  while (true) {
145  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
146  if (t == tag(0)) {
147  request_rw->Read(&recv_response, tag(0));
148  } else if (t == tag(1)) {
149  break;
150  } else {
151  GPR_ASSERT(false);
152  }
153  }
154  }
155  response_rw.Finish(Status::OK, tag(1));
156  need_tags = (1 << 0) | (1 << 1);
157  while (need_tags) {
158  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
159  int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
160  GPR_ASSERT(need_tags & (1 << i));
161  need_tags &= ~(1 << i);
162  }
163  }
164  fixture->Finish(state);
165  fixture.reset();
166  state.SetBytesProcessed(state.range(0) * state.iterations());
167 }
168 } // namespace testing
169 } // namespace grpc
170 
171 #endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
Fixture
Definition: bm_call_create.cc:359
testing
Definition: aws_request_signer_test.cc:25
grpc::ServerContext
Definition: grpcpp/impl/codegen/server_context.h:566
timers.h
grpc
Definition: grpcpp/alarm.h:33
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: include/grpcpp/impl/codegen/status.h:126
GPR_TIMER_SCOPE
#define GPR_TIMER_SCOPE(tag, important)
Definition: src/core/lib/profiling/timers.h:43
send_request
Definition: ares_private.h:147
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc::testing::BM_PumpStreamServerToClient
static void BM_PumpStreamServerToClient(benchmark::State &state)
Definition: fullstack_streaming_pump.h:112
grpc::ServerAsyncReaderWriter
Definition: grpcpp/impl/codegen/async_stream.h:1010
fullstack_context_mutators.h
async_greeter_client.stub
stub
Definition: hellostreamingworld/async_greeter_client.py:26
fullstack_fixtures.h
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
fixture
static const char fixture[]
Definition: test-fs-copyfile.c:36
gmock_output_test._
_
Definition: bloaty/third_party/googletest/googlemock/test/gmock_output_test.py:175
intptr_t
_W64 signed int intptr_t
Definition: stdint-msvc2008.h:118
grpc::Status::OK
static const Status & OK
An OK pre-defined instance.
Definition: include/grpcpp/impl/codegen/status.h:113
grpc::ServerAsyncReaderWriter::Write
void Write(const W &msg, void *tag) override
Definition: grpcpp/impl/codegen/async_stream.h:1042
x
int x
Definition: bloaty/third_party/googletest/googlemock/test/gmock-matchers_test.cc:3610
grpc::testing::tag
static void * tag(intptr_t t)
Definition: h2_ssl_cert_test.cc:263
grpc::ClientContext
Definition: grpcpp/impl/codegen/client_context.h:195
benchmark::State
Definition: benchmark/include/benchmark/benchmark.h:503
grpc::ServerAsyncReaderWriter::Read
void Read(R *msg, void *tag) override
Definition: grpcpp/impl/codegen/async_stream.h:1036
grpc::Status
Definition: include/grpcpp/impl/codegen/status.h:35
ok
bool ok
Definition: async_end2end_test.cc:197
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
grpc::ServerAsyncReaderWriter::Finish
void Finish(const grpc::Status &status, void *tag) override
Definition: grpcpp/impl/codegen/async_stream.h:1092
service
__attribute__((deprecated("Please use GRPCProtoMethod."))) @interface ProtoMethod NSString * service
Definition: ProtoMethod.h:25
grpc::testing::BM_PumpStreamClientToServer
static void BM_PumpStreamClientToServer(benchmark::State &state)
Definition: fullstack_streaming_pump.h:43
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:59:22