23 #include "absl/memory/memory.h"
40 #include "src/proto/grpc/health/v1/health.grpc.pb.h"
41 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
42 #include "src/proto/grpc/testing/echo.grpc.pb.h"
48 #ifdef GRPC_POSIX_SOCKET_EV
50 #endif // GRPC_POSIX_SOCKET_EV
52 #include <gtest/gtest.h>
54 using grpc::testing::EchoRequest;
55 using grpc::testing::EchoResponse;
56 using std::chrono::system_clock;
// Converts a small integer id into an opaque tag pointer (the inverse of
// detag()).
//
// The id is widened to intptr_t before the pointer cast so that the
// integer -> pointer conversion happens between same-sized types. Casting a
// plain int directly to void* is a differently-sized conversion on LP64
// targets and triggers int-to-pointer-cast diagnostics; going through
// intptr_t also mirrors the narrowing that detag() performs.
void* tag(int t) {
  return reinterpret_cast<void*>(static_cast<intptr_t>(t));
}
// Recovers the integer id carried by an opaque tag pointer: the pointer is
// reinterpreted as an intptr_t and that value is then narrowed to int.
int detag(void* p) {
  const auto raw_value = reinterpret_cast<intptr_t>(p);
  return static_cast<int>(raw_value);
}
93 int Next(CompletionQueue*
cq,
bool ignore_ok) {
98 return detag(got_tag);
101 template <
typename T>
103 CompletionQueue*
cq,
void** got_tag,
bool*
ok,
T deadline,
106 return cq->AsyncNext(got_tag,
ok, deadline);
109 return cq->DoThenAsyncNext(lambda, got_tag,
ok, deadline);
119 void Verify(CompletionQueue*
cq,
bool ignore_ok) {
171 void GotTag(
void* got_tag,
bool ok,
bool ignore_ok) {
181 if (it2->second.seen !=
nullptr) {
183 *it2->second.seen =
true;
206 bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) {
207 return plugin->has_sync_methods();
// Intentionally a no-op: the body is empty, so the channel arguments passed in
// are left untouched (the parameter is unnamed because it is unused).
216 void UpdateArguments(ChannelArguments* )
override {}
219 std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins)
override {
220 plugins->erase(std::remove_if(plugins->begin(), plugins->end(),
221 plugin_has_sync_methods),
228 TestScenario(
bool inproc_stub,
const std::string& creds_type,
bool hcs,
242 return out <<
"TestScenario{inproc=" << (
scenario.inproc ?
"true" :
"false")
243 <<
", credentials='" <<
scenario.credentials_type
244 <<
", health_check_service="
245 << (
scenario.health_check_service ?
"true" :
"false")
246 <<
"', message_size=" <<
scenario.message_content.size() <<
"}";
250 std::ostringstream
out;
// Empty subclass of health::v1::Health::Service; it declares no members or
// overrides of its own.
255 class HealthCheck :
public health::v1::Health::Service {};
// Constructor: calls Log() on the current test parameter (GetParam()).
259 AsyncEnd2endTest() { GetParam().Log(); }
261 void SetUp()
override {
266 BuildAndStartServer();
269 void TearDown()
override {
275 void ServerShutdown() {
279 while (
cq_->Next(&ignored_tag, &ignored_ok)) {
287 void BuildAndStartServer() {
293 absl::make_unique<grpc::testing::EchoTestService::AsyncService>();
302 std::unique_ptr<ServerBuilderOption> sync_plugin_disabler(
303 new ServerBuilderSyncPluginDisabler());
309 ChannelArguments
args;
312 std::shared_ptr<Channel>
channel =
316 stub_ = grpc::testing::EchoTestService::NewStub(
channel);
320 for (
int i = 0;
i < num_rpcs;
i++) {
322 EchoRequest recv_request;
323 EchoResponse send_response;
324 EchoResponse recv_response;
327 ClientContext cli_ctx;
328 ServerContext srv_ctx;
332 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
335 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
338 response_reader->Finish(&recv_response, &recv_status,
tag(4));
340 Verifier().Expect(2,
true).Verify(
cq_.get());
343 send_response.set_message(recv_request.message());
345 Verifier().Expect(3,
true).Expect(4,
true).Verify(
cq_.get());
347 EXPECT_EQ(send_response.message(), recv_response.message());
352 std::unique_ptr<ServerCompletionQueue>
cq_;
353 std::unique_ptr<grpc::testing::EchoTestService::Stub>
stub_;
355 std::unique_ptr<grpc::testing::EchoTestService::AsyncService>
service_;
361 TEST_P(AsyncEnd2endTest, SimpleRpc) {
366 TEST_P(AsyncEnd2endTest, SimpleRpcWithExpectedError) {
370 EchoRequest recv_request;
371 EchoResponse send_response;
372 EchoResponse recv_response;
375 ClientContext cli_ctx;
376 ServerContext srv_ctx;
378 ErrorStatus error_status;
381 error_status.set_code(1);
382 error_status.set_error_message(
"cancel error message");
383 *
send_request.mutable_param()->mutable_expected_error() = error_status;
385 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
388 srv_ctx.AsyncNotifyWhenDone(
tag(5));
389 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
cq_.get(),
392 response_reader->Finish(&recv_response, &recv_status,
tag(4));
394 Verifier().Expect(2,
true).Verify(
cq_.get());
397 send_response.set_message(recv_request.message());
398 response_writer.Finish(
401 static_cast<StatusCode>(recv_request.param().expected_error().code()),
402 recv_request.param().expected_error().error_message()),
404 Verifier().Expect(3,
true).Expect(4,
true).Expect(5,
true).Verify(
cq_.get());
407 EXPECT_EQ(recv_status.error_code(), error_status.code());
408 EXPECT_EQ(recv_status.error_message(), error_status.error_message());
412 TEST_P(AsyncEnd2endTest, SequentialRpcs) {
417 TEST_P(AsyncEnd2endTest, ReconnectChannel) {
422 int poller_slowdown_factor = 1;
423 #ifdef GRPC_POSIX_SOCKET_EV
427 if (0 == strcmp(poller.get(),
"poll")) {
428 poller_slowdown_factor = 2;
430 #endif // GRPC_POSIX_SOCKET_EV
434 BuildAndStartServer();
450 TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) {
461 TEST_P(AsyncEnd2endTest, ShutdownThenWait) {
470 TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
474 EchoRequest recv_request;
475 EchoResponse send_response;
476 EchoResponse recv_response;
479 ClientContext cli_ctx;
480 ServerContext srv_ctx;
484 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
494 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
cq_.get(),
496 response_reader->Finish(&recv_response, &recv_status,
tag(4));
498 Verifier().Expect(2,
true).Verify(
cq_.get(), time_limit);
501 send_response.set_message(recv_request.message());
503 Verifier().Expect(3,
true).Expect(4,
true).Verify(
506 EXPECT_EQ(send_response.message(), recv_response.message());
511 TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
515 EchoRequest recv_request;
516 EchoResponse send_response;
517 EchoResponse recv_response;
520 ClientContext cli_ctx;
521 ServerContext srv_ctx;
525 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
535 auto resp_writer_ptr = &response_writer;
536 auto lambda_2 = [&,
this, resp_writer_ptr]() {
537 service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr,
cq_.get(),
540 response_reader->Finish(&recv_response, &recv_status,
tag(4));
542 Verifier().Expect(2,
true).Verify(
cq_.get(), time_limit, lambda_2);
545 send_response.set_message(recv_request.message());
546 auto lambda_3 = [resp_writer_ptr, send_response]() {
549 Verifier().Expect(3,
true).Expect(4,
true).Verify(
552 EXPECT_EQ(send_response.message(), recv_response.message());
557 TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
561 EchoRequest recv_request;
562 EchoResponse send_response;
563 EchoResponse recv_response;
565 ClientContext cli_ctx;
566 ServerContext srv_ctx;
567 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
570 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
571 stub_->AsyncRequestStream(&cli_ctx, &recv_response,
cq_.get(),
tag(1)));
573 service_->RequestRequestStream(&srv_ctx, &srv_stream,
cq_.get(),
cq_.get(),
576 Verifier().Expect(2,
true).Expect(1,
true).Verify(
cq_.get());
579 srv_stream.Read(&recv_request,
tag(4));
580 Verifier().Expect(3,
true).Expect(4,
true).Verify(
cq_.get());
584 srv_stream.Read(&recv_request,
tag(6));
585 Verifier().Expect(5,
true).Expect(6,
true).Verify(
cq_.get());
588 cli_stream->WritesDone(
tag(7));
589 srv_stream.Read(&recv_request,
tag(8));
590 Verifier().Expect(7,
true).Expect(8,
false).Verify(
cq_.get());
592 send_response.set_message(recv_request.message());
594 cli_stream->Finish(&recv_status,
tag(10));
595 Verifier().Expect(9,
true).Expect(10,
true).Verify(
cq_.get());
597 EXPECT_EQ(send_response.message(), recv_response.message());
602 TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
606 EchoRequest recv_request;
607 EchoResponse send_response;
608 EchoResponse recv_response;
610 ClientContext cli_ctx;
611 ServerContext srv_ctx;
612 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
615 cli_ctx.set_initial_metadata_corked(
true);
617 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
618 stub_->AsyncRequestStream(&cli_ctx, &recv_response,
cq_.get(),
tag(1)));
620 service_->RequestRequestStream(&srv_ctx, &srv_stream,
cq_.get(),
cq_.get(),
627 Verifier().Expect(2,
true).ExpectMaybe(3,
true, &seen3).Verify(
cq_.get());
629 srv_stream.Read(&recv_request,
tag(4));
631 Verifier().ExpectUnless(3,
true, seen3).Expect(4,
true).Verify(
cq_.get());
636 srv_stream.Read(&recv_request,
tag(6));
637 Verifier().Expect(5,
true).Expect(6,
true).Verify(
cq_.get());
640 srv_stream.Read(&recv_request,
tag(7));
641 Verifier().Expect(7,
false).Verify(
cq_.get());
643 send_response.set_message(recv_request.message());
645 cli_stream->Finish(&recv_status,
tag(9));
646 Verifier().Expect(8,
true).Expect(9,
true).Verify(
cq_.get());
648 EXPECT_EQ(send_response.message(), recv_response.message());
653 TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
657 EchoRequest recv_request;
658 EchoResponse send_response;
659 EchoResponse recv_response;
661 ClientContext cli_ctx;
662 ServerContext srv_ctx;
663 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
666 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
669 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
672 Verifier().Expect(1,
true).Expect(2,
true).Verify(
cq_.get());
675 send_response.set_message(recv_request.message());
676 srv_stream.Write(send_response,
tag(3));
677 cli_stream->Read(&recv_response,
tag(4));
678 Verifier().Expect(3,
true).Expect(4,
true).Verify(
cq_.get());
679 EXPECT_EQ(send_response.message(), recv_response.message());
681 srv_stream.Write(send_response,
tag(5));
682 cli_stream->Read(&recv_response,
tag(6));
683 Verifier().Expect(5,
true).Expect(6,
true).Verify(
cq_.get());
684 EXPECT_EQ(send_response.message(), recv_response.message());
687 cli_stream->Read(&recv_response,
tag(8));
688 Verifier().Expect(7,
true).Expect(8,
false).Verify(
cq_.get());
690 cli_stream->Finish(&recv_status,
tag(9));
691 Verifier().Expect(9,
true).Verify(
cq_.get());
697 TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
701 EchoRequest recv_request;
702 EchoResponse send_response;
703 EchoResponse recv_response;
705 ClientContext cli_ctx;
706 ServerContext srv_ctx;
707 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
710 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
713 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
716 Verifier().Expect(1,
true).Expect(2,
true).Verify(
cq_.get());
719 send_response.set_message(recv_request.message());
720 srv_stream.Write(send_response,
tag(3));
721 cli_stream->Read(&recv_response,
tag(4));
722 Verifier().Expect(3,
true).Expect(4,
true).Verify(
cq_.get());
723 EXPECT_EQ(send_response.message(), recv_response.message());
725 srv_stream.WriteAndFinish(send_response, WriteOptions(),
Status::OK,
tag(5));
726 cli_stream->Read(&recv_response,
tag(6));
727 Verifier().Expect(5,
true).Expect(6,
true).Verify(
cq_.get());
728 EXPECT_EQ(send_response.message(), recv_response.message());
730 cli_stream->Read(&recv_response,
tag(7));
731 Verifier().Expect(7,
false).Verify(
cq_.get());
733 cli_stream->Finish(&recv_status,
tag(8));
734 Verifier().Expect(8,
true).Verify(
cq_.get());
740 TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
744 EchoRequest recv_request;
745 EchoResponse send_response;
746 EchoResponse recv_response;
748 ClientContext cli_ctx;
749 ServerContext srv_ctx;
750 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
753 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
756 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
759 Verifier().Expect(1,
true).Expect(2,
true).Verify(
cq_.get());
762 send_response.set_message(recv_request.message());
763 srv_stream.Write(send_response,
tag(3));
764 cli_stream->Read(&recv_response,
tag(4));
765 Verifier().Expect(3,
true).Expect(4,
true).Verify(
cq_.get());
766 EXPECT_EQ(send_response.message(), recv_response.message());
768 srv_stream.WriteLast(send_response, WriteOptions(),
tag(5));
769 cli_stream->Read(&recv_response,
tag(6));
771 Verifier().Expect(5,
true).Expect(6,
true).Expect(7,
true).Verify(
cq_.get());
772 EXPECT_EQ(send_response.message(), recv_response.message());
774 cli_stream->Read(&recv_response,
tag(8));
775 Verifier().Expect(8,
false).Verify(
cq_.get());
777 cli_stream->Finish(&recv_status,
tag(9));
778 Verifier().Expect(9,
true).Verify(
cq_.get());
784 TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
788 EchoRequest recv_request;
789 EchoResponse send_response;
790 EchoResponse recv_response;
792 ClientContext cli_ctx;
793 ServerContext srv_ctx;
794 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
797 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
798 cli_stream(
stub_->AsyncBidiStream(&cli_ctx,
cq_.get(),
tag(1)));
800 service_->RequestBidiStream(&srv_ctx, &srv_stream,
cq_.get(),
cq_.get(),
803 Verifier().Expect(1,
true).Expect(2,
true).Verify(
cq_.get());
806 srv_stream.Read(&recv_request,
tag(4));
807 Verifier().Expect(3,
true).Expect(4,
true).Verify(
cq_.get());
810 send_response.set_message(recv_request.message());
811 srv_stream.Write(send_response,
tag(5));
812 cli_stream->Read(&recv_response,
tag(6));
813 Verifier().Expect(5,
true).Expect(6,
true).Verify(
cq_.get());
814 EXPECT_EQ(send_response.message(), recv_response.message());
816 cli_stream->WritesDone(
tag(7));
817 srv_stream.Read(&recv_request,
tag(8));
818 Verifier().Expect(7,
true).Expect(8,
false).Verify(
cq_.get());
821 cli_stream->Finish(&recv_status,
tag(10));
822 Verifier().Expect(9,
true).Expect(10,
true).Verify(
cq_.get());
828 TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
832 EchoRequest recv_request;
833 EchoResponse send_response;
834 EchoResponse recv_response;
836 ClientContext cli_ctx;
837 ServerContext srv_ctx;
838 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
841 cli_ctx.set_initial_metadata_corked(
true);
842 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
843 cli_stream(
stub_->AsyncBidiStream(&cli_ctx,
cq_.get(),
tag(1)));
845 service_->RequestBidiStream(&srv_ctx, &srv_stream,
cq_.get(),
cq_.get(),
852 Verifier().Expect(2,
true).ExpectMaybe(3,
true, &seen3).Verify(
cq_.get());
854 srv_stream.Read(&recv_request,
tag(4));
856 Verifier().ExpectUnless(3,
true, seen3).Expect(4,
true).Verify(
cq_.get());
859 srv_stream.Read(&recv_request,
tag(5));
860 Verifier().Expect(5,
false).Verify(
cq_.get());
862 send_response.set_message(recv_request.message());
863 srv_stream.WriteAndFinish(send_response, WriteOptions(),
Status::OK,
tag(6));
864 cli_stream->Read(&recv_response,
tag(7));
865 Verifier().Expect(6,
true).Expect(7,
true).Verify(
cq_.get());
866 EXPECT_EQ(send_response.message(), recv_response.message());
868 cli_stream->Finish(&recv_status,
tag(8));
869 Verifier().Expect(8,
true).Verify(
cq_.get());
875 TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
879 EchoRequest recv_request;
880 EchoResponse send_response;
881 EchoResponse recv_response;
883 ClientContext cli_ctx;
884 ServerContext srv_ctx;
885 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
888 cli_ctx.set_initial_metadata_corked(
true);
889 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
890 cli_stream(
stub_->AsyncBidiStream(&cli_ctx,
cq_.get(),
tag(1)));
892 service_->RequestBidiStream(&srv_ctx, &srv_stream,
cq_.get(),
cq_.get(),
899 Verifier().Expect(2,
true).ExpectMaybe(3,
true, &seen3).Verify(
cq_.get());
901 srv_stream.Read(&recv_request,
tag(4));
903 Verifier().ExpectUnless(3,
true, seen3).Expect(4,
true).Verify(
cq_.get());
906 srv_stream.Read(&recv_request,
tag(5));
907 Verifier().Expect(5,
false).Verify(
cq_.get());
909 send_response.set_message(recv_request.message());
910 srv_stream.WriteLast(send_response, WriteOptions(),
tag(6));
912 cli_stream->Read(&recv_response,
tag(8));
913 Verifier().Expect(6,
true).Expect(7,
true).Expect(8,
true).Verify(
cq_.get());
914 EXPECT_EQ(send_response.message(), recv_response.message());
916 cli_stream->Finish(&recv_status,
tag(9));
917 Verifier().Expect(9,
true).Verify(
cq_.get());
923 TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
927 EchoRequest recv_request;
928 EchoResponse send_response;
929 EchoResponse recv_response;
932 ClientContext cli_ctx;
933 ServerContext srv_ctx;
937 std::pair<std::string, std::string> meta1(
"key1",
"val1");
938 std::pair<std::string, std::string> meta2(
"key2",
"val2");
939 std::pair<std::string, std::string> meta3(
"g.r.d-bin",
"xyz");
940 cli_ctx.AddMetadata(meta1.first, meta1.second);
941 cli_ctx.AddMetadata(meta2.first, meta2.second);
942 cli_ctx.AddMetadata(meta3.first, meta3.second);
944 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
946 response_reader->Finish(&recv_response, &recv_status,
tag(4));
948 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
cq_.get(),
950 Verifier().Expect(2,
true).Verify(
cq_.get());
952 const auto& client_initial_metadata = srv_ctx.client_metadata();
954 ToString(client_initial_metadata.find(meta1.first)->second));
956 ToString(client_initial_metadata.find(meta2.first)->second));
958 ToString(client_initial_metadata.find(meta3.first)->second));
959 EXPECT_GE(client_initial_metadata.size(),
static_cast<size_t>(2));
961 send_response.set_message(recv_request.message());
963 Verifier().Expect(3,
true).Expect(4,
true).Verify(
cq_.get());
965 EXPECT_EQ(send_response.message(), recv_response.message());
969 TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
973 EchoRequest recv_request;
974 EchoResponse send_response;
975 EchoResponse recv_response;
978 ClientContext cli_ctx;
979 ServerContext srv_ctx;
983 std::pair<std::string, std::string> meta1(
"key1",
"val1");
984 std::pair<std::string, std::string> meta2(
"key2",
"val2");
986 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
988 response_reader->ReadInitialMetadata(
tag(4));
990 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
cq_.get(),
992 Verifier().Expect(2,
true).Verify(
cq_.get());
994 srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
995 srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
996 response_writer.SendInitialMetadata(
tag(3));
997 Verifier().Expect(3,
true).Expect(4,
true).Verify(
cq_.get());
998 const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1000 ToString(server_initial_metadata.find(meta1.first)->second));
1002 ToString(server_initial_metadata.find(meta2.first)->second));
1003 EXPECT_EQ(
static_cast<size_t>(2), server_initial_metadata.size());
1005 send_response.set_message(recv_request.message());
1007 response_reader->Finish(&recv_response, &recv_status,
tag(6));
1008 Verifier().Expect(5,
true).Expect(6,
true).Verify(
cq_.get());
1010 EXPECT_EQ(send_response.message(), recv_response.message());
1015 TEST_P(AsyncEnd2endTest, ServerInitialMetadataServerStreaming) {
1018 EchoRequest recv_request;
1019 EchoResponse send_response;
1020 EchoResponse recv_response;
1022 ClientContext cli_ctx;
1023 ServerContext srv_ctx;
1024 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1026 std::pair<::std::string, ::std::string> meta1(
"key1",
"val1");
1027 std::pair<::std::string, ::std::string> meta2(
"key2",
"val2");
1029 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1031 cli_stream->ReadInitialMetadata(
tag(11));
1032 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1035 Verifier().Expect(1,
true).Expect(2,
true).Verify(
cq_.get());
1037 srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
1038 srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
1039 srv_stream.SendInitialMetadata(
tag(10));
1040 Verifier().Expect(10,
true).Expect(11,
true).Verify(
cq_.get());
1041 auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1043 ToString(server_initial_metadata.find(meta1.first)->second));
1045 ToString(server_initial_metadata.find(meta2.first)->second));
1046 EXPECT_EQ(
static_cast<size_t>(2), server_initial_metadata.size());
1048 srv_stream.Write(send_response,
tag(3));
1050 cli_stream->Read(&recv_response,
tag(4));
1051 Verifier().Expect(3,
true).Expect(4,
true).Verify(
cq_.get());
1053 srv_stream.Write(send_response,
tag(5));
1054 cli_stream->Read(&recv_response,
tag(6));
1055 Verifier().Expect(5,
true).Expect(6,
true).Verify(
cq_.get());
1058 cli_stream->Read(&recv_response,
tag(8));
1059 Verifier().Expect(7,
true).Expect(8,
false).Verify(
cq_.get());
1061 cli_stream->Finish(&recv_status,
tag(9));
1062 Verifier().Expect(9,
true).Verify(
cq_.get());
1069 TEST_P(AsyncEnd2endTest, ServerInitialMetadataServerStreamingImplicit) {
1072 EchoRequest recv_request;
1073 EchoResponse send_response;
1074 EchoResponse recv_response;
1076 ClientContext cli_ctx;
1077 ServerContext srv_ctx;
1078 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1081 std::pair<::std::string, ::std::string> meta1(
"key1",
"val1");
1082 std::pair<::std::string, ::std::string> meta2(
"key2",
"val2");
1084 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1086 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1089 Verifier().Expect(1,
true).Expect(2,
true).Verify(
cq_.get());
1092 srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
1093 srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
1094 send_response.set_message(recv_request.message());
1095 srv_stream.Write(send_response,
tag(3));
1097 cli_stream->Read(&recv_response,
tag(4));
1098 Verifier().Expect(3,
true).Expect(4,
true).Verify(
cq_.get());
1099 EXPECT_EQ(send_response.message(), recv_response.message());
1101 auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1103 ToString(server_initial_metadata.find(meta1.first)->second));
1105 ToString(server_initial_metadata.find(meta2.first)->second));
1106 EXPECT_EQ(
static_cast<size_t>(2), server_initial_metadata.size());
1108 srv_stream.Write(send_response,
tag(5));
1109 cli_stream->Read(&recv_response,
tag(6));
1110 Verifier().Expect(5,
true).Expect(6,
true).Verify(
cq_.get());
1113 cli_stream->Read(&recv_response,
tag(8));
1114 Verifier().Expect(7,
true).Expect(8,
false).Verify(
cq_.get());
1116 cli_stream->Finish(&recv_status,
tag(9));
1117 Verifier().Expect(9,
true).Verify(
cq_.get());
1122 TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
1126 EchoRequest recv_request;
1127 EchoResponse send_response;
1128 EchoResponse recv_response;
1131 ClientContext cli_ctx;
1132 ServerContext srv_ctx;
1136 std::pair<std::string, std::string> meta1(
"key1",
"val1");
1137 std::pair<std::string, std::string> meta2(
"key2",
"val2");
1139 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1141 response_reader->Finish(&recv_response, &recv_status,
tag(5));
1143 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
cq_.get(),
1145 Verifier().Expect(2,
true).Verify(
cq_.get());
1147 response_writer.SendInitialMetadata(
tag(3));
1148 Verifier().Expect(3,
true).Verify(
cq_.get());
1150 send_response.set_message(recv_request.message());
1151 srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
1152 srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
1155 Verifier().Expect(4,
true).Expect(5,
true).Verify(
cq_.get());
1157 EXPECT_EQ(send_response.message(), recv_response.message());
1159 const auto& server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
1161 ToString(server_trailing_metadata.find(meta1.first)->second));
1163 ToString(server_trailing_metadata.find(meta2.first)->second));
1164 EXPECT_EQ(
static_cast<size_t>(2), server_trailing_metadata.size());
1167 TEST_P(AsyncEnd2endTest, MetadataRpc) {
1171 EchoRequest recv_request;
1172 EchoResponse send_response;
1173 EchoResponse recv_response;
1176 ClientContext cli_ctx;
1177 ServerContext srv_ctx;
1181 std::pair<std::string, std::string> meta1(
"key1",
"val1");
1182 std::pair<std::string, std::string> meta2(
1184 std::string(
"\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13));
1185 std::pair<std::string, std::string> meta3(
"key3",
"val3");
1186 std::pair<std::string, std::string> meta6(
1188 std::string(
"\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
1190 std::pair<std::string, std::string> meta5(
"key5",
"val5");
1191 std::pair<std::string, std::string> meta4(
1194 "\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15));
1196 cli_ctx.AddMetadata(meta1.first, meta1.second);
1197 cli_ctx.AddMetadata(meta2.first, meta2.second);
1199 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1201 response_reader->ReadInitialMetadata(
tag(4));
1203 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
cq_.get(),
1205 Verifier().Expect(2,
true).Verify(
cq_.get());
1207 const auto& client_initial_metadata = srv_ctx.client_metadata();
1209 ToString(client_initial_metadata.find(meta1.first)->second));
1211 ToString(client_initial_metadata.find(meta2.first)->second));
1212 EXPECT_GE(client_initial_metadata.size(),
static_cast<size_t>(2));
1214 srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
1215 srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
1216 response_writer.SendInitialMetadata(
tag(3));
1217 Verifier().Expect(3,
true).Expect(4,
true).Verify(
cq_.get());
1218 const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1220 ToString(server_initial_metadata.find(meta3.first)->second));
1222 ToString(server_initial_metadata.find(meta4.first)->second));
1223 EXPECT_GE(server_initial_metadata.size(),
static_cast<size_t>(2));
1225 send_response.set_message(recv_request.message());
1226 srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
1227 srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
1229 response_reader->Finish(&recv_response, &recv_status,
tag(6));
1231 Verifier().Expect(5,
true).Expect(6,
true).Verify(
cq_.get());
1233 EXPECT_EQ(send_response.message(), recv_response.message());
1235 const auto& server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
1237 ToString(server_trailing_metadata.find(meta5.first)->second));
1239 ToString(server_trailing_metadata.find(meta6.first)->second));
1240 EXPECT_GE(server_trailing_metadata.size(),
static_cast<size_t>(2));
1244 TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
1248 EchoRequest recv_request;
1249 EchoResponse send_response;
1250 EchoResponse recv_response;
1253 ClientContext cli_ctx;
1254 ServerContext srv_ctx;
1258 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1260 response_reader->Finish(&recv_response, &recv_status,
tag(4));
1262 srv_ctx.AsyncNotifyWhenDone(
tag(5));
1263 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
cq_.get(),
1266 Verifier().Expect(2,
true).Verify(
cq_.get());
1269 cli_ctx.TryCancel();
1270 Verifier().Expect(5,
true).Expect(4,
true).Verify(
cq_.get());
1277 TEST_P(AsyncEnd2endTest, ServerCheckDone) {
1281 EchoRequest recv_request;
1282 EchoResponse send_response;
1283 EchoResponse recv_response;
1286 ClientContext cli_ctx;
1287 ServerContext srv_ctx;
1291 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1293 response_reader->Finish(&recv_response, &recv_status,
tag(4));
1295 srv_ctx.AsyncNotifyWhenDone(
tag(5));
1296 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
cq_.get(),
1299 Verifier().Expect(2,
true).Verify(
cq_.get());
1302 send_response.set_message(recv_request.message());
1304 Verifier().Expect(3,
true).Expect(4,
true).Expect(5,
true).Verify(
cq_.get());
1307 EXPECT_EQ(send_response.message(), recv_response.message());
1311 TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
1312 ChannelArguments
args;
1315 std::shared_ptr<Channel>
channel =
1317 channel_creds,
args)
1319 std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub>
stub;
1320 stub = grpc::testing::UnimplementedEchoService::NewStub(
channel);
1322 EchoResponse recv_response;
1325 ClientContext cli_ctx;
1327 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1330 response_reader->Finish(&recv_response, &recv_status,
tag(4));
1331 Verifier().Expect(4,
true).Verify(
cq_.get());
1334 EXPECT_EQ(
"", recv_status.error_message());
1340 class AsyncEnd2endServerTryCancelTest :
public AsyncEnd2endTest {
1361 void TestClientStreamingServerCancel(
1365 EchoRequest recv_request;
1366 EchoResponse send_response;
1367 EchoResponse recv_response;
1370 ClientContext cli_ctx;
1371 ServerContext srv_ctx;
1372 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1375 CompletionQueue cli_cq;
1377 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
1378 stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq,
tag(1)));
1382 srv_ctx.AsyncNotifyWhenDone(
tag(11));
1383 service_->RequestRequestStream(&srv_ctx, &srv_stream,
cq_.get(),
cq_.get(),
1385 std::thread t1([&cli_cq] { Verifier().Expect(1,
true).Verify(&cli_cq); });
1386 Verifier().Expect(2,
true).Verify(
cq_.get());
1389 bool expected_server_cq_result =
true;
1390 bool expected_client_cq_result =
true;
1393 srv_ctx.TryCancel();
1394 Verifier().Expect(11,
true).Verify(
cq_.get());
1400 expected_server_cq_result =
false;
1401 expected_client_cq_result =
false;
1404 bool ignore_client_cq_result =
1408 std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
1409 &ignore_client_cq_result] {
1412 for (
int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1416 .Expect(tag_idx, expected_client_cq_result)
1417 .Verify(&cli_cq, ignore_client_cq_result);
1419 cli_stream->WritesDone(
tag(6));
1422 .Expect(6, expected_client_cq_result)
1423 .Verify(&cli_cq, ignore_client_cq_result);
1426 bool ignore_cq_result =
false;
1427 bool want_done_tag =
false;
1430 auto verif = Verifier();
1433 server_try_cancel_thd =
1434 new std::thread([&srv_ctx] { srv_ctx.TryCancel(); });
1439 ignore_cq_result =
true;
1442 want_done_tag =
true;
1443 verif.Expect(11,
true);
1448 for (
int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1449 srv_stream.Read(&recv_request,
tag(tag_idx));
1453 int got_tag = verif.Expect(tag_idx, expected_server_cq_result)
1454 .Next(
cq_.get(), ignore_cq_result);
1455 GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1456 if (got_tag == 11) {
1458 want_done_tag =
false;
1460 EXPECT_EQ(verif.Next(
cq_.get(), ignore_cq_result), tag_idx);
1466 if (server_try_cancel_thd !=
nullptr) {
1467 server_try_cancel_thd->join();
1468 delete server_try_cancel_thd;
1472 srv_ctx.TryCancel();
1473 want_done_tag =
true;
1474 verif.Expect(11,
true);
1477 if (want_done_tag) {
1478 verif.Verify(
cq_.get());
1480 want_done_tag =
false;
1490 Verifier().Expect(9,
false).Verify(
cq_.get());
1493 cli_stream->Finish(&recv_status,
tag(10));
1494 Verifier().Expect(10,
true).Verify(&cli_cq);
1501 while (cli_cq.Next(&phony_tag, &phony_ok)) {
// Exercises a server-streaming RPC in which the server calls TryCancel().
// Tag numbering used below: 1 = client stream start, 2 = server request
// accepted, 3..5 = server writes, 6..8 = client reads, 9 = a server-side
// completion expected with ok=false, 10 = client Finish, 11 = the
// AsyncNotifyWhenDone notification.
// NOTE(review): the parameter list and several guarding conditions are not
// visible in this excerpt; which cancel phase triggers which step below
// should be confirmed against the full file.
1517 void TestServerStreamingServerCancel(
1522 EchoRequest recv_request;
1523 EchoResponse send_response;
1525 ClientContext cli_ctx;
1526 ServerContext srv_ctx;
1527 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
// The client side gets its own completion queue (cli_cq); server-side
// events in this function are read from cq_.
1531 CompletionQueue cli_cq;
1532 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
// Register tag 11 for the server context's "done" notification before the
// call is requested.
1536 srv_ctx.AsyncNotifyWhenDone(
tag(11));
1537 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
// Tag 1 is verified on cli_cq from a helper thread while this thread
// verifies tag 2 on cq_.
1540 std::thread t1([&cli_cq] { Verifier().Expect(1,
true).Verify(&cli_cq); });
1541 Verifier().Expect(2,
true).Verify(
cq_.get());
// Expectation flags; they are adjusted below when a cancel step runs.
1546 bool expected_cq_result =
true;
1547 bool ignore_cq_result =
false;
1548 bool want_done_tag =
false;
1549 bool expected_client_cq_result =
true;
1550 bool ignore_client_cq_result =
// Cancel, then wait for the done tag (11) right away. After this, server
// and client operations are expected to complete with ok=false.
// NOTE(review): presumably guarded by a "cancel before processing" check
// that is elided here -- confirm.
1554 srv_ctx.TryCancel();
1555 Verifier().Expect(11,
true).Verify(
cq_.get());
1560 expected_cq_result =
false;
1561 expected_client_cq_result =
false;
// Client reader thread: issues three reads (tags 6..8) and verifies each
// one on cli_cq using the expected/ignore flags captured by reference.
1564 std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
1565 &ignore_client_cq_result] {
1567 for (
int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1568 EchoResponse recv_response;
1569 cli_stream->Read(&recv_response,
tag(tag_idx));
1571 .Expect(tag_idx, expected_client_cq_result)
1572 .Verify(&cli_cq, ignore_client_cq_result);
// A single Verifier is reused from here on so that a pending expectation
// for tag 11 can carry over between Next() calls.
1578 auto verif = Verifier();
// Cancel from a separate thread while the server writes below proceed.
// The write results are therefore ignored (ignore_cq_result) and the done
// tag may show up at any point (want_done_tag).
// NOTE(review): raw new/delete of std::thread; the pointer's declaration is
// not visible in this excerpt.
1581 server_try_cancel_thd =
1582 new std::thread([&srv_ctx] { srv_ctx.TryCancel(); });
1588 ignore_cq_result =
true;
1591 want_done_tag =
true;
1592 verif.Expect(11,
true);
// Server writes three responses (tags 3..5). Each Next() must return either
// the write's own tag or, while still outstanding, the done tag 11.
1597 for (
int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1599 srv_stream.Write(send_response,
tag(tag_idx));
1603 int got_tag = verif.Expect(tag_idx, expected_cq_result)
1604 .Next(
cq_.get(), ignore_cq_result);
1605 GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
// If the done tag arrived first, stop waiting for it and pull one more
// event, which must be the write tag itself.
1606 if (got_tag == 11) {
1608 want_done_tag =
false;
1610 EXPECT_EQ(verif.Next(
cq_.get(), ignore_cq_result), tag_idx);
// Reap the canceller thread if one was started.
1614 if (server_try_cancel_thd !=
nullptr) {
1615 server_try_cancel_thd->join();
1616 delete server_try_cancel_thd;
// Cancel after the writes have been issued; the done tag is then expected.
// NOTE(review): presumably guarded by a "cancel after processing" check
// that is elided here -- confirm.
1620 srv_ctx.TryCancel();
1621 want_done_tag =
true;
1622 verif.Expect(11,
true);
// Consume the done tag now if it has not been observed yet.
1625 if (want_done_tag) {
1626 verif.Verify(
cq_.get());
1628 want_done_tag =
false;
// Tag 9 must complete with ok=false. The call that posts tag 9 is not
// visible in this excerpt.
1639 Verifier().Expect(9,
false).Verify(
cq_.get());
// The client Finish itself completes successfully (ok=true) on cli_cq.
1642 cli_stream->Finish(&recv_status,
tag(10));
1643 Verifier().Expect(10,
true).Verify(&cli_cq);
// Loop over remaining events on cli_cq until Next() returns false; the loop
// body is not visible in this excerpt.
1650 while (cli_cq.Next(&phony_tag, &phony_ok)) {
// Exercises a bidirectional-streaming RPC in which the server calls
// TryCancel(). Client and server share cq_ here, so completions from both
// sides are interleaved on one queue.
// Tags: 1 = client stream start, 2 = server request accepted, 3 = an early
// operation expected to succeed (its call is elided), 4 = server read,
// 5 = server write, 6 = client read, 7 = client WritesDone, 8 = final server
// read (expected ok=false), 9 = a server-side completion expected with
// ok=false, 10 = client Finish, 11 = the AsyncNotifyWhenDone notification.
// NOTE(review): the parameter list and several guarding conditions are not
// visible in this excerpt; confirm phase handling against the full file.
1668 void TestBidiStreamingServerCancel(
1673 EchoRequest recv_request;
1674 EchoResponse send_response;
1675 EchoResponse recv_response;
1677 ClientContext cli_ctx;
1678 ServerContext srv_ctx;
1679 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
// Start the client side of the stream on cq_ with tag 1.
1682 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
1683 cli_stream(
stub_->AsyncBidiStream(&cli_ctx,
cq_.get(),
tag(1)));
// Register tag 11 for the server context's "done" notification before the
// call is requested.
1687 srv_ctx.AsyncNotifyWhenDone(
tag(11));
1688 service_->RequestBidiStream(&srv_ctx, &srv_stream,
cq_.get(),
cq_.get(),
1690 Verifier().Expect(1,
true).Expect(2,
true).Verify(
cq_.get());
// One Verifier is reused for the rest of the test so that outstanding
// expectations (tag 3, tag 11) persist across Next() calls.
1692 auto verif = Verifier();
// Tag 3 is expected to succeed; the call that posts it is not visible in
// this excerpt.
1697 verif.Expect(3,
true);
1699 bool expected_cq_result =
true;
1700 bool ignore_cq_result =
false;
1701 bool want_done_tag =
false;
1703 int got_tag, got_tag2;
1704 bool tag_3_done =
false;
// Cancel up front and expect the done tag. From here on operations are
// expected to fail and their ok values are ignored.
// NOTE(review): presumably guarded by a "cancel before processing" check
// that is elided here -- confirm.
1707 srv_ctx.TryCancel();
1708 verif.Expect(11,
true);
1712 expected_cq_result =
false;
1713 ignore_cq_result =
true;
// Drain events until tag 11 is seen; the only other event allowed in the
// meantime is tag 3, and only once (tracked by tag_3_done).
1716 got_tag = verif.Next(
cq_.get(), ignore_cq_result);
1717 GPR_ASSERT(((got_tag == 3) && !tag_3_done) || (got_tag == 11));
1721 }
while (got_tag != 11);
// Cancel from a separate thread while the exchange below proceeds; results
// are ignored and the done tag may arrive at any point.
// NOTE(review): raw new/delete of std::thread; the pointer's declaration is
// not visible in this excerpt.
1728 server_try_cancel_thd =
1729 new std::thread([&srv_ctx] { srv_ctx.TryCancel(); });
1734 ignore_cq_result =
true;
1737 want_done_tag =
true;
1738 verif.Expect(11,
true);
// Server read (tag 4). Two events are collected: if tag 3 was already
// consumed above it is substituted directly instead of calling Next().
1741 srv_stream.Read(&recv_request,
tag(4));
1742 verif.Expect(4, expected_cq_result);
1743 got_tag = tag_3_done ? 3 : verif.Next(
cq_.get(), ignore_cq_result);
1744 got_tag2 = verif.Next(
cq_.get(), ignore_cq_result);
1745 GPR_ASSERT((got_tag == 3) || (got_tag == 4) ||
1746 (got_tag == 11 && want_done_tag));
1747 GPR_ASSERT((got_tag2 == 3) || (got_tag2 == 4) ||
1748 (got_tag2 == 11 && want_done_tag));
// 3 + 4 == 7. Any other sum means one of the two events was the done tag
// (the asserts above allow only 3, 4 or 11), so one more event is pulled to
// pick up the remaining 3/4 completion.
1751 if (got_tag + got_tag2 != 7) {
1753 want_done_tag =
false;
1754 got_tag = verif.Next(
cq_.get(), ignore_cq_result);
1755 GPR_ASSERT((got_tag == 3) || (got_tag == 4));
// Server write (tag 5) paired with client read (tag 6), collected the same
// way as the 3/4 pair above.
1758 send_response.set_message(
"Pong");
1759 srv_stream.Write(send_response,
tag(5));
1760 verif.Expect(5, expected_cq_result);
1762 cli_stream->Read(&recv_response,
tag(6));
1763 verif.Expect(6, expected_cq_result);
1764 got_tag = verif.Next(
cq_.get(), ignore_cq_result);
1765 got_tag2 = verif.Next(
cq_.get(), ignore_cq_result);
1766 GPR_ASSERT((got_tag == 5) || (got_tag == 6) ||
1767 (got_tag == 11 && want_done_tag));
1768 GPR_ASSERT((got_tag2 == 5) || (got_tag2 == 6) ||
1769 (got_tag2 == 11 && want_done_tag));
// 5 + 6 == 11. A different sum means the done tag was one of the two
// events, so fetch the outstanding 5/6 completion.
1772 if (got_tag + got_tag2 != 11) {
1774 want_done_tag =
false;
1775 got_tag = verif.Next(
cq_.get(), ignore_cq_result);
1776 GPR_ASSERT((got_tag == 5) || (got_tag == 6));
// Client half-close (tag 7), expected ok=true.
1780 cli_stream->WritesDone(
tag(7));
1781 verif.Expect(7,
true);
// WritesDone uses its own ignore flag; the expression that initialises it
// is not visible in this excerpt.
1784 bool ignore_cq_wd_result =
1786 got_tag = verif.Next(
cq_.get(), ignore_cq_wd_result);
1787 GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag));
// Done tag arrived first: stop waiting for it, then tag 7 must follow.
1788 if (got_tag == 11) {
1790 want_done_tag =
false;
1792 EXPECT_EQ(verif.Next(
cq_.get(), ignore_cq_wd_result), 7);
// Final server read (tag 8) is expected to complete with ok=false.
1799 srv_stream.Read(&recv_request,
tag(8));
1800 verif.Expect(8,
false);
1801 got_tag = verif.Next(
cq_.get(), ignore_cq_result);
1802 GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag));
1803 if (got_tag == 11) {
1805 want_done_tag =
false;
// Reap the canceller thread if one was started.
1810 if (server_try_cancel_thd !=
nullptr) {
1811 server_try_cancel_thd->join();
1812 delete server_try_cancel_thd;
// Cancel after the exchange has been issued; the done tag is then expected.
// NOTE(review): presumably guarded by a "cancel after processing" check
// that is elided here -- confirm.
1816 srv_ctx.TryCancel();
1817 want_done_tag =
true;
1818 verif.Expect(11,
true);
// Consume the done tag now if it has not been observed yet.
1821 if (want_done_tag) {
1822 verif.Verify(
cq_.get());
1824 want_done_tag =
false;
// Tag 9 must complete with ok=false. The call that posts tag 9 is not
// visible in this excerpt.
1832 Verifier().Expect(9,
false).Verify(
cq_.get());
// The client Finish completes successfully (ok=true); it is verified on the
// shared cq_.
1834 cli_stream->Finish(&recv_status,
tag(10));
1835 Verifier().Expect(10,
true).Verify(
cq_.get());
// Parameterized test cases on the AsyncEnd2endServerTryCancelTest fixture:
// three streaming shapes (client streaming, server streaming, bidi), each
// with a Before / During / After variant of the server's TryCancel.
// NOTE(review): the test bodies are not visible in this excerpt; presumably
// each one forwards to the matching Test*ServerCancel helper -- confirm.
1841 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelBefore) {
1845 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelDuring) {
1849 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelAfter) {
1853 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelBefore) {
1857 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelDuring) {
1861 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelAfter) {
1865 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelBefore) {
1869 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelDuring) {
1873 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
// Builds the lists of credential types and message payloads that the test
// scenarios are generated from.
// NOTE(review): the function name, its leading parameters and its return
// statement are not visible in this excerpt.
1878 bool test_message_size_limit) {
1880 std::vector<std::string> credentials_types;
1881 std::vector<std::string> messages;
1883 auto insec_ok = [] {
// Copy every entry of sec_list into credentials_types (sec_list is defined
// on lines elided from this excerpt).
1894 for (
auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
1895 credentials_types.push_back(*sec);
// A short message is always included.
1899 messages.push_back(
"Hello");
// When requested, also add large messages: k * 1024 characters each, filled
// with the 26-character cycle starting at 'a'. How k is chosen is not
// visible in this excerpt.
1900 if (test_message_size_limit) {
1904 for (
size_t i = 0;
i <
k * 1024; ++
i) {
1905 char c =
'a' + (
i % 26);
1908 messages.push_back(big_msg);
// Iterate over every (message, credential type) combination; the loop body
// is not visible in this excerpt.
1922 for (
auto msg = messages.begin();
msg != messages.end();
msg++) {
1923 for (
auto cred = credentials_types.begin();
1924 cred != credentials_types.end(); ++cred) {
1939 AsyncEnd2endServerTryCancelTest,