20 #include <condition_variable>
26 #include <gtest/gtest.h>
28 #include "absl/memory/memory.h"
42 #include "src/proto/grpc/testing/echo.grpc.pb.h"
59 TestScenario(
bool serve_callback, Protocol
protocol,
bool intercept,
73 return out <<
"TestScenario{callback_server="
74 << (
scenario.callback_server ?
"true" :
"false") <<
",protocol="
76 <<
",intercept=" << (
scenario.use_interceptors ?
"true" :
"false")
77 <<
",creds=" <<
scenario.credentials_type <<
"}";
81 std::ostringstream
out;
86 class ClientCallbackEnd2endTest
89 ClientCallbackEnd2endTest() { GetParam().Log(); }
91 void SetUp()
override {
111 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
114 creators.reserve(20);
115 for (
auto i = 0;
i < 20;
i++) {
116 creators.push_back(absl::make_unique<PhonyInterceptorFactory>());
126 std::unique_ptr<experimental::ClientInterceptorFactoryInterface>
127 interceptor =
nullptr) {
128 ChannelArguments
args;
132 if (interceptor !=
nullptr) interceptors.push_back(
std::move(interceptor));
137 channel_creds,
args);
160 void TearDown()
override {
172 void SendRpcs(
int num_rpcs,
bool with_binary_metadata) {
174 for (
int i = 0;
i < num_rpcs;
i++) {
177 ClientContext cli_ctx;
179 test_string +=
"Hello world. ";
180 request.set_message(test_string);
182 if (with_binary_metadata) {
183 request.mutable_param()->set_echo_metadata(
true);
184 char bytes[8] = {
'\0',
'\1',
'\2',
'\3',
185 '\4',
'\5',
'\6',
static_cast<char>(
i)};
187 cli_ctx.AddMetadata(
"custom-bin", val);
193 std::condition_variable
cv;
195 stub_->async()->Echo(
198 with_binary_metadata](
Status s) {
202 if (with_binary_metadata) {
204 1u, cli_ctx.GetServerTrailingMetadata().count(
"custom-bin"));
209 std::lock_guard<std::mutex>
l(
mu);
213 std::unique_lock<std::mutex>
l(
mu);
220 void SendRpcsGeneric(
int num_rpcs,
bool maybe_except,
221 const char* suffix_for_stats) {
222 const std::string kMethodName(
"/grpc.testing.EchoTestService/Echo");
224 for (
int i = 0;
i < num_rpcs;
i++) {
226 std::unique_ptr<ByteBuffer> send_buf;
228 ClientContext cli_ctx;
230 test_string +=
"Hello world. ";
231 request.set_message(test_string);
235 std::condition_variable
cv;
237 StubOptions
options(suffix_for_stats);
239 &cli_ctx, kMethodName,
options, send_buf.get(), &recv_buf,
243 EchoResponse response;
244 EXPECT_TRUE(ParseFromByteBuffer(&recv_buf, &response));
245 EXPECT_EQ(request.message(), response.message());
246 std::lock_guard<std::mutex> l(mu);
249 #if GRPC_ALLOW_EXCEPTIONS
257 std::unique_lock<std::mutex>
l(
mu);
264 void SendGenericEchoAsBidi(
int num_rpcs,
int reuses,
bool do_writes_done,
265 const char* suffix_for_stats) {
266 const std::string kMethodName(
"/grpc.testing.EchoTestService/Echo");
268 for (
int i = 0;
i < num_rpcs;
i++) {
269 test_string +=
"Hello world. ";
274 int reuses,
bool do_writes_done)
275 : reuses_remaining_(reuses), do_writes_done_(do_writes_done) {
277 if (reuses_remaining_ > 0) {
278 cli_ctx_ = absl::make_unique<ClientContext>();
280 StubOptions
options(suffix_for_stats);
281 test->generic_stub_->PrepareBidiStreamingCall(
285 StartWrite(send_buf_.get());
286 StartRead(&recv_buf_);
289 std::unique_lock<std::mutex>
l(
mu_);
296 void OnWriteDone(
bool )
override {
297 if (do_writes_done_) {
301 void OnReadDone(
bool )
override {
306 void OnDone(
const Status& s)
override {
311 std::unique_lock<std::mutex>
l(
mu_);
318 std::unique_ptr<ByteBuffer> send_buf_;
319 ByteBuffer recv_buf_;
320 std::unique_ptr<ClientContext>
cli_ctx_;
321 int reuses_remaining_;
324 std::condition_variable
cv_;
326 const bool do_writes_done_;
329 Client rpc(
this, kMethodName, suffix_for_stats, test_string, reuses,
338 std::unique_ptr<grpc::testing::EchoTestService::Stub>
stub_;
346 TEST_P(ClientCallbackEnd2endTest, SimpleRpc) {
351 TEST_P(ClientCallbackEnd2endTest, SimpleRpcExpectedError) {
356 ClientContext cli_ctx;
357 ErrorStatus error_status;
359 request.set_message(
"Hello failure");
360 error_status.set_code(1);
361 error_status.set_error_message(
"cancel error message");
362 *
request.mutable_param()->mutable_expected_error() = error_status;
365 std::condition_variable
cv;
371 EXPECT_EQ(error_status.code(),
s.error_code());
374 std::lock_guard<std::mutex>
l(
mu);
379 std::unique_lock<std::mutex>
l(
mu);
385 TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLockNested) {
392 std::condition_variable
cv;
396 ClientContext cli_ctx;
398 RpcState() =
default;
402 std::lock_guard<std::mutex> lock(
mu);
427 nested_call(
index + 1);
436 std::unique_lock<std::mutex>
l(
state.mu);
437 while (!
state.done) {
444 TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLock) {
447 std::condition_variable
cv;
450 request.set_message(
"Hello locked world.");
452 ClientContext cli_ctx;
454 std::lock_guard<std::mutex>
l(
mu);
457 std::lock_guard<std::mutex>
l(
mu);
464 std::unique_lock<std::mutex>
l(
mu);
470 TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) {
475 TEST_P(ClientCallbackEnd2endTest, SendClientInitialMetadata) {
479 ClientContext cli_ctx;
485 std::condition_variable
cv;
487 stub_->async()->CheckClientInitialMetadata(
491 std::lock_guard<std::mutex>
l(
mu);
495 std::unique_lock<std::mutex>
l(
mu);
501 TEST_P(ClientCallbackEnd2endTest, SimpleRpcWithBinaryMetadata) {
506 TEST_P(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) {
511 TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) {
512 ResetStub(absl::make_unique<TestInterceptorFactory>(
513 "/grpc.testing.EchoTestService/Echo",
nullptr));
514 SendRpcsGeneric(10,
false,
nullptr);
517 TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsWithSuffix) {
518 ResetStub(absl::make_unique<TestInterceptorFactory>(
519 "/grpc.testing.EchoTestService/Echo",
"TestSuffix"));
520 SendRpcsGeneric(10,
false,
"TestSuffix");
523 TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) {
524 ResetStub(absl::make_unique<TestInterceptorFactory>(
525 "/grpc.testing.EchoTestService/Echo",
nullptr));
526 SendGenericEchoAsBidi(10, 1,
true,
530 TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithSuffix) {
531 ResetStub(absl::make_unique<TestInterceptorFactory>(
532 "/grpc.testing.EchoTestService/Echo",
"TestSuffix"));
533 SendGenericEchoAsBidi(10, 1,
true,
"TestSuffix");
536 TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithReactorReuse) {
538 SendGenericEchoAsBidi(10, 10,
true,
542 TEST_P(ClientCallbackEnd2endTest, GenericRpcNoWritesDone) {
544 SendGenericEchoAsBidi(1, 1,
false,
548 #if GRPC_ALLOW_EXCEPTIONS
549 TEST_P(ClientCallbackEnd2endTest, ExceptingRpc) {
551 SendRpcsGeneric(10,
true,
nullptr);
555 TEST_P(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
557 std::vector<std::thread>
threads;
559 for (
int i = 0;
i < 10; ++
i) {
560 threads.emplace_back([
this] { SendRpcs(10,
true); });
562 for (
int i = 0;
i < 10; ++
i) {
567 TEST_P(ClientCallbackEnd2endTest, MultipleRpcs) {
569 std::vector<std::thread>
threads;
571 for (
int i = 0;
i < 10; ++
i) {
572 threads.emplace_back([
this] { SendRpcs(10,
false); });
574 for (
int i = 0;
i < 10; ++
i) {
579 TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) {
588 std::condition_variable
cv;
594 std::lock_guard<std::mutex>
l(
mu);
598 std::unique_lock<std::mutex>
l(
mu);
607 TEST_P(ClientCallbackEnd2endTest, RequestEchoServerCancel) {
617 std::condition_variable
cv;
623 std::lock_guard<std::mutex>
l(
mu);
627 std::unique_lock<std::mutex>
l(
mu);
633 struct ClientCancelInfo {
637 ClientCancelInfo() :
cancel{
false} {}
643 WriteClient(grpc::testing::EchoTestService::Stub*
stub,
645 int num_msgs_to_send, ClientCancelInfo client_cancel = {})
650 for (
int i = 0;
i < num_msgs_to_send;
i++) {
658 context_.set_initial_metadata_corked(
true);
664 void OnWriteDone(
bool ok)
override {
670 void OnDone(
const Status& s)
override {
702 std::unique_lock<std::mutex>
l(
mu_);
707 std::unique_lock<std::mutex>
l(
mu_);
721 StartWriteLast(&
request_, WriteOptions());
733 std::condition_variable
cv_;
737 TEST_P(ClientCallbackEnd2endTest, RequestStream) {
747 TEST_P(ClientCallbackEnd2endTest, ClientCancelsRequestStream) {
758 TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelBeforeReads) {
769 TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelDuringRead) {
781 TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelAfterReads) {
791 TEST_P(ClientCallbackEnd2endTest, UnaryReactor) {
795 explicit UnaryClient(grpc::testing::EchoTestService::Stub*
stub) {
796 cli_ctx_.AddMetadata(
"key1",
"val1");
797 cli_ctx_.AddMetadata(
"key2",
"val2");
798 request_.mutable_param()->set_echo_metadata_initially(
true);
799 request_.set_message(
"Hello metadata");
803 void OnReadInitialMetadataDone(
bool ok)
override {
813 initial_metadata_done_ =
true;
815 void OnDone(
const Status& s)
override {
820 std::unique_lock<std::mutex>
l(
mu_);
825 std::unique_lock<std::mutex>
l(
mu_);
836 std::condition_variable
cv_;
838 bool initial_metadata_done_{
false};
849 TEST_P(ClientCallbackEnd2endTest, GenericUnaryReactor) {
850 const std::string kMethodName(
"/grpc.testing.EchoTestService/Echo");
851 constexpr
char kSuffixForStats[] =
"TestSuffixForStats";
853 absl::make_unique<TestInterceptorFactory>(kMethodName, kSuffixForStats));
857 const char* suffix_for_stats) {
858 cli_ctx_.AddMetadata(
"key1",
"val1");
859 cli_ctx_.AddMetadata(
"key2",
"val2");
860 request_.mutable_param()->set_echo_metadata_initially(
true);
861 request_.set_message(
"Hello metadata");
864 StubOptions
options(suffix_for_stats);
869 void OnReadInitialMetadataDone(
bool ok)
override {
879 initial_metadata_done_ =
true;
881 void OnDone(
const Status& s)
override {
888 std::unique_lock<std::mutex>
l(
mu_);
893 std::unique_lock<std::mutex>
l(
mu_);
901 std::unique_ptr<ByteBuffer> send_buf_;
902 ByteBuffer recv_buf_;
905 std::condition_variable
cv_;
907 bool initial_metadata_done_{
false};
920 ReadClient(grpc::testing::EchoTestService::Stub*
stub,
922 ClientCancelInfo client_cancel = {})
929 request_.set_message(
"Hello client ");
940 void OnReadDone(
bool ok)
override {
959 void OnDone(
const Status& s)
override {
995 std::unique_lock<std::mutex>
l(
mu_);
1000 std::unique_lock<std::mutex>
l(
mu_);
1014 std::condition_variable
cv_;
1018 TEST_P(ClientCallbackEnd2endTest, ResponseStream) {
1028 TEST_P(ClientCallbackEnd2endTest, ClientCancelsResponseStream) {
1037 TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelBefore) {
1048 TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelDuring) {
1060 TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelAfter) {
1074 int num_msgs_to_send,
bool cork_metadata,
bool first_write_async,
1075 ClientCancelInfo client_cancel = {})
1084 request_.set_message(
"Hello fren ");
1085 context_.set_initial_metadata_corked(cork_metadata);
1087 MaybeAsyncWrite(first_write_async);
1164 std::unique_lock<std::mutex>
l(
mu_);
1169 std::unique_lock<std::mutex>
l(
mu_);
1176 void MaybeAsyncWrite(
bool first_write_async) {
1177 if (first_write_async) {
1215 std::condition_variable
cv_;
1223 TEST_P(ClientCallbackEnd2endTest, BidiStream) {
1235 TEST_P(ClientCallbackEnd2endTest, BidiStreamFirstWriteAsync) {
1247 TEST_P(ClientCallbackEnd2endTest, BidiStreamCorked) {
1259 TEST_P(ClientCallbackEnd2endTest, BidiStreamCorkedFirstWriteAsync) {
1271 TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) {
1276 ClientCancelInfo(2));
1285 TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) {
1298 TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) {
1312 TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) {
1323 TEST_P(ClientCallbackEnd2endTest, SimultaneousReadAndWritesDone) {
1327 explicit Client(grpc::testing::EchoTestService::Stub*
stub) {
1328 request_.set_message(
"Hello bidi ");
1333 void OnReadDone(
bool ok)
override {
1337 void OnWriteDone(
bool ok)
override {
1343 void OnDone(
const Status& s)
override {
1346 std::unique_lock<std::mutex>
l(
mu_);
1351 std::unique_lock<std::mutex>
l(
mu_);
1362 std::condition_variable
cv_;
1369 TEST_P(ClientCallbackEnd2endTest, UnimplementedRpc) {
1370 ChannelArguments
args;
1373 std::shared_ptr<Channel>
channel =
1378 std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub>
stub;
1379 stub = grpc::testing::UnimplementedEchoService::NewStub(
channel);
1382 ClientContext cli_ctx;
1383 request.set_message(
"Hello world.");
1385 std::condition_variable
cv;
1387 stub->async()->Unimplemented(
1392 std::lock_guard<std::mutex>
l(
mu);
1396 std::unique_lock<std::mutex>
l(
mu);
1402 TEST_P(ClientCallbackEnd2endTest, TestTrailersOnlyOnError) {
1412 explicit Reactor(grpc::testing::EchoTestService::Stub*
stub) {
1417 std::unique_lock<std::mutex>
l(
mu_);
1425 void OnDone(
const Status& s)
override {
1428 std::unique_lock<std::mutex>
l(
mu_);
1430 done_cv_.notify_one();
1435 std::condition_variable done_cv_;
1442 TEST_P(ClientCallbackEnd2endTest,
1443 ResponseStreamExtraReactionFlowReadsUntilDone) {
1445 class ReadAllIncomingDataClient
1448 explicit ReadAllIncomingDataClient(
1449 grpc::testing::EchoTestService::Stub*
stub) {
1450 request_.set_message(
"Hello client ");
1453 bool WaitForReadDone() {
1454 std::unique_lock<std::mutex>
l(
mu_);
1455 while (!read_done_) {
1462 std::unique_lock<std::mutex>
l(
mu_);
1469 void RemoveHoldUnderLock() {
1470 std::unique_lock<std::mutex>
l(
mu_);
1474 std::unique_lock<std::mutex>
l(
mu_);
1479 void OnReadDone(
bool ok)
override {
1480 std::unique_lock<std::mutex>
l(
mu_);
1485 void OnDone(
const Status& s)
override {
1486 std::unique_lock<std::mutex>
l(
mu_);
1489 done_cv_.notify_one();
1495 bool read_ok_ =
false;
1496 bool read_done_ =
false;
1499 std::condition_variable done_cv_;
1504 int reads_complete = 0;
1509 bool read_ok =
true;
1512 read_ok =
client.WaitForReadDone();
1517 client.RemoveHoldUnderLock();
1525 #if TARGET_OS_IPHONE
1531 std::vector<std::string> credentials_types{
1533 auto insec_ok = [] {
1539 if (test_insecure && insec_ok()) {
1544 bool barr[]{
false,
true};
1546 for (Protocol p : parr) {
1547 for (
const auto& cred : credentials_types) {