hybrid_end2end_test.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <memory>
20 #include <thread>
21 
22 #include <gtest/gtest.h>
23 
24 #include <grpc/grpc.h>
25 #include <grpcpp/channel.h>
26 #include <grpcpp/client_context.h>
27 #include <grpcpp/create_channel.h>
29 #include <grpcpp/server.h>
30 #include <grpcpp/server_builder.h>
31 #include <grpcpp/server_context.h>
32 
33 #include "src/core/lib/gpr/env.h"
35 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
36 #include "src/proto/grpc/testing/echo.grpc.pb.h"
37 #include "test/core/util/port.h"
41 
42 namespace grpc {
43 namespace testing {
44 namespace {
45 
46 void* tag(int i) { return reinterpret_cast<void*>(i); }
47 
48 bool VerifyReturnSuccess(CompletionQueue* cq, int i) {
49  void* got_tag;
50  bool ok;
51  EXPECT_TRUE(cq->Next(&got_tag, &ok));
52  EXPECT_EQ(tag(i), got_tag);
53  return ok;
54 }
55 
56 void Verify(CompletionQueue* cq, int i, bool expect_ok) {
57  EXPECT_EQ(expect_ok, VerifyReturnSuccess(cq, i));
58 }
59 
60 // Handlers to handle async request at a server. To be run in a separate thread.
61 template <class Service>
62 void HandleEcho(Service* service, ServerCompletionQueue* cq, bool dup_service) {
63  ServerContext srv_ctx;
64  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
65  EchoRequest recv_request;
66  EchoResponse send_response;
67  service->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq, cq,
68  tag(1));
69  Verify(cq, 1, true);
70  send_response.set_message(recv_request.message());
71  if (dup_service) {
72  send_response.mutable_message()->append("_dup");
73  }
74  response_writer.Finish(send_response, Status::OK, tag(2));
75  Verify(cq, 2, true);
76 }
77 
78 // Handlers to handle raw request at a server. To be run in a
79 // separate thread. Note that this is the same as the async version, except
80 // that the req/resp are ByteBuffers
81 template <class Service>
82 void HandleRawEcho(Service* service, ServerCompletionQueue* cq,
83  bool /*dup_service*/) {
84  ServerContext srv_ctx;
85  GenericServerAsyncResponseWriter response_writer(&srv_ctx);
86  ByteBuffer recv_buffer;
87  service->RequestEcho(&srv_ctx, &recv_buffer, &response_writer, cq, cq,
88  tag(1));
89  Verify(cq, 1, true);
90  EchoRequest recv_request;
91  EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
92  EchoResponse send_response;
93  send_response.set_message(recv_request.message());
94  auto send_buffer = SerializeToByteBuffer(&send_response);
95  response_writer.Finish(*send_buffer, Status::OK, tag(2));
96  Verify(cq, 2, true);
97 }
98 
99 template <class Service>
100 void HandleClientStreaming(Service* service, ServerCompletionQueue* cq) {
101  ServerContext srv_ctx;
102  EchoRequest recv_request;
103  EchoResponse send_response;
104  ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
105  service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
106  Verify(cq, 1, true);
107  int i = 1;
108  do {
109  i++;
110  send_response.mutable_message()->append(recv_request.message());
111  srv_stream.Read(&recv_request, tag(i));
112  } while (VerifyReturnSuccess(cq, i));
113  srv_stream.Finish(send_response, Status::OK, tag(100));
114  Verify(cq, 100, true);
115 }
116 
117 template <class Service>
118 void HandleRawClientStreaming(Service* service, ServerCompletionQueue* cq) {
119  ServerContext srv_ctx;
120  ByteBuffer recv_buffer;
121  EchoRequest recv_request;
122  EchoResponse send_response;
123  GenericServerAsyncReader srv_stream(&srv_ctx);
124  service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
125  Verify(cq, 1, true);
126  int i = 1;
127  while (true) {
128  i++;
129  srv_stream.Read(&recv_buffer, tag(i));
130  if (!VerifyReturnSuccess(cq, i)) {
131  break;
132  }
133  EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
134  send_response.mutable_message()->append(recv_request.message());
135  }
136  auto send_buffer = SerializeToByteBuffer(&send_response);
137  srv_stream.Finish(*send_buffer, Status::OK, tag(100));
138  Verify(cq, 100, true);
139 }
140 
141 template <class Service>
142 void HandleServerStreaming(Service* service, ServerCompletionQueue* cq) {
143  ServerContext srv_ctx;
144  EchoRequest recv_request;
145  EchoResponse send_response;
146  ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
147  service->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq, cq,
148  tag(1));
149  Verify(cq, 1, true);
150  send_response.set_message(recv_request.message() + "0");
151  srv_stream.Write(send_response, tag(2));
152  Verify(cq, 2, true);
153  send_response.set_message(recv_request.message() + "1");
154  srv_stream.Write(send_response, tag(3));
155  Verify(cq, 3, true);
156  send_response.set_message(recv_request.message() + "2");
157  srv_stream.Write(send_response, tag(4));
158  Verify(cq, 4, true);
159  srv_stream.Finish(Status::OK, tag(5));
160  Verify(cq, 5, true);
161 }
162 
163 void HandleGenericEcho(GenericServerAsyncReaderWriter* stream,
164  CompletionQueue* cq) {
165  ByteBuffer recv_buffer;
166  stream->Read(&recv_buffer, tag(2));
167  Verify(cq, 2, true);
168  EchoRequest recv_request;
169  EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
170  EchoResponse send_response;
171  send_response.set_message(recv_request.message());
172  auto send_buffer = SerializeToByteBuffer(&send_response);
173  stream->Write(*send_buffer, tag(3));
174  Verify(cq, 3, true);
175  stream->Finish(Status::OK, tag(4));
176  Verify(cq, 4, true);
177 }
178 
179 void HandleGenericRequestStream(GenericServerAsyncReaderWriter* stream,
180  CompletionQueue* cq) {
181  ByteBuffer recv_buffer;
182  EchoRequest recv_request;
183  EchoResponse send_response;
184  int i = 1;
185  while (true) {
186  i++;
187  stream->Read(&recv_buffer, tag(i));
188  if (!VerifyReturnSuccess(cq, i)) {
189  break;
190  }
191  EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
192  send_response.mutable_message()->append(recv_request.message());
193  }
194  auto send_buffer = SerializeToByteBuffer(&send_response);
195  stream->Write(*send_buffer, tag(99));
196  Verify(cq, 99, true);
197  stream->Finish(Status::OK, tag(100));
198  Verify(cq, 100, true);
199 }
200 
201 // Request and handle one generic call.
202 void HandleGenericCall(AsyncGenericService* service,
203  ServerCompletionQueue* cq) {
204  GenericServerContext srv_ctx;
206  service->RequestCall(&srv_ctx, &stream, cq, cq, tag(1));
207  Verify(cq, 1, true);
208  if (srv_ctx.method() == "/grpc.testing.EchoTestService/Echo") {
209  HandleGenericEcho(&stream, cq);
210  } else if (srv_ctx.method() ==
211  "/grpc.testing.EchoTestService/RequestStream") {
212  HandleGenericRequestStream(&stream, cq);
213  } else { // other methods not handled yet.
214  gpr_log(GPR_ERROR, "method: %s", srv_ctx.method().c_str());
215  GPR_ASSERT(0);
216  }
217 }
218 
219 class TestServiceImplDupPkg
220  : public grpc::testing::duplicate::EchoTestService::Service {
221  public:
222  Status Echo(ServerContext* /*context*/, const EchoRequest* request,
223  EchoResponse* response) override {
224  response->set_message(request->message() + "_dup");
225  return Status::OK;
226  }
227 };
228 
229 class HybridEnd2endTest : public ::testing::TestWithParam<bool> {
230  protected:
231  HybridEnd2endTest() {}
232 
233  static void SetUpTestCase() {
234 #if TARGET_OS_IPHONE
235  // Workaround Apple CFStream bug
236  gpr_setenv("grpc_cfstream", "0");
237 #endif
238  }
239 
240  void SetUp() override {
243  ->value_param() != nullptr)
244  ? GetParam()
245  : false;
246  }
247 
248  bool SetUpServer(grpc::Service* service1, grpc::Service* service2,
249  AsyncGenericService* generic_service,
250  CallbackGenericService* callback_generic_service,
251  int max_message_size = 0) {
253  server_address_ << "localhost:" << port;
254 
255  // Setup server
256  ServerBuilder builder;
257  builder.AddListeningPort(server_address_.str(),
259  // Always add a sync unimplemented service: we rely on having at least one
260  // synchronous method to get a listening cq
261  builder.RegisterService(&unimplemented_service_);
262  builder.RegisterService(service1);
263  if (service2) {
264  builder.RegisterService(service2);
265  }
266  if (generic_service) {
267  builder.RegisterAsyncGenericService(generic_service);
268  }
269  if (callback_generic_service) {
270  builder.RegisterCallbackGenericService(callback_generic_service);
271  }
272 
273  if (max_message_size != 0) {
274  builder.SetMaxMessageSize(max_message_size);
275  }
276 
277  // Create a separate cq for each potential handler.
278  for (int i = 0; i < 5; i++) {
279  cqs_.push_back(builder.AddCompletionQueue(false));
280  }
281  server_ = builder.BuildAndStart();
282 
283  // If there is a generic callback service, this setup is only successful if
284  // we have an iomgr that can run in the background or are inprocess
285  return !callback_generic_service || grpc_iomgr_run_in_background() ||
286  inproc_;
287  }
288 
289  void TearDown() override {
290  if (server_) {
291  server_->Shutdown();
292  }
293  void* ignored_tag;
294  bool ignored_ok;
295  for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
296  (*it)->Shutdown();
297  while ((*it)->Next(&ignored_tag, &ignored_ok)) {
298  }
299  }
300  }
301 
302  void ResetStub() {
303  std::shared_ptr<Channel> channel =
304  inproc_ ? server_->InProcessChannel(ChannelArguments())
307  stub_ = grpc::testing::EchoTestService::NewStub(channel);
308  }
309 
310  // Test all rpc methods.
311  void TestAllMethods() {
312  SendEcho();
313  SendSimpleClientStreaming();
314  SendSimpleServerStreaming();
315  SendBidiStreaming();
316  }
317 
318  void SendEcho() {
319  EchoRequest send_request;
320  EchoResponse recv_response;
321  ClientContext cli_ctx;
322  cli_ctx.set_wait_for_ready(true);
323  send_request.set_message("Hello");
324  Status recv_status = stub_->Echo(&cli_ctx, send_request, &recv_response);
325  EXPECT_EQ(send_request.message(), recv_response.message());
326  EXPECT_TRUE(recv_status.ok());
327  }
328 
329  void SendEchoToDupService() {
330  std::shared_ptr<Channel> channel = grpc::CreateChannel(
332  auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
333  EchoRequest send_request;
334  EchoResponse recv_response;
335  ClientContext cli_ctx;
336  cli_ctx.set_wait_for_ready(true);
337  send_request.set_message("Hello");
338  Status recv_status = stub->Echo(&cli_ctx, send_request, &recv_response);
339  EXPECT_EQ(send_request.message() + "_dup", recv_response.message());
340  EXPECT_TRUE(recv_status.ok());
341  }
342 
343  void SendSimpleClientStreaming() {
344  EchoRequest send_request;
345  EchoResponse recv_response;
346  std::string expected_message;
347  ClientContext cli_ctx;
348  cli_ctx.set_wait_for_ready(true);
349  send_request.set_message("Hello");
350  auto stream = stub_->RequestStream(&cli_ctx, &recv_response);
351  for (int i = 0; i < 5; i++) {
353  expected_message.append(send_request.message());
354  }
355  stream->WritesDone();
356  Status recv_status = stream->Finish();
357  EXPECT_EQ(expected_message, recv_response.message());
358  EXPECT_TRUE(recv_status.ok());
359  }
360 
361  void SendSimpleServerStreaming() {
362  EchoRequest request;
363  EchoResponse response;
364  ClientContext context;
366  request.set_message("hello");
367 
368  auto stream = stub_->ResponseStream(&context, request);
369  EXPECT_TRUE(stream->Read(&response));
370  EXPECT_EQ(response.message(), request.message() + "0");
371  EXPECT_TRUE(stream->Read(&response));
372  EXPECT_EQ(response.message(), request.message() + "1");
373  EXPECT_TRUE(stream->Read(&response));
374  EXPECT_EQ(response.message(), request.message() + "2");
375  EXPECT_FALSE(stream->Read(&response));
376 
377  Status s = stream->Finish();
378  EXPECT_TRUE(s.ok());
379  }
380 
381  void SendSimpleServerStreamingToDupService() {
382  std::shared_ptr<Channel> channel = grpc::CreateChannel(
384  auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
385  EchoRequest request;
386  EchoResponse response;
387  ClientContext context;
389  request.set_message("hello");
390 
391  auto stream = stub->ResponseStream(&context, request);
392  EXPECT_TRUE(stream->Read(&response));
393  EXPECT_EQ(response.message(), request.message() + "0_dup");
394  EXPECT_TRUE(stream->Read(&response));
395  EXPECT_EQ(response.message(), request.message() + "1_dup");
396  EXPECT_TRUE(stream->Read(&response));
397  EXPECT_EQ(response.message(), request.message() + "2_dup");
398  EXPECT_FALSE(stream->Read(&response));
399 
400  Status s = stream->Finish();
401  EXPECT_TRUE(s.ok());
402  }
403 
404  void SendBidiStreaming() {
405  EchoRequest request;
406  EchoResponse response;
407  ClientContext context;
409  std::string msg("hello");
410 
411  auto stream = stub_->BidiStream(&context);
412 
413  request.set_message(msg + "0");
414  EXPECT_TRUE(stream->Write(request));
415  EXPECT_TRUE(stream->Read(&response));
416  EXPECT_EQ(response.message(), request.message());
417 
418  request.set_message(msg + "1");
419  EXPECT_TRUE(stream->Write(request));
420  EXPECT_TRUE(stream->Read(&response));
421  EXPECT_EQ(response.message(), request.message());
422 
423  request.set_message(msg + "2");
424  EXPECT_TRUE(stream->Write(request));
425  EXPECT_TRUE(stream->Read(&response));
426  EXPECT_EQ(response.message(), request.message());
427 
428  stream->WritesDone();
429  EXPECT_FALSE(stream->Read(&response));
430  EXPECT_FALSE(stream->Read(&response));
431 
432  Status s = stream->Finish();
433  EXPECT_TRUE(s.ok());
434  }
435 
436  grpc::testing::UnimplementedEchoService::Service unimplemented_service_;
437  std::vector<std::unique_ptr<ServerCompletionQueue>> cqs_;
438  std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
439  std::unique_ptr<Server> server_;
440  std::ostringstream server_address_;
441  bool inproc_;
442 };
443 
444 TEST_F(HybridEnd2endTest, AsyncEcho) {
445  typedef EchoTestService::WithAsyncMethod_Echo<TestServiceImpl> SType;
446  SType service;
447  SetUpServer(&service, nullptr, nullptr, nullptr);
448  ResetStub();
449  std::thread echo_handler_thread(HandleEcho<SType>, &service, cqs_[0].get(),
450  false);
451  TestAllMethods();
452  echo_handler_thread.join();
453 }
454 
455 TEST_F(HybridEnd2endTest, RawEcho) {
456  typedef EchoTestService::WithRawMethod_Echo<TestServiceImpl> SType;
457  SType service;
458  SetUpServer(&service, nullptr, nullptr, nullptr);
459  ResetStub();
460  std::thread echo_handler_thread(HandleRawEcho<SType>, &service, cqs_[0].get(),
461  false);
462  TestAllMethods();
463  echo_handler_thread.join();
464 }
465 
466 TEST_F(HybridEnd2endTest, RawRequestStream) {
467  typedef EchoTestService::WithRawMethod_RequestStream<TestServiceImpl> SType;
468  SType service;
469  SetUpServer(&service, nullptr, nullptr, nullptr);
470  ResetStub();
471  std::thread request_stream_handler_thread(HandleRawClientStreaming<SType>,
472  &service, cqs_[0].get());
473  TestAllMethods();
474  request_stream_handler_thread.join();
475 }
476 
477 TEST_F(HybridEnd2endTest, AsyncEchoRawRequestStream) {
478  typedef EchoTestService::WithRawMethod_RequestStream<
479  EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>>
480  SType;
481  SType service;
482  SetUpServer(&service, nullptr, nullptr, nullptr);
483  ResetStub();
484  std::thread echo_handler_thread(HandleEcho<SType>, &service, cqs_[0].get(),
485  false);
486  std::thread request_stream_handler_thread(HandleRawClientStreaming<SType>,
487  &service, cqs_[1].get());
488  TestAllMethods();
489  request_stream_handler_thread.join();
490  echo_handler_thread.join();
491 }
492 
493 TEST_F(HybridEnd2endTest, GenericEchoRawRequestStream) {
494  typedef EchoTestService::WithRawMethod_RequestStream<
495  EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
496  SType;
497  SType service;
498  AsyncGenericService generic_service;
499  SetUpServer(&service, nullptr, &generic_service, nullptr);
500  ResetStub();
501  std::thread generic_handler_thread(HandleGenericCall, &generic_service,
502  cqs_[0].get());
503  std::thread request_stream_handler_thread(HandleRawClientStreaming<SType>,
504  &service, cqs_[1].get());
505  TestAllMethods();
506  generic_handler_thread.join();
507  request_stream_handler_thread.join();
508 }
509 
510 TEST_F(HybridEnd2endTest, AsyncEchoRequestStream) {
511  typedef EchoTestService::WithAsyncMethod_RequestStream<
512  EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>>
513  SType;
514  SType service;
515  SetUpServer(&service, nullptr, nullptr, nullptr);
516  ResetStub();
517  std::thread echo_handler_thread(HandleEcho<SType>, &service, cqs_[0].get(),
518  false);
519  std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
520  &service, cqs_[1].get());
521  TestAllMethods();
522  echo_handler_thread.join();
523  request_stream_handler_thread.join();
524 }
525 
526 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream) {
527  typedef EchoTestService::WithAsyncMethod_RequestStream<
528  EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
529  SType;
530  SType service;
531  SetUpServer(&service, nullptr, nullptr, nullptr);
532  ResetStub();
533  std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
534  &service, cqs_[0].get());
535  std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
536  &service, cqs_[1].get());
537  TestAllMethods();
538  response_stream_handler_thread.join();
539  request_stream_handler_thread.join();
540 }
541 
542 // Add a second service with one sync method.
543 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncDupService) {
544  typedef EchoTestService::WithAsyncMethod_RequestStream<
545  EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
546  SType;
547  SType service;
548  TestServiceImplDupPkg dup_service;
549  SetUpServer(&service, &dup_service, nullptr, nullptr);
550  ResetStub();
551  std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
552  &service, cqs_[0].get());
553  std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
554  &service, cqs_[1].get());
555  TestAllMethods();
556  SendEchoToDupService();
557  response_stream_handler_thread.join();
558  request_stream_handler_thread.join();
559 }
560 
561 // Add a second service with one sync streamed unary method.
562 class StreamedUnaryDupPkg
563  : public duplicate::EchoTestService::WithStreamedUnaryMethod_Echo<
564  TestServiceImplDupPkg> {
565  public:
566  Status StreamedEcho(
567  ServerContext* /*context*/,
568  ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
569  EchoRequest req;
570  EchoResponse resp;
571  uint32_t next_msg_sz;
572  stream->NextMessageSize(&next_msg_sz);
573  gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
574  GPR_ASSERT(stream->Read(&req));
575  resp.set_message(req.message() + "_dup");
576  GPR_ASSERT(stream->Write(resp));
577  return Status::OK;
578  }
579 };
580 
581 TEST_F(HybridEnd2endTest,
582  AsyncRequestStreamResponseStream_SyncStreamedUnaryDupService) {
583  typedef EchoTestService::WithAsyncMethod_RequestStream<
584  EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
585  SType;
586  SType service;
587  StreamedUnaryDupPkg dup_service;
588  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
589  ResetStub();
590  std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
591  &service, cqs_[0].get());
592  std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
593  &service, cqs_[1].get());
594  TestAllMethods();
595  SendEchoToDupService();
596  response_stream_handler_thread.join();
597  request_stream_handler_thread.join();
598 }
599 
600 // Add a second service that is fully Streamed Unary
601 class FullyStreamedUnaryDupPkg
602  : public duplicate::EchoTestService::StreamedUnaryService {
603  public:
604  Status StreamedEcho(
605  ServerContext* /*context*/,
606  ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
607  EchoRequest req;
608  EchoResponse resp;
609  uint32_t next_msg_sz;
610  stream->NextMessageSize(&next_msg_sz);
611  gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
612  GPR_ASSERT(stream->Read(&req));
613  resp.set_message(req.message() + "_dup");
614  GPR_ASSERT(stream->Write(resp));
615  return Status::OK;
616  }
617 };
618 
619 TEST_F(HybridEnd2endTest,
620  AsyncRequestStreamResponseStream_SyncFullyStreamedUnaryDupService) {
621  typedef EchoTestService::WithAsyncMethod_RequestStream<
622  EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
623  SType;
624  SType service;
625  FullyStreamedUnaryDupPkg dup_service;
626  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
627  ResetStub();
628  std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
629  &service, cqs_[0].get());
630  std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
631  &service, cqs_[1].get());
632  TestAllMethods();
633  SendEchoToDupService();
634  response_stream_handler_thread.join();
635  request_stream_handler_thread.join();
636 }
637 
638 // Add a second service with one sync split server streaming method.
639 class SplitResponseStreamDupPkg
640  : public duplicate::EchoTestService::
641  WithSplitStreamingMethod_ResponseStream<TestServiceImplDupPkg> {
642  public:
643  Status StreamedResponseStream(
644  ServerContext* /*context*/,
645  ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
646  EchoRequest req;
647  EchoResponse resp;
648  uint32_t next_msg_sz;
649  stream->NextMessageSize(&next_msg_sz);
650  gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
651  GPR_ASSERT(stream->Read(&req));
652  for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
653  resp.set_message(req.message() + std::to_string(i) + "_dup");
654  GPR_ASSERT(stream->Write(resp));
655  }
656  return Status::OK;
657  }
658 };
659 
660 TEST_F(HybridEnd2endTest,
661  AsyncRequestStreamResponseStream_SyncSplitStreamedDupService) {
662  typedef EchoTestService::WithAsyncMethod_RequestStream<
663  EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
664  SType;
665  SType service;
666  SplitResponseStreamDupPkg dup_service;
667  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
668  ResetStub();
669  std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
670  &service, cqs_[0].get());
671  std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
672  &service, cqs_[1].get());
673  TestAllMethods();
674  SendSimpleServerStreamingToDupService();
675  response_stream_handler_thread.join();
676  request_stream_handler_thread.join();
677 }
678 
679 // Add a second service that is fully split server streamed
680 class FullySplitStreamedDupPkg
681  : public duplicate::EchoTestService::SplitStreamedService {
682  public:
683  Status StreamedResponseStream(
684  ServerContext* /*context*/,
685  ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
686  EchoRequest req;
687  EchoResponse resp;
688  uint32_t next_msg_sz;
689  stream->NextMessageSize(&next_msg_sz);
690  gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
691  GPR_ASSERT(stream->Read(&req));
692  for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
693  resp.set_message(req.message() + std::to_string(i) + "_dup");
694  GPR_ASSERT(stream->Write(resp));
695  }
696  return Status::OK;
697  }
698 };
699 
700 TEST_F(HybridEnd2endTest,
701  AsyncRequestStreamResponseStream_FullySplitStreamedDupService) {
702  typedef EchoTestService::WithAsyncMethod_RequestStream<
703  EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
704  SType;
705  SType service;
706  FullySplitStreamedDupPkg dup_service;
707  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
708  ResetStub();
709  std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
710  &service, cqs_[0].get());
711  std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
712  &service, cqs_[1].get());
713  TestAllMethods();
714  SendSimpleServerStreamingToDupService();
715  response_stream_handler_thread.join();
716  request_stream_handler_thread.join();
717 }
718 
719 // Add a second service that is fully server streamed
720 class FullyStreamedDupPkg : public duplicate::EchoTestService::StreamedService {
721  public:
722  Status StreamedEcho(
723  ServerContext* /*context*/,
724  ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
725  EchoRequest req;
726  EchoResponse resp;
727  uint32_t next_msg_sz;
728  stream->NextMessageSize(&next_msg_sz);
729  gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
730  GPR_ASSERT(stream->Read(&req));
731  resp.set_message(req.message() + "_dup");
732  GPR_ASSERT(stream->Write(resp));
733  return Status::OK;
734  }
735  Status StreamedResponseStream(
736  ServerContext* /*context*/,
737  ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
738  EchoRequest req;
739  EchoResponse resp;
740  uint32_t next_msg_sz;
741  stream->NextMessageSize(&next_msg_sz);
742  gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
743  GPR_ASSERT(stream->Read(&req));
744  for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
745  resp.set_message(req.message() + std::to_string(i) + "_dup");
746  GPR_ASSERT(stream->Write(resp));
747  }
748  return Status::OK;
749  }
750 };
751 
752 TEST_F(HybridEnd2endTest,
753  AsyncRequestStreamResponseStream_FullyStreamedDupService) {
754  typedef EchoTestService::WithAsyncMethod_RequestStream<
755  EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
756  SType;
757  SType service;
758  FullyStreamedDupPkg dup_service;
759  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
760  ResetStub();
761  std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
762  &service, cqs_[0].get());
763  std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
764  &service, cqs_[1].get());
765  TestAllMethods();
766  SendEchoToDupService();
767  SendSimpleServerStreamingToDupService();
768  response_stream_handler_thread.join();
769  request_stream_handler_thread.join();
770 }
771 
772 // Add a second service with one async method.
773 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_AsyncDupService) {
774  typedef EchoTestService::WithAsyncMethod_RequestStream<
775  EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
776  SType;
777  SType service;
778  duplicate::EchoTestService::AsyncService dup_service;
779  SetUpServer(&service, &dup_service, nullptr, nullptr);
780  ResetStub();
781  std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
782  &service, cqs_[0].get());
783  std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
784  &service, cqs_[1].get());
785  std::thread echo_handler_thread(
786  HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
787  cqs_[2].get(), true);
788  TestAllMethods();
789  SendEchoToDupService();
790  response_stream_handler_thread.join();
791  request_stream_handler_thread.join();
792  echo_handler_thread.join();
793 }
794 
795 TEST_F(HybridEnd2endTest, GenericEcho) {
796  EchoTestService::WithGenericMethod_Echo<TestServiceImpl> service;
797  AsyncGenericService generic_service;
798  SetUpServer(&service, nullptr, &generic_service, nullptr);
799  ResetStub();
800  std::thread generic_handler_thread(HandleGenericCall, &generic_service,
801  cqs_[0].get());
802  TestAllMethods();
803  generic_handler_thread.join();
804 }
805 
806 TEST_P(HybridEnd2endTest, CallbackGenericEcho) {
807  EchoTestService::WithGenericMethod_Echo<TestServiceImpl> service;
808  class GenericEchoService : public CallbackGenericService {
809  private:
810  ServerGenericBidiReactor* CreateReactor(
811  GenericCallbackServerContext* context) override {
812  EXPECT_EQ(context->method(), "/grpc.testing.EchoTestService/Echo");
813  gpr_log(GPR_DEBUG, "Constructor of generic service %d",
814  static_cast<int>(context->deadline().time_since_epoch().count()));
815 
816  class Reactor : public ServerGenericBidiReactor {
817  public:
818  Reactor() { StartRead(&request_); }
819 
820  private:
821  void OnDone() override { delete this; }
822  void OnReadDone(bool ok) override {
823  if (!ok) {
825  } else {
828  StartWrite(&response_);
829  StartRead(&request_);
830  }
831  }
832  void OnWriteDone(bool ok) override {
834  : Status(StatusCode::UNKNOWN, "Unexpected failure"));
835  }
836  ByteBuffer request_;
837  ByteBuffer response_;
838  std::atomic_int reads_complete_{0};
839  };
840  return new Reactor;
841  }
842  } generic_service;
843 
844  if (!SetUpServer(&service, nullptr, nullptr, &generic_service)) {
845  return;
846  }
847  ResetStub();
848  TestAllMethods();
849 }
850 
851 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream) {
852  typedef EchoTestService::WithAsyncMethod_RequestStream<
853  EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
854  SType;
855  SType service;
856  AsyncGenericService generic_service;
857  SetUpServer(&service, nullptr, &generic_service, nullptr);
858  ResetStub();
859  std::thread generic_handler_thread(HandleGenericCall, &generic_service,
860  cqs_[0].get());
861  std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
862  &service, cqs_[1].get());
863  TestAllMethods();
864  generic_handler_thread.join();
865  request_stream_handler_thread.join();
866 }
867 
868 // Add a second service with one sync method.
869 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_SyncDupService) {
870  typedef EchoTestService::WithAsyncMethod_RequestStream<
871  EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
872  SType;
873  SType service;
874  AsyncGenericService generic_service;
875  TestServiceImplDupPkg dup_service;
876  SetUpServer(&service, &dup_service, &generic_service, nullptr);
877  ResetStub();
878  std::thread generic_handler_thread(HandleGenericCall, &generic_service,
879  cqs_[0].get());
880  std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
881  &service, cqs_[1].get());
882  TestAllMethods();
883  SendEchoToDupService();
884  generic_handler_thread.join();
885  request_stream_handler_thread.join();
886 }
887 
888 // Add a second service with one async method.
889 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_AsyncDupService) {
890  typedef EchoTestService::WithAsyncMethod_RequestStream<
891  EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
892  SType;
893  SType service;
894  AsyncGenericService generic_service;
895  duplicate::EchoTestService::AsyncService dup_service;
896  SetUpServer(&service, &dup_service, &generic_service, nullptr);
897  ResetStub();
898  std::thread generic_handler_thread(HandleGenericCall, &generic_service,
899  cqs_[0].get());
900  std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
901  &service, cqs_[1].get());
902  std::thread echo_handler_thread(
903  HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
904  cqs_[2].get(), true);
905  TestAllMethods();
906  SendEchoToDupService();
907  generic_handler_thread.join();
908  request_stream_handler_thread.join();
909  echo_handler_thread.join();
910 }
911 
912 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamResponseStream) {
913  typedef EchoTestService::WithAsyncMethod_RequestStream<
914  EchoTestService::WithGenericMethod_Echo<
915  EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
916  SType;
917  SType service;
918  AsyncGenericService generic_service;
919  SetUpServer(&service, nullptr, &generic_service, nullptr);
920  ResetStub();
921  std::thread generic_handler_thread(HandleGenericCall, &generic_service,
922  cqs_[0].get());
923  std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
924  &service, cqs_[1].get());
925  std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
926  &service, cqs_[2].get());
927  TestAllMethods();
928  generic_handler_thread.join();
929  request_stream_handler_thread.join();
930  response_stream_handler_thread.join();
931 }
932 
933 TEST_F(HybridEnd2endTest, GenericEchoRequestStreamAsyncResponseStream) {
934  typedef EchoTestService::WithGenericMethod_RequestStream<
935  EchoTestService::WithGenericMethod_Echo<
936  EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
937  SType;
938  SType service;
939  AsyncGenericService generic_service;
940  SetUpServer(&service, nullptr, &generic_service, nullptr);
941  ResetStub();
942  std::thread generic_handler_thread(HandleGenericCall, &generic_service,
943  cqs_[0].get());
944  std::thread generic_handler_thread2(HandleGenericCall, &generic_service,
945  cqs_[1].get());
946  std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
947  &service, cqs_[2].get());
948  TestAllMethods();
949  generic_handler_thread.join();
950  generic_handler_thread2.join();
951  response_stream_handler_thread.join();
952 }
953 
954 // If WithGenericMethod is called and no generic service is registered, the
955 // server will fail to build.
956 TEST_F(HybridEnd2endTest, GenericMethodWithoutGenericService) {
957  EchoTestService::WithGenericMethod_RequestStream<
958  EchoTestService::WithGenericMethod_Echo<
959  EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
960  service;
961  SetUpServer(&service, nullptr, nullptr, nullptr);
962  EXPECT_EQ(nullptr, server_.get());
963 }
964 
965 INSTANTIATE_TEST_SUITE_P(HybridEnd2endTest, HybridEnd2endTest,
966  ::testing::Bool());
967 
968 } // namespace
969 } // namespace testing
970 } // namespace grpc
971 
972 int main(int argc, char** argv) {
973  grpc::testing::TestEnvironment env(&argc, argv);
974  ::testing::InitGoogleTest(&argc, argv);
975  return RUN_ALL_TESTS();
976 }
EXPECT_FALSE
#define EXPECT_FALSE(condition)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1970
grpc::gpr_setenv
gpr_setenv("STS_CREDENTIALS", creds_file_name)
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
testing
Definition: aws_request_signer_test.cc:25
iomgr.h
regen-readme.it
it
Definition: regen-readme.py:15
port.h
get
absl::string_view get(const Cont &c)
Definition: abseil-cpp/absl/strings/str_replace_test.cc:185
testing::UnitTest::current_test_info
const TestInfo * current_test_info() const GTEST_LOCK_EXCLUDED_(mutex_)
Definition: bloaty/third_party/googletest/googletest/src/gtest.cc:4961
generate.env
env
Definition: generate.py:37
response_
grpc_http_response response_
Definition: google_c2p_resolver.cc:101
send_buffer
static char * send_buffer
Definition: test-tcp-writealot.c:38
grpc
Definition: grpcpp/alarm.h:33
unimplemented_service_
grpc::testing::UnimplementedEchoService::Service unimplemented_service_
Definition: hybrid_end2end_test.cc:436
send_request
Definition: ares_private.h:147
server_
std::unique_ptr< Server > server_
Definition: hybrid_end2end_test.cc:439
grpc::ClientContext::set_wait_for_ready
void set_wait_for_ready(bool wait_for_ready)
Definition: grpcpp/impl/codegen/client_context.h:285
benchmark.request
request
Definition: benchmark.py:77
grpc::GenericServerAsyncReader
ServerAsyncReader< ByteBuffer, ByteBuffer > GenericServerAsyncReader
Definition: grpcpp/impl/codegen/async_generic_service.h:38
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc::GenericServerAsyncResponseWriter
ServerAsyncResponseWriter< ByteBuffer > GenericServerAsyncResponseWriter
Definition: grpcpp/impl/codegen/async_generic_service.h:37
stub_
std::unique_ptr< grpc::testing::EchoTestService::Stub > stub_
Definition: hybrid_end2end_test.cc:438
async_generic_service.h
env.h
absl::FormatConversionChar::s
@ s
async_greeter_client.stub
stub
Definition: hellostreamingworld/async_greeter_client.py:26
reads_complete_
int reads_complete_
Definition: client_callback_end2end_test.cc:1011
test_service_impl.h
testing::TestWithParam
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1883
grpc::Service
Desriptor of an RPC service and its various RPC methods.
Definition: grpcpp/impl/codegen/service_type.h:58
grpc::testing::ParseFromByteBuffer
bool ParseFromByteBuffer(ByteBuffer *buffer, grpc::protobuf::Message *message)
Definition: byte_buffer_proto_helper.cc:26
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
grpc_iomgr_run_in_background
bool grpc_iomgr_run_in_background()
inproc_
bool inproc_
Definition: hybrid_end2end_test.cc:441
main
int main(int argc, char **argv)
Definition: hybrid_end2end_test.cc:972
profile_analyzer.builder
builder
Definition: profile_analyzer.py:159
channel
wrapped_grpc_channel * channel
Definition: src/php/ext/grpc/call.h:33
grpc::testing::kServerDefaultResponseStreamsToSend
const int kServerDefaultResponseStreamsToSend
Definition: test_service_impl.h:42
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc.StatusCode.UNKNOWN
tuple UNKNOWN
Definition: src/python/grpcio/grpc/__init__.py:262
req
static uv_connect_t req
Definition: test-connection-fail.c:30
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
http2_server_health_check.resp
resp
Definition: http2_server_health_check.py:31
Verify
static int Verify(X509 *leaf, const std::vector< X509 * > &roots, const std::vector< X509 * > &intermediates, const std::vector< X509_CRL * > &crls, unsigned long flags=0, std::function< void(X509_VERIFY_PARAM *)> configure_callback=nullptr, int(*verify_callback)(int, X509_STORE_CTX *)=nullptr)
Definition: x509_test.cc:1111
grpc.h
grpc::testing::SerializeToByteBuffer
std::unique_ptr< ByteBuffer > SerializeToByteBuffer(grpc::protobuf::Message *message)
Definition: byte_buffer_proto_helper.cc:37
channel.h
grpc::testing::INSTANTIATE_TEST_SUITE_P
INSTANTIATE_TEST_SUITE_P(HistogramTestCases, HistogramTest, ::testing::Range< int >(0, GRPC_STATS_HISTOGRAM_COUNT))
grpc::Status::OK
static const Status & OK
An OK pre-defined instance.
Definition: include/grpcpp/impl/codegen/status.h:113
grpc::CreateChannel
std::shared_ptr< Channel > CreateChannel(const grpc::string &target, const std::shared_ptr< ChannelCredentials > &creds)
RUN_ALL_TESTS
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2471
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
grpc_pick_unused_port_or_die
int grpc_pick_unused_port_or_die(void)
msg
std::string msg
Definition: client_interceptors_end2end_test.cc:372
grpc::testing::tag
static void * tag(intptr_t t)
Definition: h2_ssl_cert_test.cc:263
request_
EchoRequest request_
Definition: client_callback_end2end_test.cc:724
tests.unit._exit_scenarios.port
port
Definition: _exit_scenarios.py:179
test_config.h
testing::TestInfo::value_param
const char * value_param() const
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:721
client_context.h
testing::InitGoogleTest
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
Definition: bloaty/third_party/googletest/googletest/src/gtest.cc:6106
grpc::ServerGenericBidiReactor
ServerBidiReactor< ByteBuffer, ByteBuffer > ServerGenericBidiReactor
Definition: grpcpp/impl/codegen/async_generic_service.h:87
grpc::testing::TEST_F
TEST_F(ChannelArgumentsTest, SetInt)
Definition: channel_arguments_test.cc:134
grpc::ClientContext::deadline
std::chrono::system_clock::time_point deadline() const
Return the deadline for the client call.
Definition: grpcpp/impl/codegen/client_context.h:294
benchmark::internal::Finish
double Finish(Counter const &c, IterationCount iterations, double cpu_time, double num_threads)
Definition: benchmark/src/counter.cc:20
server_address_
std::ostringstream server_address_
Definition: hybrid_end2end_test.cc:440
server_context.h
asyncio_get_stats.response
response
Definition: asyncio_get_stats.py:28
grpc::testing::TestEnvironment
Definition: test/core/util/test_config.h:54
grpc::protobuf::util::Status
GRPC_CUSTOM_UTIL_STATUS Status
Definition: include/grpcpp/impl/codegen/config_protobuf.h:93
ok
bool ok
Definition: async_end2end_test.cc:197
testing::UnitTest::GetInstance
static UnitTest * GetInstance()
Definition: bloaty/third_party/googletest/googletest/src/gtest.cc:4616
grpc::testing::EXPECT_EQ
EXPECT_EQ(options.token_exchange_service_uri, "https://foo/exchange")
grpc::GenericServerAsyncReaderWriter
ServerAsyncReaderWriter< ByteBuffer, ByteBuffer > GenericServerAsyncReaderWriter
Definition: grpcpp/impl/codegen/async_generic_service.h:36
grpc::InsecureServerCredentials
std::shared_ptr< ServerCredentials > InsecureServerCredentials()
Definition: insecure_server_credentials.cc:52
GPR_DEBUG
#define GPR_DEBUG
Definition: include/grpc/impl/codegen/log.h:55
context
grpc::ClientContext context
Definition: istio_echo_server_lib.cc:61
grpc::testing::TEST_P
TEST_P(HistogramTest, IncHistogram)
Definition: stats_test.cc:87
grpc::testing::EXPECT_TRUE
EXPECT_TRUE(grpc::experimental::StsCredentialsOptionsFromJson(minimum_valid_json, &options) .ok())
cqs_
std::vector< std::unique_ptr< ServerCompletionQueue > > cqs_
Definition: hybrid_end2end_test.cc:437
server.h
grpc::ServerAsyncResponseWriter
Definition: grpcpp/impl/codegen/async_unary_call.h:295
grpc::InsecureChannelCredentials
std::shared_ptr< ChannelCredentials > InsecureChannelCredentials()
Credentials for an unencrypted, unauthenticated channel.
Definition: cpp/client/insecure_credentials.cc:69
service
__attribute__((deprecated("Please use GRPCProtoMethod."))) @interface ProtoMethod NSString * service
Definition: ProtoMethod.h:25
testing::Bool
internal::ParamGenerator< bool > Bool()
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest-param-test.h:359
to_string
static bool to_string(zval *from)
Definition: protobuf/php/ext/google/protobuf/convert.c:333
thread
static uv_thread_t thread
Definition: test-async-null-cb.c:29
server_builder.h
cq
static grpc_completion_queue * cq
Definition: test/core/fling/client.cc:37
byte_buffer_proto_helper.h
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
create_channel.h
stream
voidpf stream
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136


grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:02