fullstack_streaming_ping_pong.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /* Benchmark gRPC end2end in various configurations */
20 
21 #ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
22 #define TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
23 
24 #include <sstream>
25 
26 #include <benchmark/benchmark.h>
27 
29 #include "src/proto/grpc/testing/echo.grpc.pb.h"
32 
33 namespace grpc {
34 namespace testing {
35 
36 /*******************************************************************************
37  * BENCHMARKING KERNELS
38  */
39 
// Packs a small integer id into the opaque void* tag form handed to the async
// calls below; the completion loops cast the pointer back to recover the id.
40 static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
41 
42 // Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
43 // messages in each call) in a loop on a single channel
44 //
45 // First parameter (i.e. state.range(0)): Message size (in bytes) to use
46 // Second parameter (i.e. state.range(1)): Number of ping pong messages.
47 // Note: One ping-pong means two messages (one from client to server and
48 // the other from server to client).
49 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
// NOTE(review): listing line 50 (the function signature) is absent from this
// listing; the symbol index at the end of this page gives it as
// `static void BM_StreamingPingPong(benchmark::State &state)`.
51  const int msg_size = state.range(0);
52  const int max_ping_pongs = state.range(1);
53 
54  EchoTestService::AsyncService service;
55  std::unique_ptr<Fixture> fixture(new Fixture(&service));
56  {
57  EchoResponse send_response;
58  EchoResponse recv_response;
59  EchoRequest send_request;
60  EchoRequest recv_request;
61 
    // The payloads are built once and reused for every message; when msg_size
    // is not positive the messages are left as default-constructed.
62  if (msg_size > 0) {
63  send_request.set_message(std::string(msg_size, 'a'));
64  send_response.set_message(std::string(msg_size, 'b'));
65  }
66 
67  std::unique_ptr<EchoTestService::Stub> stub(
68  EchoTestService::NewStub(fixture->channel()));
69 
    // Every benchmark iteration sets up a new call, exchanges max_ping_pongs
    // round trips on it, and finishes it.
70  for (auto _ : state) {
71  ServerContext svr_ctx;
72  ServerContextMutator svr_ctx_mut(&svr_ctx);
    // NOTE(review): listing line 73 is absent from this listing; presumably it
    // declares `response_rw`, which is used below - confirm against the header.
74  service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
75  fixture->cq(), tag(0));
76 
77  ClientContext cli_ctx;
78  ClientContextMutator cli_ctx_mut(&cli_ctx);
79  auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
80 
81  // Establish async stream between client side and server side
82  void* t;
83  bool ok;
    // need_tags is a bitmask of outstanding completions: bit i stays set until
    // tag(i) has been returned by the completion queue.
84  int need_tags = (1 << 0) | (1 << 1);
85  while (need_tags) {
86  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
87  GPR_ASSERT(ok);
88  int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
89  GPR_ASSERT(need_tags & (1 << i));
90  need_tags &= ~(1 << i);
91  }
92 
93  // Send 'max_ping_pongs' number of ping pong messages
94  int ping_pong_cnt = 0;
95  while (ping_pong_cnt < max_ping_pongs) {
96  request_rw->Write(send_request, tag(0)); // Start client send
97  response_rw.Read(&recv_request, tag(1)); // Start server recv
98  request_rw->Read(&recv_response, tag(2)); // Start client recv
99 
     // Bit 3 is the server write; it is only started inside the loop below,
     // once the server read (tag 1) has completed.
100  need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
101  while (need_tags) {
102  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
103  GPR_ASSERT(ok);
104  int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
105 
106  // If server recv is complete, start the server send operation
107  if (i == 1) {
108  response_rw.Write(send_response, tag(3));
109  }
110 
111  GPR_ASSERT(need_tags & (1 << i));
112  need_tags &= ~(1 << i);
113  }
114 
115  ping_pong_cnt++;
116  }
117 
     // Close the call. Tags 0, 1 and 2 are reused here for the client
     // WritesDone, the server Finish and the client Finish respectively.
118  request_rw->WritesDone(tag(0));
119  response_rw.Finish(Status::OK, tag(1));
120 
121  Status recv_status;
122  request_rw->Finish(&recv_status, tag(2));
123 
     // Drain the three closing completions. Unlike the loops above, `ok` is
     // not asserted here; only the final status is checked.
124  need_tags = (1 << 0) | (1 << 1) | (1 << 2);
125  while (need_tags) {
126  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
127  int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
128  GPR_ASSERT(need_tags & (1 << i));
129  need_tags &= ~(1 << i);
130  }
131 
132  GPR_ASSERT(recv_status.ok());
133  }
134  }
135 
136  fixture->Finish(state);
137  fixture.reset();
     // Two messages of msg_size bytes are counted per ping-pong.
138  state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
139 }
140 
141 // Repeatedly sends ping pong messages in a single streaming Bidi call in a loop
142 // First parameter (i.e. state.range(0)): Message size (in bytes) to use
143 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
// NOTE(review): listing line 144 (the function signature) is absent from this
// listing; the symbol index at the end of this page gives it as
// `static void BM_StreamingPingPongMsgs(benchmark::State &state)`.
145  const int msg_size = state.range(0);
146 
147  EchoTestService::AsyncService service;
148  std::unique_ptr<Fixture> fixture(new Fixture(&service));
149  {
150  EchoResponse send_response;
151  EchoResponse recv_response;
152  EchoRequest send_request;
153  EchoRequest recv_request;
154 
     // The payloads are built once and reused for every message; when msg_size
     // is not positive the messages are left as default-constructed.
155  if (msg_size > 0) {
156  send_request.set_message(std::string(msg_size, 'a'));
157  send_response.set_message(std::string(msg_size, 'b'));
158  }
159 
160  std::unique_ptr<EchoTestService::Stub> stub(
161  EchoTestService::NewStub(fixture->channel()));
162 
     // The call is set up once, before the benchmark loop, so the loop below
     // covers only the message exchange.
163  ServerContext svr_ctx;
164  ServerContextMutator svr_ctx_mut(&svr_ctx);
     // NOTE(review): listing line 165 is absent from this listing; presumably it
     // declares `response_rw`, which is used below - confirm against the header.
166  service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
167  fixture->cq(), tag(0));
168 
169  ClientContext cli_ctx;
170  ClientContextMutator cli_ctx_mut(&cli_ctx);
171  auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
172 
173  // Establish async stream between client side and server side
174  void* t;
175  bool ok;
     // need_tags is a bitmask of outstanding completions: bit i stays set until
     // tag(i) has been returned by the completion queue.
