callback_streaming_ping_pong.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef TEST_CPP_MICROBENCHMARKS_CALLBACK_STREAMING_PING_PONG_H
20 #define TEST_CPP_MICROBENCHMARKS_CALLBACK_STREAMING_PING_PONG_H
21 
22 #include <sstream>
23 
24 #include <benchmark/benchmark.h>
25 
27 #include "src/proto/grpc/testing/echo.grpc.pb.h"
31 
32 namespace grpc {
33 namespace testing {
34 
35 /*******************************************************************************
36  * BENCHMARKING KERNELS
37  */
38 
39 class BidiClient : public grpc::ClientBidiReactor<EchoRequest, EchoResponse> {
40  public:
41  BidiClient(benchmark::State* state, EchoTestService::Stub* stub,
42  ClientContext* cli_ctx, EchoRequest* request,
43  EchoResponse* response)
44  : state_{state},
45  stub_{stub},
46  cli_ctx_{cli_ctx},
49  msgs_size_ = state->range(0);
50  msgs_to_send_ = state->range(1);
51  StartNewRpc();
52  }
53 
54  void OnReadDone(bool ok) override {
55  if (!ok) {
56  gpr_log(GPR_ERROR, "Client read failed");
57  return;
58  }
59  MaybeWrite();
60  }
61 
62  void OnWriteDone(bool ok) override {
63  if (!ok) {
64  gpr_log(GPR_ERROR, "Client write failed");
65  return;
66  }
68  StartRead(response_);
69  }
70 
71  void OnDone(const Status& s) override {
72  GPR_ASSERT(s.ok());
74  if (state_->KeepRunning()) {
75  writes_complete_ = 0;
76  StartNewRpc();
77  } else {
78  std::unique_lock<std::mutex> l(mu);
79  done = true;
80  cv.notify_one();
81  }
82  }
83 
84  void StartNewRpc() {
85  cli_ctx_->~ClientContext();
87  cli_ctx_->AddMetadata(kServerMessageSize, std::to_string(msgs_size_));
88  stub_->async()->BidiStream(cli_ctx_, this);
89  MaybeWrite();
90  StartCall();
91  }
92 
93  void Await() {
94  std::unique_lock<std::mutex> l(mu);
95  while (!done) {
96  cv.wait(l);
97  }
98  }
99 
100  private:
101  void MaybeWrite() {
104  } else {
105  StartWritesDone();
106  }
107  }
108 
110  EchoTestService::Stub* stub_;
112  EchoRequest* request_;
113  EchoResponse* response_;
118  std::condition_variable cv;
119  bool done = false;
120 };
121 
122 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
124  int message_size = state.range(0);
125  int max_ping_pongs = state.range(1);
127  std::unique_ptr<Fixture> fixture(new Fixture(&service));
128  std::unique_ptr<EchoTestService::Stub> stub_(
129  EchoTestService::NewStub(fixture->channel()));
130  EchoRequest request;
131  EchoResponse response;
132  ClientContext cli_ctx;
133  if (message_size > 0) {
134  request.set_message(std::string(message_size, 'a'));
135  } else {
136  request.set_message("");
137  }
138  if (state.KeepRunning()) {
139  GPR_TIMER_SCOPE("BenchmarkCycle", 0);
140  BidiClient test{&state, stub_.get(), &cli_ctx, &request, &response};
141  test.Await();
142  }
143  fixture->Finish(state);
144  fixture.reset();
145  state.SetBytesProcessed(2 * message_size * max_ping_pongs *
146  state.iterations());
147 }
148 
149 } // namespace testing
150 } // namespace grpc
151 #endif // TEST_CPP_MICROBENCHMARKS_CALLBACK_STREAMING_PING_PONG_H
Fixture
Definition: bm_call_create.cc:359
testing
Definition: aws_request_signer_test.cc:25
grpc::testing::CallbackStreamingTestService
Definition: callback_test_service.h:37
stub_
std::unique_ptr< grpc::testing::EchoTestService::Stub > stub_
Definition: client_channel_stress_test.cc:331
response_
grpc_http_response response_
Definition: google_c2p_resolver.cc:101
timers.h
grpc
Definition: grpcpp/alarm.h:33
mutex
static uv_mutex_t mutex
Definition: threadpool.c:34
GPR_TIMER_SCOPE
#define GPR_TIMER_SCOPE(tag, important)
Definition: src/core/lib/profiling/timers.h:43
grpc::ClientBidiReactor< EchoRequest, EchoResponse >::StartCall
void StartCall()
Definition: impl/codegen/client_callback.h:246
grpc::ClientBidiReactor
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: impl/codegen/client_callback.h:151
test
Definition: spinlock_test.cc:36
benchmark.request
request
Definition: benchmark.py:77
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc::ClientBidiReactor< EchoRequest, EchoResponse >::StartWrite
void StartWrite(const EchoRequest *req)
Definition: impl/codegen/client_callback.h:261
fullstack_context_mutators.h
async_greeter_client.stub
stub
Definition: hellostreamingworld/async_greeter_client.py:26
fullstack_fixtures.h
grpc::testing::BidiClient::OnReadDone
void OnReadDone(bool ok) override
Definition: callback_streaming_ping_pong.h:56
cli_ctx_
ClientContext cli_ctx_
Definition: raw_end2end_test.cc:164
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
fixture
static const char fixture[]
Definition: test-fs-copyfile.c:36
grpc::testing::BidiClient::msgs_size_
int msgs_size_
Definition: callback_streaming_ping_pong.h:118
grpc::testing::BidiClient::BidiClient
BidiClient(benchmark::State *state, EchoTestService::Stub *stub, ClientContext *cli_ctx, EchoRequest *request, EchoResponse *response)
Definition: callback_streaming_ping_pong.h:43
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc::testing::BidiClient::MaybeWrite
void MaybeWrite()
Definition: callback_streaming_ping_pong.h:103
done
struct tab * done
Definition: bloaty/third_party/zlib/examples/enough.c:176
grpc::testing::kServerMessageSize
const char *const kServerMessageSize
Definition: callback_test_service.h:35
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
request_
EchoRequest request_
Definition: client_callback_end2end_test.cc:724
grpc::ClientContext
Definition: grpcpp/impl/codegen/client_context.h:195
msgs_to_send_
const int msgs_to_send_
Definition: client_callback_end2end_test.cc:1212
grpc::testing::cv
static gpr_cv cv
Definition: bm_cq.cc:163
benchmark::State
Definition: benchmark/include/benchmark/benchmark.h:503
asyncio_get_stats.response
response
Definition: asyncio_get_stats.py:28
grpc::Status
Definition: include/grpcpp/impl/codegen/status.h:35
ok
bool ok
Definition: async_end2end_test.cc:197
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
state_
grpc_connectivity_state state_
Definition: channel_connectivity.cc:213
writes_complete_
int writes_complete_
Definition: client_callback_end2end_test.cc:1211
grpc::testing::BM_CallbackBidiStreaming
static void BM_CallbackBidiStreaming(benchmark::State &state)
Definition: callback_streaming_ping_pong.h:123
service
__attribute__((deprecated("Please use GRPCProtoMethod."))) @interface ProtoMethod NSString * service
Definition: ProtoMethod.h:25
grpc::testing::BidiClient::StartNewRpc
void StartNewRpc()
Definition: callback_streaming_ping_pong.h:86
callback_test_service.h
to_string
static bool to_string(zval *from)
Definition: protobuf/php/ext/google/protobuf/convert.c:333
state
static struct rpc_state state
Definition: bad_server_response_test.cc:87
grpc::testing::mu
static gpr_mu mu
Definition: bm_cq.cc:162


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:51