generic_end2end_test.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <memory>
20 #include <thread>
21 
22 #include <gtest/gtest.h>
23 
24 #include "absl/memory/memory.h"
25 
26 #include <grpc/grpc.h>
27 #include <grpc/support/time.h>
28 #include <grpcpp/channel.h>
29 #include <grpcpp/client_context.h>
30 #include <grpcpp/create_channel.h>
34 #include <grpcpp/server.h>
35 #include <grpcpp/server_builder.h>
36 #include <grpcpp/server_context.h>
37 #include <grpcpp/support/slice.h>
38 
39 #include "src/proto/grpc/testing/echo.grpc.pb.h"
40 #include "test/core/util/port.h"
43 
44 using grpc::testing::EchoRequest;
45 using grpc::testing::EchoResponse;
46 
47 namespace grpc {
48 namespace testing {
49 namespace {
50 
51 void* tag(int i) { return reinterpret_cast<void*>(i); }
52 
53 void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
54  bool ok;
55  void* got_tag;
56  EXPECT_TRUE(cq->Next(&got_tag, &ok));
57  EXPECT_EQ(expect_ok, ok);
58  EXPECT_EQ(tag(i), got_tag);
59 }
60 
61 class GenericEnd2endTest : public ::testing::Test {
62  protected:
63  GenericEnd2endTest() : server_host_("localhost") {}
64 
65  void SetUp() override {
66  shut_down_ = false;
68  server_address_ << server_host_ << ":" << port;
69  // Setup server
70  ServerBuilder builder;
71  builder.AddListeningPort(server_address_.str(),
73  builder.RegisterAsyncGenericService(&generic_service_);
74  // Include a second call to RegisterAsyncGenericService to make sure that
75  // we get an error in the log, since it is not allowed to have 2 async
76  // generic services
77  builder.RegisterAsyncGenericService(&generic_service_);
78  srv_cq_ = builder.AddCompletionQueue();
79  server_ = builder.BuildAndStart();
80  }
81 
82  void ShutDownServerAndCQs() {
83  if (!shut_down_) {
84  server_->Shutdown();
85  void* ignored_tag;
86  bool ignored_ok;
87  cli_cq_.Shutdown();
88  srv_cq_->Shutdown();
89  while (cli_cq_.Next(&ignored_tag, &ignored_ok)) {
90  }
91  while (srv_cq_->Next(&ignored_tag, &ignored_ok)) {
92  }
93  shut_down_ = true;
94  }
95  }
96  void TearDown() override { ShutDownServerAndCQs(); }
97 
98  void ResetStub() {
99  std::shared_ptr<Channel> channel = grpc::CreateChannel(
101  stub_ = grpc::testing::EchoTestService::NewStub(channel);
102  generic_stub_ = absl::make_unique<GenericStub>(channel);
103  }
104 
105  void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); }
106  void client_ok(int i) { verify_ok(&cli_cq_, i, true); }
107  void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); }
108  void client_fail(int i) { verify_ok(&cli_cq_, i, false); }
109 
110  void SendRpc(int num_rpcs) {
111  SendRpc(num_rpcs, false, gpr_inf_future(GPR_CLOCK_MONOTONIC));
112  }
113 
114  void SendRpc(int num_rpcs, bool check_deadline, gpr_timespec deadline) {
115  const std::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
116  for (int i = 0; i < num_rpcs; i++) {
117  EchoRequest send_request;
118  EchoRequest recv_request;
119  EchoResponse send_response;
120  EchoResponse recv_response;
121  Status recv_status;
122 
123  ClientContext cli_ctx;
124  GenericServerContext srv_ctx;
126 
127  // The string needs to be long enough to test heap-based slice.
128  send_request.set_message("Hello world. Hello world. Hello world.");
129 
130  if (check_deadline) {
131  cli_ctx.set_deadline(deadline);
132  }
133 
134  // Rather than using the original kMethodName, make a short-lived
135  // copy to also confirm that we don't refer to this object beyond
136  // the initial call preparation
137  const std::string* method_name = new std::string(kMethodName);
138 
139  std::unique_ptr<GenericClientAsyncReaderWriter> call =
140  generic_stub_->PrepareCall(&cli_ctx, *method_name, &cli_cq_);
141 
142  delete method_name; // Make sure that this is not needed after invocation
143 
144  std::thread request_call([this]() { server_ok(4); });
145  call->StartCall(tag(1));
146  client_ok(1);
147  std::unique_ptr<ByteBuffer> send_buffer =
149  call->Write(*send_buffer, tag(2));
150  // Send ByteBuffer can be destroyed after calling Write.
151  send_buffer.reset();
152  client_ok(2);
153  call->WritesDone(tag(3));
154  client_ok(3);
155 
156  generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
157  srv_cq_.get(), tag(4));
158 
159  request_call.join();
160  EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
161  EXPECT_EQ(kMethodName, srv_ctx.method());
162 
163  if (check_deadline) {
164  EXPECT_TRUE(gpr_time_similar(deadline, srv_ctx.raw_deadline(),
166  }
167 
168  ByteBuffer recv_buffer;
169  stream.Read(&recv_buffer, tag(5));
170  server_ok(5);
171  EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
172  EXPECT_EQ(send_request.message(), recv_request.message());
173 
174  send_response.set_message(recv_request.message());
175  send_buffer = SerializeToByteBuffer(&send_response);
176  stream.Write(*send_buffer, tag(6));
177  send_buffer.reset();
178  server_ok(6);
179 
180  stream.Finish(Status::OK, tag(7));
181  server_ok(7);
182 
183  recv_buffer.Clear();
184  call->Read(&recv_buffer, tag(8));
185  client_ok(8);
186  EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
187 
188  call->Finish(&recv_status, tag(9));
189  client_ok(9);
190 
191  EXPECT_EQ(send_response.message(), recv_response.message());
192  EXPECT_TRUE(recv_status.ok());
193  }
194  }
195 
196  // Return errors to up to one call that comes in on the supplied completion
197  // queue, until the CQ is being shut down (and therefore we can no longer
198  // enqueue further events).
199  void DriveCompletionQueue() {
200  enum class Event : uintptr_t {
201  kCallReceived,
202  kResponseSent,
203  };
204  // Request the call, but only if the main thread hasn't beaten us to
205  // shutting down the CQ.
206  grpc::GenericServerContext server_context;
207  grpc::GenericServerAsyncReaderWriter reader_writer(&server_context);
208 
209  {
210  std::lock_guard<std::mutex> lock(shutting_down_mu_);
211  if (!shutting_down_) {
212  generic_service_.RequestCall(
213  &server_context, &reader_writer, srv_cq_.get(), srv_cq_.get(),
214  reinterpret_cast<void*>(Event::kCallReceived));
215  }
216  }
217  // Process events.
218  {
219  Event event;
220  bool ok;
221  while (srv_cq_->Next(reinterpret_cast<void**>(&event), &ok)) {
222  std::lock_guard<std::mutex> lock(shutting_down_mu_);
223  if (shutting_down_) {
224  // The main thread has started shutting down. Simply continue to drain
225  // events.
226  continue;
227  }
228 
229  switch (event) {
230  case Event::kCallReceived:
231  reader_writer.Finish(
233  reinterpret_cast<void*>(Event::kResponseSent));
234  break;
235 
236  case Event::kResponseSent:
237  // We are done.
238  break;
239  }
240  }
241  }
242  }
243 
244  CompletionQueue cli_cq_;
245  std::unique_ptr<ServerCompletionQueue> srv_cq_;
246  std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
247  std::unique_ptr<grpc::GenericStub> generic_stub_;
248  std::unique_ptr<Server> server_;
249  AsyncGenericService generic_service_;
251  std::ostringstream server_address_;
255 };
256 
257 TEST_F(GenericEnd2endTest, SimpleRpc) {
258  ResetStub();
259  SendRpc(1);
260 }
261 
262 TEST_F(GenericEnd2endTest, SequentialRpcs) {
263  ResetStub();
264  SendRpc(10);
265 }
266 
267 TEST_F(GenericEnd2endTest, SequentialUnaryRpcs) {
268  ResetStub();
269  const int num_rpcs = 10;
270  const std::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
271  for (int i = 0; i < num_rpcs; i++) {
272  EchoRequest send_request;
273  EchoRequest recv_request;
274  EchoResponse send_response;
275  EchoResponse recv_response;
276  Status recv_status;
277 
278  ClientContext cli_ctx;
279  GenericServerContext srv_ctx;
281 
282  // The string needs to be long enough to test heap-based slice.
283  send_request.set_message("Hello world. Hello world. Hello world.");
284 
285  std::unique_ptr<ByteBuffer> cli_send_buffer =
287  std::thread request_call([this]() { server_ok(4); });
288  std::unique_ptr<GenericClientAsyncResponseReader> call =
289  generic_stub_->PrepareUnaryCall(&cli_ctx, kMethodName, *cli_send_buffer,
290  &cli_cq_);
291  call->StartCall();
292  ByteBuffer cli_recv_buffer;
293  call->Finish(&cli_recv_buffer, &recv_status, tag(1));
294  std::thread client_check([this] { client_ok(1); });
295 
296  generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
297  srv_cq_.get(), tag(4));
298  request_call.join();
299  EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
300  EXPECT_EQ(kMethodName, srv_ctx.method());
301 
302  ByteBuffer srv_recv_buffer;
303  stream.Read(&srv_recv_buffer, tag(5));
304  server_ok(5);
305  EXPECT_TRUE(ParseFromByteBuffer(&srv_recv_buffer, &recv_request));
306  EXPECT_EQ(send_request.message(), recv_request.message());
307 
308  send_response.set_message(recv_request.message());
309  std::unique_ptr<ByteBuffer> srv_send_buffer =
310  SerializeToByteBuffer(&send_response);
311  stream.Write(*srv_send_buffer, tag(6));
312  server_ok(6);
313 
314  stream.Finish(Status::OK, tag(7));
315  server_ok(7);
316 
317  client_check.join();
318  EXPECT_TRUE(ParseFromByteBuffer(&cli_recv_buffer, &recv_response));
319  EXPECT_EQ(send_response.message(), recv_response.message());
320  EXPECT_TRUE(recv_status.ok());
321  }
322 }
323 
324 // One ping, one pong.
325 TEST_F(GenericEnd2endTest, SimpleBidiStreaming) {
326  ResetStub();
327 
328  const std::string kMethodName(
329  "/grpc.cpp.test.util.EchoTestService/BidiStream");
330  EchoRequest send_request;
331  EchoRequest recv_request;
332  EchoResponse send_response;
333  EchoResponse recv_response;
334  Status recv_status;
335  ClientContext cli_ctx;
336  GenericServerContext srv_ctx;
337  GenericServerAsyncReaderWriter srv_stream(&srv_ctx);
338 
339  cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
340  send_request.set_message("Hello");
341  std::thread request_call([this]() { server_ok(2); });
342  std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream =
343  generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_);
344  cli_stream->StartCall(tag(1));
345  client_ok(1);
346 
347  generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(),
348  srv_cq_.get(), tag(2));
349  request_call.join();
350 
351  EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
352  EXPECT_EQ(kMethodName, srv_ctx.method());
353 
354  std::unique_ptr<ByteBuffer> send_buffer =
356  cli_stream->Write(*send_buffer, tag(3));
357  send_buffer.reset();
358  client_ok(3);
359 
360  ByteBuffer recv_buffer;
361  srv_stream.Read(&recv_buffer, tag(4));
362  server_ok(4);
363  EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
364  EXPECT_EQ(send_request.message(), recv_request.message());
365 
366  send_response.set_message(recv_request.message());
367  send_buffer = SerializeToByteBuffer(&send_response);
368  srv_stream.Write(*send_buffer, tag(5));
369  send_buffer.reset();
370  server_ok(5);
371 
372  cli_stream->Read(&recv_buffer, tag(6));
373  client_ok(6);
374  EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
375  EXPECT_EQ(send_response.message(), recv_response.message());
376 
377  cli_stream->WritesDone(tag(7));
378  client_ok(7);
379 
380  srv_stream.Read(&recv_buffer, tag(8));
381  server_fail(8);
382 
383  srv_stream.Finish(Status::OK, tag(9));
384  server_ok(9);
385 
386  cli_stream->Finish(&recv_status, tag(10));
387  client_ok(10);
388 
389  EXPECT_EQ(send_response.message(), recv_response.message());
390  EXPECT_TRUE(recv_status.ok());
391 }
392 
393 TEST_F(GenericEnd2endTest, Deadline) {
394  ResetStub();
395  SendRpc(1, true,
398 }
399 
400 TEST_F(GenericEnd2endTest, ShortDeadline) {
401  ResetStub();
402 
403  ClientContext cli_ctx;
404  EchoRequest request;
405  EchoResponse response;
406 
407  shutting_down_ = false;
408  std::thread driver([this] { DriveCompletionQueue(); });
409 
410  request.set_message("");
411  cli_ctx.set_deadline(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
413  Status s = stub_->Echo(&cli_ctx, request, &response);
414  EXPECT_FALSE(s.ok());
415  {
416  std::lock_guard<std::mutex> lock(shutting_down_mu_);
417  shutting_down_ = true;
418  }
419  ShutDownServerAndCQs();
420  driver.join();
421 }
422 
423 } // namespace
424 } // namespace testing
425 } // namespace grpc
426 
427 int main(int argc, char** argv) {
428  grpc::testing::TestEnvironment env(&argc, argv);
429  ::testing::InitGoogleTest(&argc, argv);
430  return RUN_ALL_TESTS();
431 }
EXPECT_FALSE
#define EXPECT_FALSE(condition)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1970
GPR_TIMESPAN
@ GPR_TIMESPAN
Definition: gpr_types.h:45
testing
Definition: aws_request_signer_test.cc:25
shutting_down_
bool shutting_down_
Definition: generic_end2end_test.cc:252
gpr_time_similar
GPRAPI int gpr_time_similar(gpr_timespec a, gpr_timespec b, gpr_timespec threshold)
Definition: src/core/lib/gpr/time.cc:206
port.h
generate.env
env
Definition: generate.py:37
send_buffer
static char * send_buffer
Definition: test-tcp-writealot.c:38
grpc
Definition: grpcpp/alarm.h:33
request_call
static void request_call(grpc_end2end_proxy *proxy)
Definition: proxy.cc:432
mutex
static uv_mutex_t mutex
Definition: threadpool.c:34
server_address_
std::ostringstream server_address_
Definition: generic_end2end_test.cc:251
send_request
Definition: ares_private.h:147
benchmark.request
request
Definition: benchmark.py:77
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
slice.h
server_host_
const std::string server_host_
Definition: generic_end2end_test.cc:250
grpc::GenericServerContext
Definition: grpcpp/impl/codegen/async_generic_service.h:41
gpr_inf_future
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:55
grpc::ServerAsyncReaderWriter
Definition: grpcpp/impl/codegen/async_stream.h:1010
async_generic_service.h
cli_cq_
CompletionQueue cli_cq_
Definition: generic_end2end_test.cc:244
absl::FormatConversionChar::s
@ s
time.h
testing::Test
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:402
call
FilterStackCall * call
Definition: call.cc:750
grpc::testing::ParseFromByteBuffer
bool ParseFromByteBuffer(ByteBuffer *buffer, grpc::protobuf::Message *message)
Definition: byte_buffer_proto_helper.cc:26
profile_analyzer.builder
builder
Definition: profile_analyzer.py:159
channel
wrapped_grpc_channel * channel
Definition: src/php/ext/grpc/call.h:33
stub_
std::unique_ptr< grpc::testing::EchoTestService::Stub > stub_
Definition: generic_end2end_test.cc:246
shut_down_
bool shut_down_
Definition: generic_end2end_test.cc:253
shutting_down_mu_
std::mutex shutting_down_mu_
Definition: generic_end2end_test.cc:254
grpc.h
grpc::testing::SerializeToByteBuffer
std::unique_ptr< ByteBuffer > SerializeToByteBuffer(grpc::protobuf::Message *message)
Definition: byte_buffer_proto_helper.cc:37
channel.h
GPR_CLOCK_MONOTONIC
@ GPR_CLOCK_MONOTONIC
Definition: gpr_types.h:36
grpc::Status::OK
static const Status & OK
An OK pre-defined instance.
Definition: include/grpcpp/impl/codegen/status.h:113
grpc::CreateChannel
std::shared_ptr< Channel > CreateChannel(const grpc::string &target, const std::shared_ptr< ChannelCredentials > &creds)
uintptr_t
_W64 unsigned int uintptr_t
Definition: stdint-msvc2008.h:119
RUN_ALL_TESTS
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2471
grpc_pick_unused_port_or_die
int grpc_pick_unused_port_or_die(void)
gpr_now
GPRAPI gpr_timespec gpr_now(gpr_clock_type clock)
generic_stub.h
grpc.StatusCode.UNIMPLEMENTED
tuple UNIMPLEMENTED
Definition: src/python/grpcio/grpc/__init__.py:276
grpc::testing::tag
static void * tag(intptr_t t)
Definition: h2_ssl_cert_test.cc:263
tests.unit._exit_scenarios.port
port
Definition: _exit_scenarios.py:179
test_config.h
client_context.h
testing::InitGoogleTest
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
Definition: bloaty/third_party/googletest/googletest/src/gtest.cc:6106
server_
std::unique_ptr< Server > server_
Definition: generic_end2end_test.cc:248
grpc::testing::TEST_F
TEST_F(ChannelArgumentsTest, SetInt)
Definition: channel_arguments_test.cc:134
gpr_time_add
GPRAPI gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:135
gpr_time_from_micros
GPRAPI gpr_timespec gpr_time_from_micros(int64_t us, gpr_clock_type clock_type)
Definition: src/core/lib/gpr/time.cc:115
server_context.h
asyncio_get_stats.response
response
Definition: asyncio_get_stats.py:28
grpc::testing::TestEnvironment
Definition: test/core/util/test_config.h:54
GRPC_COMPRESS_GZIP
@ GRPC_COMPRESS_GZIP
Definition: compression_types.h:63
proto_utils.h
grpc::protobuf::util::Status
GRPC_CUSTOM_UTIL_STATUS Status
Definition: include/grpcpp/impl/codegen/config_protobuf.h:93
grpc::Status
Definition: include/grpcpp/impl/codegen/status.h:35
ok
bool ok
Definition: async_end2end_test.cc:197
generic_service_
AsyncGenericService generic_service_
Definition: generic_end2end_test.cc:249
grpc::testing::EXPECT_EQ
EXPECT_EQ(options.token_exchange_service_uri, "https://foo/exchange")
srv_cq_
std::unique_ptr< ServerCompletionQueue > srv_cq_
Definition: generic_end2end_test.cc:245
gpr_time_from_millis
GPRAPI gpr_timespec gpr_time_from_millis(int64_t ms, gpr_clock_type clock_type)
Definition: src/core/lib/gpr/time.cc:119
grpc::GenericServerAsyncReaderWriter
ServerAsyncReaderWriter< ByteBuffer, ByteBuffer > GenericServerAsyncReaderWriter
Definition: grpcpp/impl/codegen/async_generic_service.h:36
main
int main(int argc, char **argv)
Definition: generic_end2end_test.cc:427
grpc::InsecureServerCredentials
std::shared_ptr< ServerCredentials > InsecureServerCredentials()
Definition: insecure_server_credentials.cc:52
grpc::testing::EXPECT_TRUE
EXPECT_TRUE(grpc::experimental::StsCredentialsOptionsFromJson(minimum_valid_json, &options) .ok())
server.h
gpr_timespec
Definition: gpr_types.h:50
grpc::InsecureChannelCredentials
std::shared_ptr< ChannelCredentials > InsecureChannelCredentials()
Credentials for an unencrypted, unauthenticated channel.
Definition: cpp/client/insecure_credentials.cc:69
grpc::testing::SendRpc
static void SendRpc(grpc::testing::EchoTestService::Stub *stub, int num_rpcs, bool allow_exhaustion, gpr_atm *errors)
Definition: thread_stress_test.cc:277
method_name
absl::string_view method_name
Definition: call_creds_util.cc:40
thread
static uv_thread_t thread
Definition: test-async-null-cb.c:29
server_builder.h
cq
static grpc_completion_queue * cq
Definition: test/core/fling/client.cc:37
byte_buffer_proto_helper.h
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
create_channel.h
gpr_time_from_seconds
GPRAPI gpr_timespec gpr_time_from_seconds(int64_t s, gpr_clock_type clock_type)
Definition: src/core/lib/gpr/time.cc:123
generic_stub_
std::unique_ptr< grpc::GenericStub > generic_stub_
Definition: generic_end2end_test.cc:247
stream
voidpf stream
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136


grpc
Author(s):
autogenerated on Fri May 16 2025 02:58:28