176  int need_tags = (1 << 0) | (1 << 1);
177  while (need_tags) {
178  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
179  GPR_ASSERT(ok);
180  int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
181  GPR_ASSERT(need_tags & (1 << i));
182  need_tags &= ~(1 << i);
183  }
184 
     // Each benchmark iteration is exactly one ping-pong on the already
     // established stream.
185  for (auto _ : state) {
186  GPR_TIMER_SCOPE("BenchmarkCycle", 0);
187  request_rw->Write(send_request, tag(0)); // Start client send
188  response_rw.Read(&recv_request, tag(1)); // Start server recv
189  request_rw->Read(&recv_response, tag(2)); // Start client recv
190 
     // Bit 3 is the server write; it is only started inside the loop below,
     // once the server read (tag 1) has completed.
191  need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
192  while (need_tags) {
193  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
194  GPR_ASSERT(ok);
195  int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
196 
197  // If server recv is complete, start the server send operation
198  if (i == 1) {
199  response_rw.Write(send_response, tag(3));
200  }
201 
202  GPR_ASSERT(need_tags & (1 << i));
203  need_tags &= ~(1 << i);
204  }
205  }
206 
     // Close the call. Tags 0, 1 and 2 are reused here for the client
     // WritesDone, the server Finish and the client Finish respectively.
207  request_rw->WritesDone(tag(0));
208  response_rw.Finish(Status::OK, tag(1));
209  Status recv_status;
210  request_rw->Finish(&recv_status, tag(2));
211 
     // Drain the three closing completions. Unlike the loops above, `ok` is
     // not asserted here; only the final status is checked.
