#include <cstdint>

#include <gtest/gtest.h>

#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
// Packs the integer `i` into an opaque `void*` tag. In this file the result is
// passed as the trailing tag argument of async calls (e.g. `Read`, `Write`,
// `RequestRequestStream`) and the same integer is later handed to the
// verification helpers.
//
// The value is widened to `std::intptr_t` before the pointer cast so the
// integer-to-pointer conversion happens between operands of equal width.
// Casting a plain `int` straight to `void*` is a size-mismatched conversion on
// 64-bit targets, which compilers commonly diagnose.
void* tag(int i) {
  return reinterpret_cast<void*>(static_cast<std::intptr_t>(i));
}
48 bool VerifyReturnSuccess(CompletionQueue*
cq,
int i) {
56 void Verify(CompletionQueue*
cq,
int i,
bool expect_ok) {
61 template <
class Service>
62 void HandleEcho(Service*
service, ServerCompletionQueue*
cq,
bool dup_service) {
63 ServerContext srv_ctx;
65 EchoRequest recv_request;
66 EchoResponse send_response;
67 service->RequestEcho(&srv_ctx, &recv_request, &response_writer,
cq,
cq,
70 send_response.set_message(recv_request.message());
72 send_response.mutable_message()->append(
"_dup");
81 template <
class Service>
82 void HandleRawEcho(Service*
service, ServerCompletionQueue*
cq,
84 ServerContext srv_ctx;
86 ByteBuffer recv_buffer;
87 service->RequestEcho(&srv_ctx, &recv_buffer, &response_writer,
cq,
cq,
90 EchoRequest recv_request;
92 EchoResponse send_response;
93 send_response.set_message(recv_request.message());
99 template <
class Service>
100 void HandleClientStreaming(Service*
service, ServerCompletionQueue*
cq) {
101 ServerContext srv_ctx;
102 EchoRequest recv_request;
103 EchoResponse send_response;
104 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
105 service->RequestRequestStream(&srv_ctx, &srv_stream,
cq,
cq,
tag(1));
110 send_response.mutable_message()->append(recv_request.message());
111 srv_stream.Read(&recv_request,
tag(i));
112 }
while (VerifyReturnSuccess(
cq, i));
117 template <
class Service>
118 void HandleRawClientStreaming(Service*
service, ServerCompletionQueue*
cq) {
119 ServerContext srv_ctx;
120 ByteBuffer recv_buffer;
121 EchoRequest recv_request;
122 EchoResponse send_response;
124 service->RequestRequestStream(&srv_ctx, &srv_stream,
cq,
cq,
tag(1));
129 srv_stream.Read(&recv_buffer,
tag(i));
130 if (!VerifyReturnSuccess(
cq, i)) {
134 send_response.mutable_message()->append(recv_request.message());
141 template <
class Service>
142 void HandleServerStreaming(Service*
service, ServerCompletionQueue*
cq) {
143 ServerContext srv_ctx;
144 EchoRequest recv_request;
145 EchoResponse send_response;
146 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
147 service->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq,
cq,
150 send_response.set_message(recv_request.message() +
"0");
151 srv_stream.Write(send_response,
tag(2));
153 send_response.set_message(recv_request.message() +
"1");
154 srv_stream.Write(send_response,
tag(3));
156 send_response.set_message(recv_request.message() +
"2");
157 srv_stream.Write(send_response,
tag(4));
164 CompletionQueue*
cq) {
165 ByteBuffer recv_buffer;
168 EchoRequest recv_request;
170 EchoResponse send_response;
171 send_response.set_message(recv_request.message());
180 CompletionQueue*
cq) {
181 ByteBuffer recv_buffer;
182 EchoRequest recv_request;
183 EchoResponse send_response;
188 if (!VerifyReturnSuccess(
cq, i)) {
192 send_response.mutable_message()->append(recv_request.message());
202 void HandleGenericCall(AsyncGenericService*
service,
203 ServerCompletionQueue*
cq) {
204 GenericServerContext srv_ctx;
208 if (srv_ctx.method() ==
"/grpc.testing.EchoTestService/Echo") {
210 }
else if (srv_ctx.method() ==
211 "/grpc.testing.EchoTestService/RequestStream") {
212 HandleGenericRequestStream(&
stream,
cq);
219 class TestServiceImplDupPkg
220 :
public grpc::testing::duplicate::EchoTestService::Service {
231 HybridEnd2endTest() {}
233 static void SetUpTestCase() {
240 void SetUp()
override {
249 AsyncGenericService* generic_service,
250 CallbackGenericService* callback_generic_service,
251 int max_message_size = 0) {
262 builder.RegisterService(service1);
264 builder.RegisterService(service2);
266 if (generic_service) {
267 builder.RegisterAsyncGenericService(generic_service);
269 if (callback_generic_service) {
270 builder.RegisterCallbackGenericService(callback_generic_service);
273 if (max_message_size != 0) {
274 builder.SetMaxMessageSize(max_message_size);
278 for (
int i = 0;
i < 5;
i++) {
289 void TearDown()
override {
297 while ((*it)->Next(&ignored_tag, &ignored_ok)) {
303 std::shared_ptr<Channel>
channel =
307 stub_ = grpc::testing::EchoTestService::NewStub(
channel);
311 void TestAllMethods() {
313 SendSimpleClientStreaming();
314 SendSimpleServerStreaming();
320 EchoResponse recv_response;
321 ClientContext cli_ctx;
322 cli_ctx.set_wait_for_ready(
true);
329 void SendEchoToDupService() {
332 auto stub = grpc::testing::duplicate::EchoTestService::NewStub(
channel);
334 EchoResponse recv_response;
335 ClientContext cli_ctx;
336 cli_ctx.set_wait_for_ready(
true);
343 void SendSimpleClientStreaming() {
345 EchoResponse recv_response;
347 ClientContext cli_ctx;
348 cli_ctx.set_wait_for_ready(
true);
350 auto stream =
stub_->RequestStream(&cli_ctx, &recv_response);
351 for (
int i = 0;
i < 5;
i++) {
357 EXPECT_EQ(expected_message, recv_response.message());
361 void SendSimpleServerStreaming() {
381 void SendSimpleServerStreamingToDupService() {
384 auto stub = grpc::testing::duplicate::EchoTestService::NewStub(
channel);
404 void SendBidiStreaming() {
437 std::vector<std::unique_ptr<ServerCompletionQueue>>
cqs_;
438 std::unique_ptr<grpc::testing::EchoTestService::Stub>
stub_;
444 TEST_F(HybridEnd2endTest, AsyncEcho) {
445 typedef EchoTestService::WithAsyncMethod_Echo<TestServiceImpl> SType;
447 SetUpServer(&
service,
nullptr,
nullptr,
nullptr);
452 echo_handler_thread.join();
455 TEST_F(HybridEnd2endTest, RawEcho) {
456 typedef EchoTestService::WithRawMethod_Echo<TestServiceImpl> SType;
458 SetUpServer(&
service,
nullptr,
nullptr,
nullptr);
463 echo_handler_thread.join();
466 TEST_F(HybridEnd2endTest, RawRequestStream) {
467 typedef EchoTestService::WithRawMethod_RequestStream<TestServiceImpl> SType;
469 SetUpServer(&
service,
nullptr,
nullptr,
nullptr);
471 std::thread request_stream_handler_thread(HandleRawClientStreaming<SType>,
474 request_stream_handler_thread.join();
477 TEST_F(HybridEnd2endTest, AsyncEchoRawRequestStream) {
478 typedef EchoTestService::WithRawMethod_RequestStream<
479 EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>>
482 SetUpServer(&
service,
nullptr,
nullptr,
nullptr);
486 std::thread request_stream_handler_thread(HandleRawClientStreaming<SType>,
489 request_stream_handler_thread.join();
490 echo_handler_thread.join();
493 TEST_F(HybridEnd2endTest, GenericEchoRawRequestStream) {
494 typedef EchoTestService::WithRawMethod_RequestStream<
495 EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
498 AsyncGenericService generic_service;
499 SetUpServer(&
service,
nullptr, &generic_service,
nullptr);
501 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
503 std::thread request_stream_handler_thread(HandleRawClientStreaming<SType>,
506 generic_handler_thread.join();
507 request_stream_handler_thread.join();
510 TEST_F(HybridEnd2endTest, AsyncEchoRequestStream) {
511 typedef EchoTestService::WithAsyncMethod_RequestStream<
512 EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>>
515 SetUpServer(&
service,
nullptr,
nullptr,
nullptr);
519 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
522 echo_handler_thread.join();
523 request_stream_handler_thread.join();
526 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream) {
527 typedef EchoTestService::WithAsyncMethod_RequestStream<
528 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
531 SetUpServer(&
service,
nullptr,
nullptr,
nullptr);
533 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
535 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
538 response_stream_handler_thread.join();
539 request_stream_handler_thread.join();
543 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncDupService) {
544 typedef EchoTestService::WithAsyncMethod_RequestStream<
545 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
548 TestServiceImplDupPkg dup_service;
549 SetUpServer(&
service, &dup_service,
nullptr,
nullptr);
551 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
553 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
556 SendEchoToDupService();
557 response_stream_handler_thread.join();
558 request_stream_handler_thread.join();
562 class StreamedUnaryDupPkg
563 :
public duplicate::EchoTestService::WithStreamedUnaryMethod_Echo<
564 TestServiceImplDupPkg> {
568 ServerUnaryStreamer<EchoRequest, EchoResponse>*
stream)
override {
572 stream->NextMessageSize(&next_msg_sz);
573 gpr_log(
GPR_INFO,
"Streamed Unary Next Message Size is %u", next_msg_sz);
575 resp.set_message(
req.message() +
"_dup");
582 AsyncRequestStreamResponseStream_SyncStreamedUnaryDupService) {
583 typedef EchoTestService::WithAsyncMethod_RequestStream<
584 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
587 StreamedUnaryDupPkg dup_service;
588 SetUpServer(&
service, &dup_service,
nullptr,
nullptr, 8192);
590 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
592 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
595 SendEchoToDupService();
596 response_stream_handler_thread.join();
597 request_stream_handler_thread.join();
601 class FullyStreamedUnaryDupPkg
602 :
public duplicate::EchoTestService::StreamedUnaryService {
606 ServerUnaryStreamer<EchoRequest, EchoResponse>*
stream)
override {
610 stream->NextMessageSize(&next_msg_sz);
611 gpr_log(
GPR_INFO,
"Streamed Unary Next Message Size is %u", next_msg_sz);
613 resp.set_message(
req.message() +
"_dup");
620 AsyncRequestStreamResponseStream_SyncFullyStreamedUnaryDupService) {
621 typedef EchoTestService::WithAsyncMethod_RequestStream<
622 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
625 FullyStreamedUnaryDupPkg dup_service;
626 SetUpServer(&
service, &dup_service,
nullptr,
nullptr, 8192);
628 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
630 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
633 SendEchoToDupService();
634 response_stream_handler_thread.join();
635 request_stream_handler_thread.join();
639 class SplitResponseStreamDupPkg
640 :
public duplicate::EchoTestService::
641 WithSplitStreamingMethod_ResponseStream<TestServiceImplDupPkg> {
643 Status StreamedResponseStream(
645 ServerSplitStreamer<EchoRequest, EchoResponse>*
stream)
override {
649 stream->NextMessageSize(&next_msg_sz);
650 gpr_log(
GPR_INFO,
"Split Streamed Next Message Size is %u", next_msg_sz);
661 AsyncRequestStreamResponseStream_SyncSplitStreamedDupService) {
662 typedef EchoTestService::WithAsyncMethod_RequestStream<
663 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
666 SplitResponseStreamDupPkg dup_service;
667 SetUpServer(&
service, &dup_service,
nullptr,
nullptr, 8192);
669 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
671 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
674 SendSimpleServerStreamingToDupService();
675 response_stream_handler_thread.join();
676 request_stream_handler_thread.join();
680 class FullySplitStreamedDupPkg
681 :
public duplicate::EchoTestService::SplitStreamedService {
683 Status StreamedResponseStream(
685 ServerSplitStreamer<EchoRequest, EchoResponse>*
stream)
override {
689 stream->NextMessageSize(&next_msg_sz);
690 gpr_log(
GPR_INFO,
"Split Streamed Next Message Size is %u", next_msg_sz);
701 AsyncRequestStreamResponseStream_FullySplitStreamedDupService) {
702 typedef EchoTestService::WithAsyncMethod_RequestStream<
703 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
706 FullySplitStreamedDupPkg dup_service;
707 SetUpServer(&
service, &dup_service,
nullptr,
nullptr, 8192);
709 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
711 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
714 SendSimpleServerStreamingToDupService();
715 response_stream_handler_thread.join();
716 request_stream_handler_thread.join();
720 class FullyStreamedDupPkg :
public duplicate::EchoTestService::StreamedService {
724 ServerUnaryStreamer<EchoRequest, EchoResponse>*
stream)
override {
728 stream->NextMessageSize(&next_msg_sz);
729 gpr_log(
GPR_INFO,
"Streamed Unary Next Message Size is %u", next_msg_sz);
731 resp.set_message(
req.message() +
"_dup");
735 Status StreamedResponseStream(
737 ServerSplitStreamer<EchoRequest, EchoResponse>*
stream)
override {
741 stream->NextMessageSize(&next_msg_sz);
742 gpr_log(
GPR_INFO,
"Split Streamed Next Message Size is %u", next_msg_sz);
753 AsyncRequestStreamResponseStream_FullyStreamedDupService) {
754 typedef EchoTestService::WithAsyncMethod_RequestStream<
755 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
758 FullyStreamedDupPkg dup_service;
759 SetUpServer(&
service, &dup_service,
nullptr,
nullptr, 8192);
761 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
763 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
766 SendEchoToDupService();
767 SendSimpleServerStreamingToDupService();
768 response_stream_handler_thread.join();
769 request_stream_handler_thread.join();
773 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_AsyncDupService) {
774 typedef EchoTestService::WithAsyncMethod_RequestStream<
775 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
778 duplicate::EchoTestService::AsyncService dup_service;
779 SetUpServer(&
service, &dup_service,
nullptr,
nullptr);
781 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
783 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
786 HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
789 SendEchoToDupService();
790 response_stream_handler_thread.join();
791 request_stream_handler_thread.join();
792 echo_handler_thread.join();
795 TEST_F(HybridEnd2endTest, GenericEcho) {
796 EchoTestService::WithGenericMethod_Echo<TestServiceImpl>
service;
797 AsyncGenericService generic_service;
798 SetUpServer(&
service,
nullptr, &generic_service,
nullptr);
800 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
803 generic_handler_thread.join();
806 TEST_P(HybridEnd2endTest, CallbackGenericEcho) {
807 EchoTestService::WithGenericMethod_Echo<TestServiceImpl>
service;
808 class GenericEchoService :
public CallbackGenericService {
811 GenericCallbackServerContext*
context)
override {
821 void OnDone()
override {
delete this; }
822 void OnReadDone(
bool ok)
override {
832 void OnWriteDone(
bool ok)
override {
844 if (!SetUpServer(&
service,
nullptr,
nullptr, &generic_service)) {
851 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream) {
852 typedef EchoTestService::WithAsyncMethod_RequestStream<
853 EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
856 AsyncGenericService generic_service;
857 SetUpServer(&
service,
nullptr, &generic_service,
nullptr);
859 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
861 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
864 generic_handler_thread.join();
865 request_stream_handler_thread.join();
869 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_SyncDupService) {
870 typedef EchoTestService::WithAsyncMethod_RequestStream<
871 EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
874 AsyncGenericService generic_service;
875 TestServiceImplDupPkg dup_service;
876 SetUpServer(&
service, &dup_service, &generic_service,
nullptr);
878 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
880 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
883 SendEchoToDupService();
884 generic_handler_thread.join();
885 request_stream_handler_thread.join();
889 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_AsyncDupService) {
890 typedef EchoTestService::WithAsyncMethod_RequestStream<
891 EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
894 AsyncGenericService generic_service;
895 duplicate::EchoTestService::AsyncService dup_service;
896 SetUpServer(&
service, &dup_service, &generic_service,
nullptr);
898 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
900 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
903 HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
906 SendEchoToDupService();
907 generic_handler_thread.join();
908 request_stream_handler_thread.join();
909 echo_handler_thread.join();
912 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamResponseStream) {
913 typedef EchoTestService::WithAsyncMethod_RequestStream<
914 EchoTestService::WithGenericMethod_Echo<
915 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
918 AsyncGenericService generic_service;
919 SetUpServer(&
service,
nullptr, &generic_service,
nullptr);
921 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
923 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
925 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
928 generic_handler_thread.join();
929 request_stream_handler_thread.join();
930 response_stream_handler_thread.join();
933 TEST_F(HybridEnd2endTest, GenericEchoRequestStreamAsyncResponseStream) {
934 typedef EchoTestService::WithGenericMethod_RequestStream<
935 EchoTestService::WithGenericMethod_Echo<
936 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
939 AsyncGenericService generic_service;
940 SetUpServer(&
service,
nullptr, &generic_service,
nullptr);
942 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
944 std::thread generic_handler_thread2(HandleGenericCall, &generic_service,
946 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
949 generic_handler_thread.join();
950 generic_handler_thread2.join();
951 response_stream_handler_thread.join();
956 TEST_F(HybridEnd2endTest, GenericMethodWithoutGenericService) {
957 EchoTestService::WithGenericMethod_RequestStream<
958 EchoTestService::WithGenericMethod_Echo<
959 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
961 SetUpServer(&
service,
nullptr,
nullptr,
nullptr);
972 int main(
int argc,
char** argv) {