212  need_tags = (1 << 0) | (1 << 1) | (1 << 2);
213  while (need_tags) {
214  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
215  int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
216  GPR_ASSERT(need_tags & (1 << i));
217  need_tags &= ~(1 << i);
218  }
219 
220  GPR_ASSERT(recv_status.ok());
221  }
222 
223  fixture->Finish(state);
224  fixture.reset();
     // One ping-pong per iteration, i.e. two messages of msg_size bytes each.
225  state.SetBytesProcessed(msg_size * state.iterations() * 2);
226 }
227 
228 // Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
229 // messages in each call) in a loop on a single channel. Different from
230 // BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast,
231 // WriteAndFinish, set_initial_metadata_corked. These apis aim at saving
232 // sendmsg syscalls for streaming by coalescing 1. initial metadata with first
233 // message; 2. final streaming message with trailing metadata.
234 //
235 // First parameter (i.e. state.range(0)): Message size (in bytes) to use
236 // Second parameter (i.e. state.range(1)): Number of ping pong messages.
237 // Note: One ping-pong means two messages (one from client to server and
238 // the other from server to client).
239 // Third parameter (i.e. state.range(2)): Switch between using WriteAndFinish
240 // API and WriteLast API for server.
241 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
// NOTE(review): listing line 242 (the function signature) is absent from this
// listing; the symbol index at the end of this page gives it as
// `static void BM_StreamingPingPongWithCoalescingApi(benchmark::State &state)`.
243  const int msg_size = state.range(0);
244  const int max_ping_pongs = state.range(1);
245  // This option is used to test out server API: WriteLast and WriteAndFinish
246  // respectively, since we can not use both of them on server side at the same
247  // time. Value 1 means we are testing out the WriteAndFinish API, and
248  // otherwise we are testing out the WriteLast API.
249  const int write_and_finish = state.range(2);
250 
251  EchoTestService::AsyncService service;
252  std::unique_ptr<Fixture> fixture(new Fixture(&service));
253  {
254  EchoResponse send_response;
255  EchoResponse recv_response;
256  EchoRequest send_request;
257  EchoRequest recv_request;
258 
     // The payloads are built once and reused for every message; when msg_size
     // is not positive the messages are left as default-constructed.
259  if (msg_size > 0) {
260  send_request.set_message(std::string(msg_size, 'a'));
261  send_response.set_message(std::string(msg_size, 'b'));
262  }
263 
264  std::unique_ptr<EchoTestService::Stub> stub(
265  EchoTestService::NewStub(fixture->channel()));
266 
     // Tag ids used below: 0 server RequestBidiStream, 1 client AsyncBidiStream,
     // 2 client write, 3 server read, 4 client read, 5 server write,
     // 6 client WritesDone, 7 server Finish, 8 client Finish.
267  for (auto _ : state) {
268  ServerContext svr_ctx;
269  ServerContextMutator svr_ctx_mut(&svr_ctx);
     // NOTE(review): listing line 270 is absent from this listing; presumably it
     // declares `response_rw`, which is used below - confirm against the header.
271  service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
272  fixture->cq(), tag(0));
273 
274  ClientContext cli_ctx;
275  ClientContextMutator cli_ctx_mut(&cli_ctx);
276  cli_ctx.set_initial_metadata_corked(true);
277  // tag:1 here will never come up, since we are not performing any op due
278  // to initial metadata coalescing.
279  auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
280 
281  void* t;
282  bool ok;
     // expect_tags: bitmask of completions that are allowed to surface. It is
     // checked on every completion in the ping-pong loop and whatever is left
     // in it is drained at the end of the iteration.
283  int expect_tags = 0;
284 
285  // Send 'max_ping_pongs' number of ping pong messages
286  int ping_pong_cnt = 0;
287  while (ping_pong_cnt < max_ping_pongs) {
     // The final client message goes out via WriteLast instead of Write.
288  if (ping_pong_cnt == max_ping_pongs - 1) {
289  request_rw->WriteLast(send_request, WriteOptions(), tag(2));
290  } else {
291  request_rw->Write(send_request, tag(2)); // Start client send
292  }
293 
     // await_tags: bitmask of completions that must surface before this
     // ping-pong round is left.
294  int await_tags = (1 << 2);
295 
296  if (ping_pong_cnt == 0) {
297  // wait for the server call structure (call_hook, etc.) to be
298  // initialized (async stream between client side and server side
299  // established). It is necessary when client init metadata is
300  // coalesced
301  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
302  while (static_cast<int>(reinterpret_cast<intptr_t>(t)) != 0) {
303  // In some cases tag:2 comes before tag:0 (write tag comes out
304  // first), this while loop is to make sure get tag:0.
305  int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
306  GPR_ASSERT(await_tags & (1 << i));
307  await_tags &= ~(1 << i);
308  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
309  }
310  }
311 
312  response_rw.Read(&recv_request, tag(3)); // Start server recv
313  request_rw->Read(&recv_response, tag(4)); // Start client recv
314 
     // Tag 5 (server write) is awaited from the start, but it is only added to
     // expect_tags once the write has actually been issued on tag 3 below.
315  await_tags |= (1 << 3) | (1 << 4);
316  expect_tags = await_tags;
317  await_tags |= (1 << 5);
318 
319  while (await_tags != 0) {
320  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
321  GPR_ASSERT(ok);
322  int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
323 
324  // If server recv is complete, start the server send operation
325  if (i == 3) {
326  if (ping_pong_cnt == max_ping_pongs - 1) {
327  if (write_and_finish == 1) {
328  response_rw.WriteAndFinish(send_response, WriteOptions(),
329  Status::OK, tag(5));
330  expect_tags |= (1 << 5);
331  } else {
332  response_rw.WriteLast(send_response, WriteOptions(), tag(5));
333  // WriteLast buffers the write, so it's possible neither server
334  // write op nor client read op will finish inside the while
335  // loop.
     // Tags 4 and 5 are therefore no longer awaited here; they stay in
     // expect_tags and are picked up by the final drain loop.
336  await_tags &= ~(1 << 4);
337  await_tags &= ~(1 << 5);
338  expect_tags |= (1 << 5);
339  }
340  } else {
341  response_rw.Write(send_response, tag(5));
342  expect_tags |= (1 << 5);
343  }
344  }
345 
346  GPR_ASSERT(expect_tags & (1 << i));
347  expect_tags &= ~(1 << i);
348  await_tags &= ~(1 << i);
349  }
350 
351  ping_pong_cnt++;
352  }
353 
     // Register the closing completions that the operations issued below will
     // produce (6 WritesDone, 7 server Finish, 8 client Finish).
354  if (max_ping_pongs == 0) {
355  expect_tags |= (1 << 6) | (1 << 7) | (1 << 8);
356  } else {
357  if (write_and_finish == 1) {
358  expect_tags |= (1 << 8);
359  } else {
360  // server's buffered write and the client's read of the buffered write
361  // tags should come up.
362  expect_tags |= (1 << 7) | (1 << 8);
363  }
364  }
365 
366  // No message write or initial metadata write happened yet.
367  if (max_ping_pongs == 0) {
368  request_rw->WritesDone(tag(6));
369  // wait for server call data structure(call_hook, etc.) to be
370  // initialized, since initial metadata is corked.
371  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
372  while (static_cast<int>(reinterpret_cast<intptr_t>(t)) != 0) {
373  int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
374  GPR_ASSERT(expect_tags & (1 << i));
375  expect_tags &= ~(1 << i);
376  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
377  }
378  response_rw.Finish(Status::OK, tag(7));
379  } else {
     // With WriteAndFinish no separate server Finish is issued.
380  if (write_and_finish != 1) {
381  response_rw.Finish(Status::OK, tag(7));
382  }
383  }
384 
385  Status recv_status;
386  request_rw->Finish(&recv_status, tag(8));
387 
     // Drain every completion still recorded in expect_tags. `ok` is not
     // asserted in this loop; only the final status is checked.
388  while (expect_tags) {
389  GPR_ASSERT(fixture->cq()->Next(&t, &ok));
390  int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
391  GPR_ASSERT(expect_tags & (1 << i));
392  expect_tags &= ~(1 << i);
393  }
394 
395  GPR_ASSERT(recv_status.ok());
396  }
397  }
398 
399  fixture->Finish(state);
400  fixture.reset();
     // Two messages of msg_size bytes are counted per ping-pong.
401  state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
402 }
403 } // namespace testing
404 } // namespace grpc
405 
406 #endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
grpc::testing::BM_StreamingPingPong
static void BM_StreamingPingPong(benchmark::State &state)
Definition: fullstack_streaming_ping_pong.h:50
Fixture
Definition: bm_call_create.cc:359
testing
Definition: aws_request_signer_test.cc:25
grpc::ServerContext
Definition: grpcpp/impl/codegen/server_context.h:566
timers.h
grpc
Definition: grpcpp/alarm.h:33
grpc::ServerAsyncReaderWriter::WriteAndFinish
void WriteAndFinish(const W &msg, grpc::WriteOptions options, const grpc::Status &status, void *tag) override
Definition: grpcpp/impl/codegen/async_stream.h:1071
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: include/grpcpp/impl/codegen/status.h:126
GPR_TIMER_SCOPE
#define GPR_TIMER_SCOPE(tag, important)
Definition: src/core/lib/profiling/timers.h:43
send_request
Definition: ares_private.h:147
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc::ServerAsyncReaderWriter
Definition: grpcpp/impl/codegen/async_stream.h:1010
fullstack_context_mutators.h
async_greeter_client.stub
stub
Definition: hellostreamingworld/async_greeter_client.py:26
fullstack_fixtures.h
grpc::internal::AsyncWriterInterface::WriteLast
void WriteLast(const W &msg, grpc::WriteOptions options, void *tag)
Definition: grpcpp/impl/codegen/async_stream.h:158
grpc::testing::BM_StreamingPingPongMsgs
static void BM_StreamingPingPongMsgs(benchmark::State &state)
Definition: fullstack_streaming_ping_pong.h:144
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:81
grpc::ClientContext::set_initial_metadata_corked
void set_initial_metadata_corked(bool corked)
Definition: grpcpp/impl/codegen/client_context.h:357
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
fixture
static const char fixture[]
Definition: test-fs-copyfile.c:36
gmock_output_test._
_
Definition: bloaty/third_party/googletest/googlemock/test/gmock_output_test.py:175
intptr_t
_W64 signed int intptr_t
Definition: stdint-msvc2008.h:118
grpc::Status::OK
static const Status & OK
An OK pre-defined instance.
Definition: include/grpcpp/impl/codegen/status.h:113
grpc::ServerAsyncReaderWriter::Write
void Write(const W &msg, void *tag) override
Definition: grpcpp/impl/codegen/async_stream.h:1042
x
int x
Definition: bloaty/third_party/googletest/googlemock/test/gmock-matchers_test.cc:3610
grpc::testing::tag
static void * tag(intptr_t t)
Definition: h2_ssl_cert_test.cc:263
grpc::ClientContext
Definition: grpcpp/impl/codegen/client_context.h:195
benchmark::State
Definition: benchmark/include/benchmark/benchmark.h:503
grpc::testing::BM_StreamingPingPongWithCoalescingApi
static void BM_StreamingPingPongWithCoalescingApi(benchmark::State &state)
Definition: fullstack_streaming_ping_pong.h:242
grpc::ServerAsyncReaderWriter::Read
void Read(R *msg, void *tag) override
Definition: grpcpp/impl/codegen/async_stream.h:1036
grpc::Status
Definition: include/grpcpp/impl/codegen/status.h:35
ok
bool ok
Definition: async_end2end_test.cc:197
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
grpc::ServerAsyncReaderWriter::Finish
void Finish(const grpc::Status &status, void *tag) override
Definition: grpcpp/impl/codegen/async_stream.h:1092
service
__attribute__((deprecated("Please use GRPCProtoMethod."))) @interface ProtoMethod NSString * service
Definition: ProtoMethod.h:25
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:59:22