async_end2end_test.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <cinttypes>
20 #include <memory>
21 #include <thread>
22 
23 #include "absl/memory/memory.h"
24 
25 #include <grpc/grpc.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/time.h>
29 #include <grpcpp/channel.h>
30 #include <grpcpp/client_context.h>
31 #include <grpcpp/create_channel.h>
33 #include <grpcpp/server.h>
34 #include <grpcpp/server_builder.h>
35 #include <grpcpp/server_context.h>
36 
38 #include "src/core/lib/gpr/tls.h"
40 #include "src/proto/grpc/health/v1/health.grpc.pb.h"
41 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
42 #include "src/proto/grpc/testing/echo.grpc.pb.h"
43 #include "test/core/util/port.h"
47 
48 #ifdef GRPC_POSIX_SOCKET_EV
50 #endif // GRPC_POSIX_SOCKET_EV
51 
52 #include <gtest/gtest.h>
53 
54 using grpc::testing::EchoRequest;
55 using grpc::testing::EchoResponse;
56 using std::chrono::system_clock;
57 
58 namespace grpc {
59 namespace testing {
60 
61 namespace {
62 
// Converts a small integer id into the opaque pointer form used as a
// completion-queue tag. The value is widened to intptr_t first so the
// integer-to-pointer conversion is between equally sized types.
void* tag(int t) { return reinterpret_cast<void*>(static_cast<intptr_t>(t)); }

// Inverse of tag(): recovers the integer id from a completion-queue tag.
int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
65 
66 class Verifier {
67  public:
68  Verifier() : lambda_run_(false) {}
69  // Expect sets the expected ok value for a specific tag
70  Verifier& Expect(int i, bool expect_ok) {
71  return ExpectUnless(i, expect_ok, false);
72  }
73  // ExpectUnless sets the expected ok value for a specific tag
74  // unless the tag was already marked seen (as a result of ExpectMaybe)
75  Verifier& ExpectUnless(int i, bool expect_ok, bool seen) {
76  if (!seen) {
77  expectations_[tag(i)] = expect_ok;
78  }
79  return *this;
80  }
81  // ExpectMaybe sets the expected ok value for a specific tag, but does not
82  // require it to appear
83  // If it does, sets *seen to true
84  Verifier& ExpectMaybe(int i, bool expect_ok, bool* seen) {
85  if (!*seen) {
86  maybe_expectations_[tag(i)] = MaybeExpect{expect_ok, seen};
87  }
88  return *this;
89  }
90 
91  // Next waits for 1 async tag to complete, checks its
92  // expectations, and returns the tag
93  int Next(CompletionQueue* cq, bool ignore_ok) {
94  bool ok;
95  void* got_tag;
96  EXPECT_TRUE(cq->Next(&got_tag, &ok));
97  GotTag(got_tag, ok, ignore_ok);
98  return detag(got_tag);
99  }
100 
101  template <typename T>
103  CompletionQueue* cq, void** got_tag, bool* ok, T deadline,
104  std::function<void(void)> lambda) {
105  if (lambda_run_) {
106  return cq->AsyncNext(got_tag, ok, deadline);
107  } else {
108  lambda_run_ = true;
109  return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline);
110  }
111  }
112 
113  // Verify keeps calling Next until all currently set
114  // expected tags are complete
115  void Verify(CompletionQueue* cq) { Verify(cq, false); }
116 
117  // This version of Verify allows optionally ignoring the
118  // outcome of the expectation
119  void Verify(CompletionQueue* cq, bool ignore_ok) {
120  GPR_ASSERT(!expectations_.empty() || !maybe_expectations_.empty());
121  while (!expectations_.empty()) {
122  Next(cq, ignore_ok);
123  }
124  maybe_expectations_.clear();
125  }
126 
127  // This version of Verify stops after a certain deadline
128  void Verify(CompletionQueue* cq,
130  if (expectations_.empty()) {
131  bool ok;
132  void* got_tag;
133  EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
135  } else {
136  while (!expectations_.empty()) {
137  bool ok;
138  void* got_tag;
139  EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
141  GotTag(got_tag, ok, false);
142  }
143  }
144  maybe_expectations_.clear();
145  }
146 
147  // This version of Verify stops after a certain deadline, and uses the
148  // DoThenAsyncNext API
149  // to call the lambda
150  void Verify(CompletionQueue* cq,
152  const std::function<void(void)>& lambda) {
153  if (expectations_.empty()) {
154  bool ok;
155  void* got_tag;
156  EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
158  } else {
159  while (!expectations_.empty()) {
160  bool ok;
161  void* got_tag;
162  EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
164  GotTag(got_tag, ok, false);
165  }
166  }
167  maybe_expectations_.clear();
168  }
169 
170  private:
171  void GotTag(void* got_tag, bool ok, bool ignore_ok) {
172  auto it = expectations_.find(got_tag);
173  if (it != expectations_.end()) {
174  if (!ignore_ok) {
175  EXPECT_EQ(it->second, ok);
176  }
177  expectations_.erase(it);
178  } else {
179  auto it2 = maybe_expectations_.find(got_tag);
180  if (it2 != maybe_expectations_.end()) {
181  if (it2->second.seen != nullptr) {
182  EXPECT_FALSE(*it2->second.seen);
183  *it2->second.seen = true;
184  }
185  if (!ignore_ok) {
186  EXPECT_EQ(it2->second.ok, ok);
187  }
188  maybe_expectations_.erase(it2);
189  } else {
190  gpr_log(GPR_ERROR, "Unexpected tag: %p", got_tag);
191  abort();
192  }
193  }
194  }
195 
196  struct MaybeExpect {
197  bool ok;
198  bool* seen;
199  };
200 
201  std::map<void*, bool> expectations_;
202  std::map<void*, MaybeExpect> maybe_expectations_;
204 };
205 
206 bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) {
207  return plugin->has_sync_methods();
208 }
209 
210 // This class disables the server builder plugins that may add sync services to
211 // the server. If there are sync services, UnimplementedRpc test will triger
212 // the sync unknown rpc routine on the server side, rather than the async one
213 // that needs to be tested here.
214 class ServerBuilderSyncPluginDisabler : public grpc::ServerBuilderOption {
215  public:
216  void UpdateArguments(ChannelArguments* /*arg*/) override {}
217 
218  void UpdatePlugins(
219  std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins) override {
220  plugins->erase(std::remove_if(plugins->begin(), plugins->end(),
221  plugin_has_sync_methods),
222  plugins->end());
223  }
224 };
225 
// One parameterization of the end-to-end test: which transport to use, which
// credentials type, whether a health-check service is registered, and the
// message payload echoed by every RPC.
class TestScenario {
 public:
  TestScenario(bool inproc_stub, const std::string& creds_type, bool hcs,
               const std::string& content)
      : inproc(inproc_stub),
        health_check_service(hcs),
        credentials_type(creds_type),
        message_content(content) {}
  // Writes a description of this scenario to the debug log.
  void Log() const;
  bool inproc;
  bool health_check_service;
  const std::string credentials_type;
  const std::string message_content;
};

// Streams a human-readable description of `scenario`. Only the size of the
// message payload is printed, not its content.
std::ostream& operator<<(std::ostream& out, const TestScenario& scenario) {
  // The closing quote belongs directly after the credentials value.
  return out << "TestScenario{inproc=" << (scenario.inproc ? "true" : "false")
             << ", credentials='" << scenario.credentials_type
             << "', health_check_service="
             << (scenario.health_check_service ? "true" : "false")
             << ", message_size=" << scenario.message_content.size() << "}";
}
248 
249 void TestScenario::Log() const {
250  std::ostringstream out;
251  out << *this;
252  gpr_log(GPR_DEBUG, "%s", out.str().c_str());
253 }
254 
255 class HealthCheck : public health::v1::Health::Service {};
256 
257 class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
258  protected:
259  AsyncEnd2endTest() { GetParam().Log(); }
260 
261  void SetUp() override {
263  server_address_ << "localhost:" << port_;
264 
265  // Setup server
266  BuildAndStartServer();
267  }
268 
269  void TearDown() override {
270  stub_.reset();
271  ServerShutdown();
273  }
274 
275  void ServerShutdown() {
276  std::thread t([this]() {
277  void* ignored_tag;
278  bool ignored_ok;
279  while (cq_->Next(&ignored_tag, &ignored_ok)) {
280  }
281  });
282  server_->Shutdown();
283  cq_->Shutdown();
284  t.join();
285  }
286 
287  void BuildAndStartServer() {
288  ServerBuilder builder;
289  auto server_creds = GetCredentialsProvider()->GetServerCredentials(
290  GetParam().credentials_type);
291  builder.AddListeningPort(server_address_.str(), server_creds);
292  service_ =
293  absl::make_unique<grpc::testing::EchoTestService::AsyncService>();
294  builder.RegisterService(service_.get());
295  if (GetParam().health_check_service) {
296  builder.RegisterService(&health_check_);
297  }
298  cq_ = builder.AddCompletionQueue();
299 
300  // TODO(zyc): make a test option to choose wheather sync plugins should be
301  // deleted
302  std::unique_ptr<ServerBuilderOption> sync_plugin_disabler(
303  new ServerBuilderSyncPluginDisabler());
304  builder.SetOption(move(sync_plugin_disabler));
305  server_ = builder.BuildAndStart();
306  }
307 
308  void ResetStub() {
309  ChannelArguments args;
310  auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
311  GetParam().credentials_type, &args);
312  std::shared_ptr<Channel> channel =
313  !(GetParam().inproc) ? grpc::CreateCustomChannel(server_address_.str(),
314  channel_creds, args)
315  : server_->InProcessChannel(args);
316  stub_ = grpc::testing::EchoTestService::NewStub(channel);
317  }
318 
319  void SendRpc(int num_rpcs) {
320  for (int i = 0; i < num_rpcs; i++) {
321  EchoRequest send_request;
322  EchoRequest recv_request;
323  EchoResponse send_response;
324  EchoResponse recv_response;
325  Status recv_status;
326 
327  ClientContext cli_ctx;
328  ServerContext srv_ctx;
329  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
330 
331  send_request.set_message(GetParam().message_content);
332  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
333  stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
334 
335  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
336  cq_.get(), cq_.get(), tag(2));
337 
338  response_reader->Finish(&recv_response, &recv_status, tag(4));
339 
340  Verifier().Expect(2, true).Verify(cq_.get());
341  EXPECT_EQ(send_request.message(), recv_request.message());
342 
343  send_response.set_message(recv_request.message());
344  response_writer.Finish(send_response, Status::OK, tag(3));
345  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
346 
347  EXPECT_EQ(send_response.message(), recv_response.message());
348  EXPECT_TRUE(recv_status.ok());
349  }
350  }
351 
352  std::unique_ptr<ServerCompletionQueue> cq_;
353  std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
354  std::unique_ptr<Server> server_;
355  std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;
356  HealthCheck health_check_;
357  std::ostringstream server_address_;
358  int port_;
359 };
360 
361 TEST_P(AsyncEnd2endTest, SimpleRpc) {
362  ResetStub();
363  SendRpc(1);
364 }
365 
366 TEST_P(AsyncEnd2endTest, SimpleRpcWithExpectedError) {
367  ResetStub();
368 
369  EchoRequest send_request;
370  EchoRequest recv_request;
371  EchoResponse send_response;
372  EchoResponse recv_response;
373  Status recv_status;
374 
375  ClientContext cli_ctx;
376  ServerContext srv_ctx;
377  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
378  ErrorStatus error_status;
379 
380  send_request.set_message(GetParam().message_content);
381  error_status.set_code(1); // CANCELLED
382  error_status.set_error_message("cancel error message");
383  *send_request.mutable_param()->mutable_expected_error() = error_status;
384 
385  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
386  stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
387 
388  srv_ctx.AsyncNotifyWhenDone(tag(5));
389  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
390  cq_.get(), tag(2));
391 
392  response_reader->Finish(&recv_response, &recv_status, tag(4));
393 
394  Verifier().Expect(2, true).Verify(cq_.get());
395  EXPECT_EQ(send_request.message(), recv_request.message());
396 
397  send_response.set_message(recv_request.message());
398  response_writer.Finish(
399  send_response,
400  Status(
401  static_cast<StatusCode>(recv_request.param().expected_error().code()),
402  recv_request.param().expected_error().error_message()),
403  tag(3));
404  Verifier().Expect(3, true).Expect(4, true).Expect(5, true).Verify(cq_.get());
405 
406  EXPECT_EQ(recv_response.message(), "");
407  EXPECT_EQ(recv_status.error_code(), error_status.code());
408  EXPECT_EQ(recv_status.error_message(), error_status.error_message());
409  EXPECT_FALSE(srv_ctx.IsCancelled());
410 }
411 
412 TEST_P(AsyncEnd2endTest, SequentialRpcs) {
413  ResetStub();
414  SendRpc(10);
415 }
416 
417 TEST_P(AsyncEnd2endTest, ReconnectChannel) {
418  // GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS is set to 100ms in main()
419  if (GetParam().inproc) {
420  return;
421  }
422  int poller_slowdown_factor = 1;
423 #ifdef GRPC_POSIX_SOCKET_EV
424  // It needs 2 pollset_works to reconnect the channel with polling engine
425  // "poll"
426  grpc_core::UniquePtr<char> poller = GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy);
427  if (0 == strcmp(poller.get(), "poll")) {
428  poller_slowdown_factor = 2;
429  }
430 #endif // GRPC_POSIX_SOCKET_EV
431  ResetStub();
432  SendRpc(1);
433  ServerShutdown();
434  BuildAndStartServer();
435  // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
436  // reconnect the channel.
440  300 * poller_slowdown_factor * grpc_test_slowdown_factor(),
441  GPR_TIMESPAN)));
442  SendRpc(1);
443 }
444 
445 // We do not need to protect notify because the use is synchronized.
446 void ServerWait(Server* server, int* notify) {
447  server->Wait();
448  *notify = 1;
449 }
450 TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) {
451  int notify = 0;
452  std::thread wait_thread(&ServerWait, server_.get(), &notify);
453  ResetStub();
454  SendRpc(1);
455  EXPECT_EQ(0, notify);
456  ServerShutdown();
457  wait_thread.join();
458  EXPECT_EQ(1, notify);
459 }
460 
461 TEST_P(AsyncEnd2endTest, ShutdownThenWait) {
462  ResetStub();
463  SendRpc(1);
464  std::thread t([this]() { ServerShutdown(); });
465  server_->Wait();
466  t.join();
467 }
468 
469 // Test a simple RPC using the async version of Next
470 TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
471  ResetStub();
472 
473  EchoRequest send_request;
474  EchoRequest recv_request;
475  EchoResponse send_response;
476  EchoResponse recv_response;
477  Status recv_status;
478 
479  ClientContext cli_ctx;
480  ServerContext srv_ctx;
481  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
482 
483  send_request.set_message(GetParam().message_content);
484  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
485  stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
486 
491  Verifier().Verify(cq_.get(), time_now);
492  Verifier().Verify(cq_.get(), time_now);
493 
494  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
495  cq_.get(), tag(2));
496  response_reader->Finish(&recv_response, &recv_status, tag(4));
497 
498  Verifier().Expect(2, true).Verify(cq_.get(), time_limit);
499  EXPECT_EQ(send_request.message(), recv_request.message());
500 
501  send_response.set_message(recv_request.message());
502  response_writer.Finish(send_response, Status::OK, tag(3));
503  Verifier().Expect(3, true).Expect(4, true).Verify(
505 
506  EXPECT_EQ(send_response.message(), recv_response.message());
507  EXPECT_TRUE(recv_status.ok());
508 }
509 
510 // Test a simple RPC using the async version of Next
511 TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
512  ResetStub();
513 
514  EchoRequest send_request;
515  EchoRequest recv_request;
516  EchoResponse send_response;
517  EchoResponse recv_response;
518  Status recv_status;
519 
520  ClientContext cli_ctx;
521  ServerContext srv_ctx;
522  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
523 
524  send_request.set_message(GetParam().message_content);
525  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
526  stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
527 
532  Verifier().Verify(cq_.get(), time_now);
533  Verifier().Verify(cq_.get(), time_now);
534 
535  auto resp_writer_ptr = &response_writer;
536  auto lambda_2 = [&, this, resp_writer_ptr]() {
537  service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(),
538  cq_.get(), tag(2));
539  };
540  response_reader->Finish(&recv_response, &recv_status, tag(4));
541 
542  Verifier().Expect(2, true).Verify(cq_.get(), time_limit, lambda_2);
543  EXPECT_EQ(send_request.message(), recv_request.message());
544 
545  send_response.set_message(recv_request.message());
546  auto lambda_3 = [resp_writer_ptr, send_response]() {
547  resp_writer_ptr->Finish(send_response, Status::OK, tag(3));
548  };
549  Verifier().Expect(3, true).Expect(4, true).Verify(
551 
552  EXPECT_EQ(send_response.message(), recv_response.message());
553  EXPECT_TRUE(recv_status.ok());
554 }
555 
556 // Two pings and a final pong.
557 TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
558  ResetStub();
559 
560  EchoRequest send_request;
561  EchoRequest recv_request;
562  EchoResponse send_response;
563  EchoResponse recv_response;
564  Status recv_status;
565  ClientContext cli_ctx;
566  ServerContext srv_ctx;
567  ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
568 
569  send_request.set_message(GetParam().message_content);
570  std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
571  stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
572 
573  service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
574  tag(2));
575 
576  Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get());
577 
578  cli_stream->Write(send_request, tag(3));
579  srv_stream.Read(&recv_request, tag(4));
580  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
581  EXPECT_EQ(send_request.message(), recv_request.message());
582 
583  cli_stream->Write(send_request, tag(5));
584  srv_stream.Read(&recv_request, tag(6));
585  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
586 
587  EXPECT_EQ(send_request.message(), recv_request.message());
588  cli_stream->WritesDone(tag(7));
589  srv_stream.Read(&recv_request, tag(8));
590  Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
591 
592  send_response.set_message(recv_request.message());
593  srv_stream.Finish(send_response, Status::OK, tag(9));
594  cli_stream->Finish(&recv_status, tag(10));
595  Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
596 
597  EXPECT_EQ(send_response.message(), recv_response.message());
598  EXPECT_TRUE(recv_status.ok());
599 }
600 
601 // Two pings and a final pong.
602 TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
603  ResetStub();
604 
605  EchoRequest send_request;
606  EchoRequest recv_request;
607  EchoResponse send_response;
608  EchoResponse recv_response;
609  Status recv_status;
610  ClientContext cli_ctx;
611  ServerContext srv_ctx;
612  ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
613 
614  send_request.set_message(GetParam().message_content);
615  cli_ctx.set_initial_metadata_corked(true);
616  // tag:1 never comes up since no op is performed
617  std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
618  stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
619 
620  service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
621  tag(2));
622 
623  cli_stream->Write(send_request, tag(3));
624 
625  bool seen3 = false;
626 
627  Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
628 
629  srv_stream.Read(&recv_request, tag(4));
630 
631  Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
632 
633  EXPECT_EQ(send_request.message(), recv_request.message());
634 
635  cli_stream->WriteLast(send_request, WriteOptions(), tag(5));
636  srv_stream.Read(&recv_request, tag(6));
637  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
638  EXPECT_EQ(send_request.message(), recv_request.message());
639 
640  srv_stream.Read(&recv_request, tag(7));
641  Verifier().Expect(7, false).Verify(cq_.get());
642 
643  send_response.set_message(recv_request.message());
644  srv_stream.Finish(send_response, Status::OK, tag(8));
645  cli_stream->Finish(&recv_status, tag(9));
646  Verifier().Expect(8, true).Expect(9, true).Verify(cq_.get());
647 
648  EXPECT_EQ(send_response.message(), recv_response.message());
649  EXPECT_TRUE(recv_status.ok());
650 }
651 
652 // One ping, two pongs.
653 TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
654  ResetStub();
655 
656  EchoRequest send_request;
657  EchoRequest recv_request;
658  EchoResponse send_response;
659  EchoResponse recv_response;
660  Status recv_status;
661  ClientContext cli_ctx;
662  ServerContext srv_ctx;
663  ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
664 
665  send_request.set_message(GetParam().message_content);
666  std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
667  stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
668 
669  service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
670  cq_.get(), cq_.get(), tag(2));
671 
672  Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
673  EXPECT_EQ(send_request.message(), recv_request.message());
674 
675  send_response.set_message(recv_request.message());
676  srv_stream.Write(send_response, tag(3));
677  cli_stream->Read(&recv_response, tag(4));
678  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
679  EXPECT_EQ(send_response.message(), recv_response.message());
680 
681  srv_stream.Write(send_response, tag(5));
682  cli_stream->Read(&recv_response, tag(6));
683  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
684  EXPECT_EQ(send_response.message(), recv_response.message());
685 
686  srv_stream.Finish(Status::OK, tag(7));
687  cli_stream->Read(&recv_response, tag(8));
688  Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
689 
690  cli_stream->Finish(&recv_status, tag(9));
691  Verifier().Expect(9, true).Verify(cq_.get());
692 
693  EXPECT_TRUE(recv_status.ok());
694 }
695 
696 // One ping, two pongs. Using WriteAndFinish API
697 TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
698  ResetStub();
699 
700  EchoRequest send_request;
701  EchoRequest recv_request;
702  EchoResponse send_response;
703  EchoResponse recv_response;
704  Status recv_status;
705  ClientContext cli_ctx;
706  ServerContext srv_ctx;
707  ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
708 
709  send_request.set_message(GetParam().message_content);
710  std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
711  stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
712 
713  service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
714  cq_.get(), cq_.get(), tag(2));
715 
716  Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
717  EXPECT_EQ(send_request.message(), recv_request.message());
718 
719  send_response.set_message(recv_request.message());
720  srv_stream.Write(send_response, tag(3));
721  cli_stream->Read(&recv_response, tag(4));
722  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
723  EXPECT_EQ(send_response.message(), recv_response.message());
724 
725  srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(5));
726  cli_stream->Read(&recv_response, tag(6));
727  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
728  EXPECT_EQ(send_response.message(), recv_response.message());
729 
730  cli_stream->Read(&recv_response, tag(7));
731  Verifier().Expect(7, false).Verify(cq_.get());
732 
733  cli_stream->Finish(&recv_status, tag(8));
734  Verifier().Expect(8, true).Verify(cq_.get());
735 
736  EXPECT_TRUE(recv_status.ok());
737 }
738 
739 // One ping, two pongs. Using WriteLast API
740 TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
741  ResetStub();
742 
743  EchoRequest send_request;
744  EchoRequest recv_request;
745  EchoResponse send_response;
746  EchoResponse recv_response;
747  Status recv_status;
748  ClientContext cli_ctx;
749  ServerContext srv_ctx;
750  ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
751 
752  send_request.set_message(GetParam().message_content);
753  std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
754  stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
755 
756  service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
757  cq_.get(), cq_.get(), tag(2));
758 
759  Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
760  EXPECT_EQ(send_request.message(), recv_request.message());
761 
762  send_response.set_message(recv_request.message());
763  srv_stream.Write(send_response, tag(3));
764  cli_stream->Read(&recv_response, tag(4));
765  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
766  EXPECT_EQ(send_response.message(), recv_response.message());
767 
768  srv_stream.WriteLast(send_response, WriteOptions(), tag(5));
769  cli_stream->Read(&recv_response, tag(6));
770  srv_stream.Finish(Status::OK, tag(7));
771  Verifier().Expect(5, true).Expect(6, true).Expect(7, true).Verify(cq_.get());
772  EXPECT_EQ(send_response.message(), recv_response.message());
773 
774  cli_stream->Read(&recv_response, tag(8));
775  Verifier().Expect(8, false).Verify(cq_.get());
776 
777  cli_stream->Finish(&recv_status, tag(9));
778  Verifier().Expect(9, true).Verify(cq_.get());
779 
780  EXPECT_TRUE(recv_status.ok());
781 }
782 
783 // One ping, one pong.
784 TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
785  ResetStub();
786 
787  EchoRequest send_request;
788  EchoRequest recv_request;
789  EchoResponse send_response;
790  EchoResponse recv_response;
791  Status recv_status;
792  ClientContext cli_ctx;
793  ServerContext srv_ctx;
794  ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
795 
796  send_request.set_message(GetParam().message_content);
797  std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
798  cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
799 
800  service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
801  tag(2));
802 
803  Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
804 
805  cli_stream->Write(send_request, tag(3));
806  srv_stream.Read(&recv_request, tag(4));
807  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
808  EXPECT_EQ(send_request.message(), recv_request.message());
809 
810  send_response.set_message(recv_request.message());
811  srv_stream.Write(send_response, tag(5));
812  cli_stream->Read(&recv_response, tag(6));
813  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
814  EXPECT_EQ(send_response.message(), recv_response.message());
815 
816  cli_stream->WritesDone(tag(7));
817  srv_stream.Read(&recv_request, tag(8));
818  Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
819 
820  srv_stream.Finish(Status::OK, tag(9));
821  cli_stream->Finish(&recv_status, tag(10));
822  Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
823 
824  EXPECT_TRUE(recv_status.ok());
825 }
826 
827 // One ping, one pong. Using server:WriteAndFinish api
828 TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
829  ResetStub();
830 
831  EchoRequest send_request;
832  EchoRequest recv_request;
833  EchoResponse send_response;
834  EchoResponse recv_response;
835  Status recv_status;
836  ClientContext cli_ctx;
837  ServerContext srv_ctx;
838  ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
839 
840  send_request.set_message(GetParam().message_content);
841  cli_ctx.set_initial_metadata_corked(true);
842  std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
843  cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
844 
845  service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
846  tag(2));
847 
848  cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
849 
850  bool seen3 = false;
851 
852  Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
853 
854  srv_stream.Read(&recv_request, tag(4));
855 
856  Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
857  EXPECT_EQ(send_request.message(), recv_request.message());
858 
859  srv_stream.Read(&recv_request, tag(5));
860  Verifier().Expect(5, false).Verify(cq_.get());
861 
862  send_response.set_message(recv_request.message());
863  srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6));
864  cli_stream->Read(&recv_response, tag(7));
865  Verifier().Expect(6, true).Expect(7, true).Verify(cq_.get());
866  EXPECT_EQ(send_response.message(), recv_response.message());
867 
868  cli_stream->Finish(&recv_status, tag(8));
869  Verifier().Expect(8, true).Verify(cq_.get());
870 
871  EXPECT_TRUE(recv_status.ok());
872 }
873 
874 // One ping, one pong. Using server:WriteLast api
875 TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
876  ResetStub();
877 
878  EchoRequest send_request;
879  EchoRequest recv_request;
880  EchoResponse send_response;
881  EchoResponse recv_response;
882  Status recv_status;
883  ClientContext cli_ctx;
884  ServerContext srv_ctx;
885  ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
886 
887  send_request.set_message(GetParam().message_content);
888  cli_ctx.set_initial_metadata_corked(true);
889  std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
890  cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
891 
892  service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
893  tag(2));
894 
895  cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
896 
897  bool seen3 = false;
898 
899  Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
900 
901  srv_stream.Read(&recv_request, tag(4));
902 
903  Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
904  EXPECT_EQ(send_request.message(), recv_request.message());
905 
906  srv_stream.Read(&recv_request, tag(5));
907  Verifier().Expect(5, false).Verify(cq_.get());
908 
909  send_response.set_message(recv_request.message());
910  srv_stream.WriteLast(send_response, WriteOptions(), tag(6));
911  srv_stream.Finish(Status::OK, tag(7));
912  cli_stream->Read(&recv_response, tag(8));
913  Verifier().Expect(6, true).Expect(7, true).Expect(8, true).Verify(cq_.get());
914  EXPECT_EQ(send_response.message(), recv_response.message());
915 
916  cli_stream->Finish(&recv_status, tag(9));
917  Verifier().Expect(9, true).Verify(cq_.get());
918 
919  EXPECT_TRUE(recv_status.ok());
920 }
921 
922 // Metadata tests
923 TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
924  ResetStub();
925 
926  EchoRequest send_request;
927  EchoRequest recv_request;
928  EchoResponse send_response;
929  EchoResponse recv_response;
930  Status recv_status;
931 
932  ClientContext cli_ctx;
933  ServerContext srv_ctx;
934  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
935 
936  send_request.set_message(GetParam().message_content);
937  std::pair<std::string, std::string> meta1("key1", "val1");
938  std::pair<std::string, std::string> meta2("key2", "val2");
939  std::pair<std::string, std::string> meta3("g.r.d-bin", "xyz");
940  cli_ctx.AddMetadata(meta1.first, meta1.second);
941  cli_ctx.AddMetadata(meta2.first, meta2.second);
942  cli_ctx.AddMetadata(meta3.first, meta3.second);
943 
944  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
945  stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
946  response_reader->Finish(&recv_response, &recv_status, tag(4));
947 
948  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
949  cq_.get(), tag(2));
950  Verifier().Expect(2, true).Verify(cq_.get());
951  EXPECT_EQ(send_request.message(), recv_request.message());
952  const auto& client_initial_metadata = srv_ctx.client_metadata();
953  EXPECT_EQ(meta1.second,
954  ToString(client_initial_metadata.find(meta1.first)->second));
955  EXPECT_EQ(meta2.second,
956  ToString(client_initial_metadata.find(meta2.first)->second));
957  EXPECT_EQ(meta3.second,
958  ToString(client_initial_metadata.find(meta3.first)->second));
959  EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
960 
961  send_response.set_message(recv_request.message());
962  response_writer.Finish(send_response, Status::OK, tag(3));
963  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
964 
965  EXPECT_EQ(send_response.message(), recv_response.message());
966  EXPECT_TRUE(recv_status.ok());
967 }
968 
969 TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
970  ResetStub();
971 
972  EchoRequest send_request;
973  EchoRequest recv_request;
974  EchoResponse send_response;
975  EchoResponse recv_response;
976  Status recv_status;
977 
978  ClientContext cli_ctx;
979  ServerContext srv_ctx;
980  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
981 
982  send_request.set_message(GetParam().message_content);
983  std::pair<std::string, std::string> meta1("key1", "val1");
984  std::pair<std::string, std::string> meta2("key2", "val2");
985 
986  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
987  stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
988  response_reader->ReadInitialMetadata(tag(4));
989 
990  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
991  cq_.get(), tag(2));
992  Verifier().Expect(2, true).Verify(cq_.get());
993  EXPECT_EQ(send_request.message(), recv_request.message());
994  srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
995  srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
996  response_writer.SendInitialMetadata(tag(3));
997  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
998  const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
999  EXPECT_EQ(meta1.second,
1000  ToString(server_initial_metadata.find(meta1.first)->second));
1001  EXPECT_EQ(meta2.second,
1002  ToString(server_initial_metadata.find(meta2.first)->second));
1003  EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());
1004 
1005  send_response.set_message(recv_request.message());
1006  response_writer.Finish(send_response, Status::OK, tag(5));
1007  response_reader->Finish(&recv_response, &recv_status, tag(6));
1008  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
1009 
1010  EXPECT_EQ(send_response.message(), recv_response.message());
1011  EXPECT_TRUE(recv_status.ok());
1012 }
1013 
1014 // 1 ping, 2 pongs.
1015 TEST_P(AsyncEnd2endTest, ServerInitialMetadataServerStreaming) {
1016  ResetStub();
1017  EchoRequest send_request;
1018  EchoRequest recv_request;
1019  EchoResponse send_response;
1020  EchoResponse recv_response;
1021  Status recv_status;
1022  ClientContext cli_ctx;
1023  ServerContext srv_ctx;
1024  ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1025 
1026  std::pair<::std::string, ::std::string> meta1("key1", "val1");
1027  std::pair<::std::string, ::std::string> meta2("key2", "val2");
1028 
1029  std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1030  stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
1031  cli_stream->ReadInitialMetadata(tag(11));
1032  service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1033  cq_.get(), cq_.get(), tag(2));
1034 
1035  Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
1036 
1037  srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
1038  srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
1039  srv_stream.SendInitialMetadata(tag(10));
1040  Verifier().Expect(10, true).Expect(11, true).Verify(cq_.get());
1041  auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1042  EXPECT_EQ(meta1.second,
1043  ToString(server_initial_metadata.find(meta1.first)->second));
1044  EXPECT_EQ(meta2.second,
1045  ToString(server_initial_metadata.find(meta2.first)->second));
1046  EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());
1047 
1048  srv_stream.Write(send_response, tag(3));
1049 
1050  cli_stream->Read(&recv_response, tag(4));
1051  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
1052 
1053  srv_stream.Write(send_response, tag(5));
1054  cli_stream->Read(&recv_response, tag(6));
1055  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
1056 
1057  srv_stream.Finish(Status::OK, tag(7));
1058  cli_stream->Read(&recv_response, tag(8));
1059  Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
1060 
1061  cli_stream->Finish(&recv_status, tag(9));
1062  Verifier().Expect(9, true).Verify(cq_.get());
1063 
1064  EXPECT_TRUE(recv_status.ok());
1065 }
1066 
1067 // 1 ping, 2 pongs.
1068 // Test for server initial metadata being sent implicitly
1069 TEST_P(AsyncEnd2endTest, ServerInitialMetadataServerStreamingImplicit) {
1070  ResetStub();
1071  EchoRequest send_request;
1072  EchoRequest recv_request;
1073  EchoResponse send_response;
1074  EchoResponse recv_response;
1075  Status recv_status;
1076  ClientContext cli_ctx;
1077  ServerContext srv_ctx;
1078  ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1079 
1080  send_request.set_message(GetParam().message_content);
1081  std::pair<::std::string, ::std::string> meta1("key1", "val1");
1082  std::pair<::std::string, ::std::string> meta2("key2", "val2");
1083 
1084  std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1085  stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
1086  service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1087  cq_.get(), cq_.get(), tag(2));
1088 
1089  Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
1090  EXPECT_EQ(send_request.message(), recv_request.message());
1091 
1092  srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
1093  srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
1094  send_response.set_message(recv_request.message());
1095  srv_stream.Write(send_response, tag(3));
1096 
1097  cli_stream->Read(&recv_response, tag(4));
1098  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
1099  EXPECT_EQ(send_response.message(), recv_response.message());
1100 
1101  auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1102  EXPECT_EQ(meta1.second,
1103  ToString(server_initial_metadata.find(meta1.first)->second));
1104  EXPECT_EQ(meta2.second,
1105  ToString(server_initial_metadata.find(meta2.first)->second));
1106  EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());
1107 
1108  srv_stream.Write(send_response, tag(5));
1109  cli_stream->Read(&recv_response, tag(6));
1110  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
1111 
1112  srv_stream.Finish(Status::OK, tag(7));
1113  cli_stream->Read(&recv_response, tag(8));
1114  Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
1115 
1116  cli_stream->Finish(&recv_status, tag(9));
1117  Verifier().Expect(9, true).Verify(cq_.get());
1118 
1119  EXPECT_TRUE(recv_status.ok());
1120 }
1121 
1122 TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
1123  ResetStub();
1124 
1125  EchoRequest send_request;
1126  EchoRequest recv_request;
1127  EchoResponse send_response;
1128  EchoResponse recv_response;
1129  Status recv_status;
1130 
1131  ClientContext cli_ctx;
1132  ServerContext srv_ctx;
1133  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1134 
1135  send_request.set_message(GetParam().message_content);
1136  std::pair<std::string, std::string> meta1("key1", "val1");
1137  std::pair<std::string, std::string> meta2("key2", "val2");
1138 
1139  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1140  stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1141  response_reader->Finish(&recv_response, &recv_status, tag(5));
1142 
1143  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1144  cq_.get(), tag(2));
1145  Verifier().Expect(2, true).Verify(cq_.get());
1146  EXPECT_EQ(send_request.message(), recv_request.message());
1147  response_writer.SendInitialMetadata(tag(3));
1148  Verifier().Expect(3, true).Verify(cq_.get());
1149 
1150  send_response.set_message(recv_request.message());
1151  srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
1152  srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
1153  response_writer.Finish(send_response, Status::OK, tag(4));
1154 
1155  Verifier().Expect(4, true).Expect(5, true).Verify(cq_.get());
1156 
1157  EXPECT_EQ(send_response.message(), recv_response.message());
1158  EXPECT_TRUE(recv_status.ok());
1159  const auto& server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
1160  EXPECT_EQ(meta1.second,
1161  ToString(server_trailing_metadata.find(meta1.first)->second));
1162  EXPECT_EQ(meta2.second,
1163  ToString(server_trailing_metadata.find(meta2.first)->second));
1164  EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size());
1165 }
1166 
1167 TEST_P(AsyncEnd2endTest, MetadataRpc) {
1168  ResetStub();
1169 
1170  EchoRequest send_request;
1171  EchoRequest recv_request;
1172  EchoResponse send_response;
1173  EchoResponse recv_response;
1174  Status recv_status;
1175 
1176  ClientContext cli_ctx;
1177  ServerContext srv_ctx;
1178  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1179 
1180  send_request.set_message(GetParam().message_content);
1181  std::pair<std::string, std::string> meta1("key1", "val1");
1182  std::pair<std::string, std::string> meta2(
1183  "key2-bin",
1184  std::string("\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13));
1185  std::pair<std::string, std::string> meta3("key3", "val3");
1186  std::pair<std::string, std::string> meta6(
1187  "key4-bin",
1188  std::string("\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
1189  14));
1190  std::pair<std::string, std::string> meta5("key5", "val5");
1191  std::pair<std::string, std::string> meta4(
1192  "key6-bin",
1193  std::string(
1194  "\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15));
1195 
1196  cli_ctx.AddMetadata(meta1.first, meta1.second);
1197  cli_ctx.AddMetadata(meta2.first, meta2.second);
1198 
1199  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1200  stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1201  response_reader->ReadInitialMetadata(tag(4));
1202 
1203  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1204  cq_.get(), tag(2));
1205  Verifier().Expect(2, true).Verify(cq_.get());
1206  EXPECT_EQ(send_request.message(), recv_request.message());
1207  const auto& client_initial_metadata = srv_ctx.client_metadata();
1208  EXPECT_EQ(meta1.second,
1209  ToString(client_initial_metadata.find(meta1.first)->second));
1210  EXPECT_EQ(meta2.second,
1211  ToString(client_initial_metadata.find(meta2.first)->second));
1212  EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
1213 
1214  srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
1215  srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
1216  response_writer.SendInitialMetadata(tag(3));
1217  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
1218  const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1219  EXPECT_EQ(meta3.second,
1220  ToString(server_initial_metadata.find(meta3.first)->second));
1221  EXPECT_EQ(meta4.second,
1222  ToString(server_initial_metadata.find(meta4.first)->second));
1223  EXPECT_GE(server_initial_metadata.size(), static_cast<size_t>(2));
1224 
1225  send_response.set_message(recv_request.message());
1226  srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
1227  srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
1228  response_writer.Finish(send_response, Status::OK, tag(5));
1229  response_reader->Finish(&recv_response, &recv_status, tag(6));
1230 
1231  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
1232 
1233  EXPECT_EQ(send_response.message(), recv_response.message());
1234  EXPECT_TRUE(recv_status.ok());
1235  const auto& server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
1236  EXPECT_EQ(meta5.second,
1237  ToString(server_trailing_metadata.find(meta5.first)->second));
1238  EXPECT_EQ(meta6.second,
1239  ToString(server_trailing_metadata.find(meta6.first)->second));
1240  EXPECT_GE(server_trailing_metadata.size(), static_cast<size_t>(2));
1241 }
1242 
1243 // Server uses AsyncNotifyWhenDone API to check for cancellation
1244 TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
1245  ResetStub();
1246 
1247  EchoRequest send_request;
1248  EchoRequest recv_request;
1249  EchoResponse send_response;
1250  EchoResponse recv_response;
1251  Status recv_status;
1252 
1253  ClientContext cli_ctx;
1254  ServerContext srv_ctx;
1255  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1256 
1257  send_request.set_message(GetParam().message_content);
1258  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1259  stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1260  response_reader->Finish(&recv_response, &recv_status, tag(4));
1261 
1262  srv_ctx.AsyncNotifyWhenDone(tag(5));
1263  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1264  cq_.get(), tag(2));
1265 
1266  Verifier().Expect(2, true).Verify(cq_.get());
1267  EXPECT_EQ(send_request.message(), recv_request.message());
1268 
1269  cli_ctx.TryCancel();
1270  Verifier().Expect(5, true).Expect(4, true).Verify(cq_.get());
1271  EXPECT_TRUE(srv_ctx.IsCancelled());
1272 
1273  EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
1274 }
1275 
1276 // Server uses AsyncNotifyWhenDone API to check for normal finish
1277 TEST_P(AsyncEnd2endTest, ServerCheckDone) {
1278  ResetStub();
1279 
1280  EchoRequest send_request;
1281  EchoRequest recv_request;
1282  EchoResponse send_response;
1283  EchoResponse recv_response;
1284  Status recv_status;
1285 
1286  ClientContext cli_ctx;
1287  ServerContext srv_ctx;
1288  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1289 
1290  send_request.set_message(GetParam().message_content);
1291  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1292  stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1293  response_reader->Finish(&recv_response, &recv_status, tag(4));
1294 
1295  srv_ctx.AsyncNotifyWhenDone(tag(5));
1296  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1297  cq_.get(), tag(2));
1298 
1299  Verifier().Expect(2, true).Verify(cq_.get());
1300  EXPECT_EQ(send_request.message(), recv_request.message());
1301 
1302  send_response.set_message(recv_request.message());
1303  response_writer.Finish(send_response, Status::OK, tag(3));
1304  Verifier().Expect(3, true).Expect(4, true).Expect(5, true).Verify(cq_.get());
1305  EXPECT_FALSE(srv_ctx.IsCancelled());
1306 
1307  EXPECT_EQ(send_response.message(), recv_response.message());
1308  EXPECT_TRUE(recv_status.ok());
1309 }
1310 
1311 TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
1312  ChannelArguments args;
1313  const auto& channel_creds = GetCredentialsProvider()->GetChannelCredentials(
1314  GetParam().credentials_type, &args);
1315  std::shared_ptr<Channel> channel =
1316  !(GetParam().inproc) ? grpc::CreateCustomChannel(server_address_.str(),
1317  channel_creds, args)
1318  : server_->InProcessChannel(args);
1319  std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
1320  stub = grpc::testing::UnimplementedEchoService::NewStub(channel);
1321  EchoRequest send_request;
1322  EchoResponse recv_response;
1323  Status recv_status;
1324 
1325  ClientContext cli_ctx;
1326  send_request.set_message(GetParam().message_content);
1327  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1328  stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
1329 
1330  response_reader->Finish(&recv_response, &recv_status, tag(4));
1331  Verifier().Expect(4, true).Verify(cq_.get());
1332 
1333  EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
1334  EXPECT_EQ("", recv_status.error_message());
1335 }
1336 
1337 // This class is for testing scenarios where RPCs are cancelled on the server
1338 // by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone
1339 // API to check for cancellation
1340 class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
1341  protected:
1342  typedef enum {
1343  DO_NOT_CANCEL = 0,
1348 
1349  // Helper for testing client-streaming RPCs which are cancelled on the server.
1350  // Depending on the value of server_try_cancel parameter, this will test one
1351  // of the following three scenarios:
1352  // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
1353  // any messages from the client
1354  //
1355  // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1356  // messages from the client
1357  //
1358  // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
1359  // messages from the client (but before sending any status back to the
1360  // client)
1361  void TestClientStreamingServerCancel(
1362  ServerTryCancelRequestPhase server_try_cancel) {
1363  ResetStub();
1364 
1365  EchoRequest recv_request;
1366  EchoResponse send_response;
1367  EchoResponse recv_response;
1368  Status recv_status;
1369 
1370  ClientContext cli_ctx;
1371  ServerContext srv_ctx;
1372  ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1373 
1374  // Initiate the 'RequestStream' call on client
1375  CompletionQueue cli_cq;
1376 
1377  std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
1378  stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq, tag(1)));
1379 
1380  // On the server, request to be notified of 'RequestStream' calls
1381  // and receive the 'RequestStream' call just made by the client
1382  srv_ctx.AsyncNotifyWhenDone(tag(11));
1383  service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1384  tag(2));
1385  std::thread t1([&cli_cq] { Verifier().Expect(1, true).Verify(&cli_cq); });
1386  Verifier().Expect(2, true).Verify(cq_.get());
1387  t1.join();
1388 
1389  bool expected_server_cq_result = true;
1390  bool expected_client_cq_result = true;
1391 
1392  if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1393  srv_ctx.TryCancel();
1394  Verifier().Expect(11, true).Verify(cq_.get());
1395  EXPECT_TRUE(srv_ctx.IsCancelled());
1396 
1397  // Since cancellation is done before server reads any results, we know
1398  // for sure that all server cq results will return false from this
1399  // point forward
1400  expected_server_cq_result = false;
1401  expected_client_cq_result = false;
1402  }
1403 
1404  bool ignore_client_cq_result =
1405  (server_try_cancel == CANCEL_DURING_PROCESSING) ||
1406  (server_try_cancel == CANCEL_BEFORE_PROCESSING);
1407 
1408  std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
1409  &ignore_client_cq_result] {
1410  EchoRequest send_request;
1411  // Client sends 3 messages (tags 3, 4 and 5)
1412  for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1413  send_request.set_message("Ping " + std::to_string(tag_idx));
1414  cli_stream->Write(send_request, tag(tag_idx));
1415  Verifier()
1416  .Expect(tag_idx, expected_client_cq_result)
1417  .Verify(&cli_cq, ignore_client_cq_result);
1418  }
1419  cli_stream->WritesDone(tag(6));
1420  // Ignore ok on WritesDone since cancel can affect it
1421  Verifier()
1422  .Expect(6, expected_client_cq_result)
1423  .Verify(&cli_cq, ignore_client_cq_result);
1424  });
1425 
1426  bool ignore_cq_result = false;
1427  bool want_done_tag = false;
1428  std::thread* server_try_cancel_thd = nullptr;
1429 
1430  auto verif = Verifier();
1431 
1432  if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1433  server_try_cancel_thd =
1434  new std::thread([&srv_ctx] { srv_ctx.TryCancel(); });
1435  // Server will cancel the RPC in a parallel thread while reading the
1436  // requests from the client. Since the cancellation can happen at anytime,
1437  // some of the cq results (i.e those until cancellation) might be true but
1438  // its non deterministic. So better to ignore the cq results
1439  ignore_cq_result = true;
1440  // Expect that we might possibly see the done tag that
1441  // indicates cancellation completion in this case
1442  want_done_tag = true;
1443  verif.Expect(11, true);
1444  }
1445 
1446  // Server reads 3 messages (tags 6, 7 and 8)
1447  // But if want_done_tag is true, we might also see tag 11
1448  for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1449  srv_stream.Read(&recv_request, tag(tag_idx));
1450  // Note that we'll add something to the verifier and verify that
1451  // something was seen, but it might be tag 11 and not what we
1452  // just added
1453  int got_tag = verif.Expect(tag_idx, expected_server_cq_result)
1454  .Next(cq_.get(), ignore_cq_result);
1455  GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1456  if (got_tag == 11) {
1457  EXPECT_TRUE(srv_ctx.IsCancelled());
1458  want_done_tag = false;
1459  // Now get the other entry that we were waiting on
1460  EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1461  }
1462  }
1463 
1464  cli_thread.join();
1465 
1466  if (server_try_cancel_thd != nullptr) {
1467  server_try_cancel_thd->join();
1468  delete server_try_cancel_thd;
1469  }
1470 
1471  if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1472  srv_ctx.TryCancel();
1473  want_done_tag = true;
1474  verif.Expect(11, true);
1475  }
1476 
1477  if (want_done_tag) {
1478  verif.Verify(cq_.get());
1479  EXPECT_TRUE(srv_ctx.IsCancelled());
1480  want_done_tag = false;
1481  }
1482 
1483  // The RPC has been cancelled at this point for sure (i.e irrespective of
1484  // the value of `server_try_cancel` is). So, from this point forward, we
1485  // know that cq results are supposed to return false on server.
1486 
1487  // Server sends the final message and cancelled status (but the RPC is
1488  // already cancelled at this point. So we expect the operation to fail)
1489  srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
1490  Verifier().Expect(9, false).Verify(cq_.get());
1491 
1492  // Client will see the cancellation
1493  cli_stream->Finish(&recv_status, tag(10));
1494  Verifier().Expect(10, true).Verify(&cli_cq);
1495  EXPECT_FALSE(recv_status.ok());
1496  EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
1497 
1498  cli_cq.Shutdown();
1499  void* phony_tag;
1500  bool phony_ok;
1501  while (cli_cq.Next(&phony_tag, &phony_ok)) {
1502  }
1503  }
1504 
1505  // Helper for testing server-streaming RPCs which are cancelled on the server.
1506  // Depending on the value of server_try_cancel parameter, this will test one
1507  // of the following three scenarios:
1508  // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before sending
1509  // any messages to the client
1510  //
1511  // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while sending
1512  // messages to the client
1513  //
1514  // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after sending all
1515  // messages to the client (but before sending any status back to the
1516  // client)
1517  void TestServerStreamingServerCancel(
1518  ServerTryCancelRequestPhase server_try_cancel) {
1519  ResetStub();
1520 
1521  EchoRequest send_request;
1522  EchoRequest recv_request;
1523  EchoResponse send_response;
1524  Status recv_status;
1525  ClientContext cli_ctx;
1526  ServerContext srv_ctx;
1527  ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1528 
1529  send_request.set_message("Ping");
1530  // Initiate the 'ResponseStream' call on the client
1531  CompletionQueue cli_cq;
1532  std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1533  stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq, tag(1)));
1534  // On the server, request to be notified of 'ResponseStream' calls and
1535  // receive the call just made by the client
1536  srv_ctx.AsyncNotifyWhenDone(tag(11));
1537  service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1538  cq_.get(), cq_.get(), tag(2));
1539 
1540  std::thread t1([&cli_cq] { Verifier().Expect(1, true).Verify(&cli_cq); });
1541  Verifier().Expect(2, true).Verify(cq_.get());
1542  t1.join();
1543 
1544  EXPECT_EQ(send_request.message(), recv_request.message());
1545 
1546  bool expected_cq_result = true;
1547  bool ignore_cq_result = false;
1548  bool want_done_tag = false;
1549  bool expected_client_cq_result = true;
1550  bool ignore_client_cq_result =
1551  (server_try_cancel != CANCEL_BEFORE_PROCESSING);
1552 
1553  if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1554  srv_ctx.TryCancel();
1555  Verifier().Expect(11, true).Verify(cq_.get());
1556  EXPECT_TRUE(srv_ctx.IsCancelled());
1557 
1558  // We know for sure that all cq results will be false from this point
1559  // since the server cancelled the RPC
1560  expected_cq_result = false;
1561  expected_client_cq_result = false;
1562  }
1563 
1564  std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
1565  &ignore_client_cq_result] {
1566  // Client attempts to read the three messages from the server
1567  for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1568  EchoResponse recv_response;
1569  cli_stream->Read(&recv_response, tag(tag_idx));
1570  Verifier()
1571  .Expect(tag_idx, expected_client_cq_result)
1572  .Verify(&cli_cq, ignore_client_cq_result);
1573  }
1574  });
1575 
1576  std::thread* server_try_cancel_thd = nullptr;
1577 
1578  auto verif = Verifier();
1579 
1580  if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1581  server_try_cancel_thd =
1582  new std::thread([&srv_ctx] { srv_ctx.TryCancel(); });
1583 
1584  // Server will cancel the RPC in a parallel thread while writing responses
1585  // to the client. Since the cancellation can happen at anytime, some of
1586  // the cq results (i.e those until cancellation) might be true but it is
1587  // non deterministic. So better to ignore the cq results
1588  ignore_cq_result = true;
1589  // Expect that we might possibly see the done tag that
1590  // indicates cancellation completion in this case
1591  want_done_tag = true;
1592  verif.Expect(11, true);
1593  }
1594 
1595  // Server sends three messages (tags 3, 4 and 5)
1596  // But if want_done tag is true, we might also see tag 11
1597  for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1598  send_response.set_message("Pong " + std::to_string(tag_idx));
1599  srv_stream.Write(send_response, tag(tag_idx));
1600  // Note that we'll add something to the verifier and verify that
1601  // something was seen, but it might be tag 11 and not what we
1602  // just added
1603  int got_tag = verif.Expect(tag_idx, expected_cq_result)
1604  .Next(cq_.get(), ignore_cq_result);
1605  GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1606  if (got_tag == 11) {
1607  EXPECT_TRUE(srv_ctx.IsCancelled());
1608  want_done_tag = false;
1609  // Now get the other entry that we were waiting on
1610  EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1611  }
1612  }
1613 
1614  if (server_try_cancel_thd != nullptr) {
1615  server_try_cancel_thd->join();
1616  delete server_try_cancel_thd;
1617  }
1618 
1619  if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1620  srv_ctx.TryCancel();
1621  want_done_tag = true;
1622  verif.Expect(11, true);
1623  }
1624 
1625  if (want_done_tag) {
1626  verif.Verify(cq_.get());
1627  EXPECT_TRUE(srv_ctx.IsCancelled());
1628  want_done_tag = false;
1629  }
1630 
1631  cli_thread.join();
1632 
1633  // The RPC has been cancelled at this point for sure (i.e irrespective of
1634  // the value of `server_try_cancel` is). So, from this point forward, we
1635  // know that cq results are supposed to return false on server.
1636 
1637  // Server finishes the stream (but the RPC is already cancelled)
1638  srv_stream.Finish(Status::CANCELLED, tag(9));
1639  Verifier().Expect(9, false).Verify(cq_.get());
1640 
1641  // Client will see the cancellation
1642  cli_stream->Finish(&recv_status, tag(10));
1643  Verifier().Expect(10, true).Verify(&cli_cq);
1644  EXPECT_FALSE(recv_status.ok());
1645  EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
1646 
1647  cli_cq.Shutdown();
1648  void* phony_tag;
1649  bool phony_ok;
1650  while (cli_cq.Next(&phony_tag, &phony_ok)) {
1651  }
1652  }
1653 
1654  // Helper for testing bidirectional-streaming RPCs which are cancelled on the
1655  // server.
1656  //
1657  // Depending on the value of server_try_cancel parameter, this will
1658  // test one of the following three scenarios:
1659  // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
1660  // writing any messages from/to the client
1661  //
1662  // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1663  // messages from the client
1664  //
1665  // CANCEL_AFTER_PROCESSING: Rpc is cancelled by server after reading all
1666  // messages from the client (but before sending any status back to the
1667  // client)
1668  void TestBidiStreamingServerCancel(
1669  ServerTryCancelRequestPhase server_try_cancel) {
1670  ResetStub();
1671 
1672  EchoRequest send_request;
1673  EchoRequest recv_request;
1674  EchoResponse send_response;
1675  EchoResponse recv_response;
1676  Status recv_status;
1677  ClientContext cli_ctx;
1678  ServerContext srv_ctx;
1679  ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1680 
1681  // Initiate the call from the client side
1682  std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
1683  cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
1684 
1685  // On the server, request to be notified of the 'BidiStream' call and
1686  // receive the call just made by the client
1687  srv_ctx.AsyncNotifyWhenDone(tag(11));
1688  service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1689  tag(2));
1690  Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
1691 
1692  auto verif = Verifier();
1693 
1694  // Client sends the first and the only message
1695  send_request.set_message("Ping");
1696  cli_stream->Write(send_request, tag(3));
1697  verif.Expect(3, true);
1698 
1699  bool expected_cq_result = true;
1700  bool ignore_cq_result = false;
1701  bool want_done_tag = false;
1702 
1703  int got_tag, got_tag2;
1704  bool tag_3_done = false;
1705 
1706  if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1707  srv_ctx.TryCancel();
1708  verif.Expect(11, true);
1709  // We know for sure that all server cq results will be false from
1710  // this point since the server cancelled the RPC. However, we can't
1711  // say for sure about the client
1712  expected_cq_result = false;
1713  ignore_cq_result = true;
1714 
1715  do {
1716  got_tag = verif.Next(cq_.get(), ignore_cq_result);
1717  GPR_ASSERT(((got_tag == 3) && !tag_3_done) || (got_tag == 11));
1718  if (got_tag == 3) {
1719  tag_3_done = true;
1720  }
1721  } while (got_tag != 11);
1722  EXPECT_TRUE(srv_ctx.IsCancelled());
1723  }
1724 
1725  std::thread* server_try_cancel_thd = nullptr;
1726 
1727  if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1728  server_try_cancel_thd =
1729  new std::thread([&srv_ctx] { srv_ctx.TryCancel(); });
1730 
1731  // Since server is going to cancel the RPC in a parallel thread, some of
1732  // the cq results (i.e those until the cancellation) might be true. Since
1733  // that number is non-deterministic, it is better to ignore the cq results
1734  ignore_cq_result = true;
1735  // Expect that we might possibly see the done tag that
1736  // indicates cancellation completion in this case
1737  want_done_tag = true;
1738  verif.Expect(11, true);
1739  }
1740 
1741  srv_stream.Read(&recv_request, tag(4));
1742  verif.Expect(4, expected_cq_result);
1743  got_tag = tag_3_done ? 3 : verif.Next(cq_.get(), ignore_cq_result);
1744  got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
1745  GPR_ASSERT((got_tag == 3) || (got_tag == 4) ||
1746  (got_tag == 11 && want_done_tag));
1747  GPR_ASSERT((got_tag2 == 3) || (got_tag2 == 4) ||
1748  (got_tag2 == 11 && want_done_tag));
1749  // If we get 3 and 4, we don't need to wait for 11, but if
1750  // we get 11, we should also clear 3 and 4
1751  if (got_tag + got_tag2 != 7) {
1752  EXPECT_TRUE(srv_ctx.IsCancelled());
1753  want_done_tag = false;
1754  got_tag = verif.Next(cq_.get(), ignore_cq_result);
1755  GPR_ASSERT((got_tag == 3) || (got_tag == 4));
1756  }
1757 
1758  send_response.set_message("Pong");
1759  srv_stream.Write(send_response, tag(5));
1760  verif.Expect(5, expected_cq_result);
1761 
1762  cli_stream->Read(&recv_response, tag(6));
1763  verif.Expect(6, expected_cq_result);
1764  got_tag = verif.Next(cq_.get(), ignore_cq_result);
1765  got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
1766  GPR_ASSERT((got_tag == 5) || (got_tag == 6) ||
1767  (got_tag == 11 && want_done_tag));
1768  GPR_ASSERT((got_tag2 == 5) || (got_tag2 == 6) ||
1769  (got_tag2 == 11 && want_done_tag));
1770  // If we get 5 and 6, we don't need to wait for 11, but if
1771  // we get 11, we should also clear 5 and 6
1772  if (got_tag + got_tag2 != 11) {
1773  EXPECT_TRUE(srv_ctx.IsCancelled());
1774  want_done_tag = false;
1775  got_tag = verif.Next(cq_.get(), ignore_cq_result);
1776  GPR_ASSERT((got_tag == 5) || (got_tag == 6));
1777  }
1778 
1779  // This is expected to succeed in all cases
1780  cli_stream->WritesDone(tag(7));
1781  verif.Expect(7, true);
1782  // TODO(vjpai): Consider whether the following is too flexible
1783  // or whether it should just be reset to ignore_cq_result
1784  bool ignore_cq_wd_result =
1785  ignore_cq_result || (server_try_cancel == CANCEL_BEFORE_PROCESSING);
1786  got_tag = verif.Next(cq_.get(), ignore_cq_wd_result);
1787  GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag));
1788  if (got_tag == 11) {
1789  EXPECT_TRUE(srv_ctx.IsCancelled());
1790  want_done_tag = false;
1791  // Now get the other entry that we were waiting on
1792  EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_wd_result), 7);
1793  }
1794 
1795  // This is expected to fail in all cases i.e for all values of
1796  // server_try_cancel. This is because at this point, either there are no
1797  // more msgs from the client (because client called WritesDone) or the RPC
1798  // is cancelled on the server
1799  srv_stream.Read(&recv_request, tag(8));
1800  verif.Expect(8, false);
1801  got_tag = verif.Next(cq_.get(), ignore_cq_result);
1802  GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag));
1803  if (got_tag == 11) {
1804  EXPECT_TRUE(srv_ctx.IsCancelled());
1805  want_done_tag = false;
1806  // Now get the other entry that we were waiting on
1807  EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8);
1808  }
1809 
1810  if (server_try_cancel_thd != nullptr) {
1811  server_try_cancel_thd->join();
1812  delete server_try_cancel_thd;
1813  }
1814 
1815  if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1816  srv_ctx.TryCancel();
1817  want_done_tag = true;
1818  verif.Expect(11, true);
1819  }
1820 
1821  if (want_done_tag) {
1822  verif.Verify(cq_.get());
1823  EXPECT_TRUE(srv_ctx.IsCancelled());
1824  want_done_tag = false;
1825  }
1826 
1827  // The RPC has been cancelled at this point for sure (i.e., irrespective
1828  // of the value of `server_try_cancel`). So, from this point forward, we
1829  // know that cq results are supposed to return false on the server.
1830 
1831  srv_stream.Finish(Status::CANCELLED, tag(9));
1832  Verifier().Expect(9, false).Verify(cq_.get());
1833 
1834  cli_stream->Finish(&recv_status, tag(10));
1835  Verifier().Expect(10, true).Verify(cq_.get());
1836  EXPECT_FALSE(recv_status.ok());
1837  EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
1838  }
1839 };
1840 
1841 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelBefore) {
1842  TestClientStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1843 }
1844 
1845 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelDuring) {
1846  TestClientStreamingServerCancel(CANCEL_DURING_PROCESSING);
1847 }
1848 
1849 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelAfter) {
1850  TestClientStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1851 }
1852 
1853 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelBefore) {
1854  TestServerStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1855 }
1856 
1857 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelDuring) {
1858  TestServerStreamingServerCancel(CANCEL_DURING_PROCESSING);
1859 }
1860 
1861 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelAfter) {
1862  TestServerStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1863 }
1864 
1865 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelBefore) {
1866  TestBidiStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1867 }
1868 
1869 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelDuring) {
1870  TestBidiStreamingServerCancel(CANCEL_DURING_PROCESSING);
1871 }
1872 
1873 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
1874  TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1875 }
1876 
1877 std::vector<TestScenario> CreateTestScenarios(bool /*test_secure*/,
1878  bool test_message_size_limit) {
1879  std::vector<TestScenario> scenarios;
1880  std::vector<std::string> credentials_types;
1881  std::vector<std::string> messages;
1882 
1883  auto insec_ok = [] {
1884  // Only allow insecure credentials type when it is registered with the
1885  // provider. User may create providers that do not have insecure.
1887  kInsecureCredentialsType, nullptr) != nullptr;
1888  };
1889 
1890  if (insec_ok()) {
1891  credentials_types.push_back(kInsecureCredentialsType);
1892  }
1894  for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
1895  credentials_types.push_back(*sec);
1896  }
1897  GPR_ASSERT(!credentials_types.empty());
1898 
1899  messages.push_back("Hello");
1900  if (test_message_size_limit) {
1901  for (size_t k = 1; k < GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH / 1024;
1902  k *= 32) {
1903  std::string big_msg;
1904  for (size_t i = 0; i < k * 1024; ++i) {
1905  char c = 'a' + (i % 26);
1906  big_msg += c;
1907  }
1908  messages.push_back(big_msg);
1909  }
1910  if (!BuiltUnderMsan()) {
1911  // 4MB message processing with SSL is very slow under msan
1912  // (causes timeouts) and doesn't really increase the signal from tests.
1913  // Reserve 100 bytes for other fields of the message proto.
1914  messages.push_back(
1916  }
1917  }
1918 
1919  // TODO (sreek) Renable tests with health check service after the issue
1920  // https://github.com/grpc/grpc/issues/11223 is resolved
1921  for (auto health_check_service : {false}) {
1922  for (auto msg = messages.begin(); msg != messages.end(); msg++) {
1923  for (auto cred = credentials_types.begin();
1924  cred != credentials_types.end(); ++cred) {
1925  scenarios.emplace_back(false, *cred, health_check_service, *msg);
1926  }
1927  if (insec_ok()) {
1928  scenarios.emplace_back(true, kInsecureCredentialsType,
1930  }
1931  }
1932  }
1933  return scenarios;
1934 }
1935 
1936 INSTANTIATE_TEST_SUITE_P(AsyncEnd2end, AsyncEnd2endTest,
1937  ::testing::ValuesIn(CreateTestScenarios(true, true)));
1938 INSTANTIATE_TEST_SUITE_P(AsyncEnd2endServerTryCancel,
1939  AsyncEnd2endServerTryCancelTest,
1941  false)));
1942 
1943 } // namespace
1944 } // namespace testing
1945 } // namespace grpc
1946 
1947 int main(int argc, char** argv) {
1948  // Change the backup poll interval from 5s to 100ms to speed up the
1949  // ReconnectChannel test
1950  GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 100);
1951  grpc::testing::TestEnvironment env(&argc, argv);
1952  ::testing::InitGoogleTest(&argc, argv);
1953  int ret = RUN_ALL_TESTS();
1954  return ret;
1955 }
EXPECT_FALSE
#define EXPECT_FALSE(condition)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1970
test_credentials_provider.h
GPR_TIMESPAN
@ GPR_TIMESPAN
Definition: gpr_types.h:45
check_grpcio_tools.content
content
Definition: check_grpcio_tools.py:26
absl::time_internal::cctz::seconds
std::chrono::duration< std::int_fast64_t > seconds
Definition: abseil-cpp/absl/time/internal/cctz/include/cctz/time_zone.h:40
testing
Definition: aws_request_signer_test.cc:25
gen_build_yaml.out
dictionary out
Definition: src/benchmark/gen_build_yaml.py:24
now
static double now(void)
Definition: test/core/fling/client.cc:130
regen-readme.it
it
Definition: regen-readme.py:15
log.h
grpc::testing::CredentialsProvider::GetChannelCredentials
virtual std::shared_ptr< ChannelCredentials > GetChannelCredentials(const std::string &type, ChannelArguments *args)=0
port.h
grpc::Status::CANCELLED
static const Status & CANCELLED
A CANCELLED pre-defined instance.
Definition: include/grpcpp/impl/codegen/status.h:115
generate.env
env
Definition: generate.py:37
grpc
Definition: grpcpp/alarm.h:33
fix_build_deps.c
list c
Definition: fix_build_deps.py:490
grpc::CompletionQueue::GOT_EVENT
@ GOT_EVENT
Definition: include/grpcpp/impl/codegen/completion_queue.h:126
GPR_GLOBAL_CONFIG_GET
#define GPR_GLOBAL_CONFIG_GET(name)
Definition: global_config_generic.h:24
false
#define false
Definition: setup_once.h:323
testing::internal::Log
GTEST_API_ void Log(LogSeverity severity, const std::string &message, int stack_frames_to_skip)
Definition: bloaty/third_party/googletest/googlemock/src/gmock-internal-utils.cc:149
absl::time_internal::cctz::time_point
std::chrono::time_point< std::chrono::system_clock, D > time_point
Definition: abseil-cpp/absl/time/internal/cctz/include/cctz/time_zone.h:39
send_request
Definition: ares_private.h:147
grpc_recycle_unused_port
void grpc_recycle_unused_port(int port)
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
framework.rpc.grpc_channelz.Server
Server
Definition: grpc_channelz.py:42
time_now
static uint64_t time_now()
Definition: speed.cc:134
grpc::testing::CANCEL_AFTER_PROCESSING
@ CANCEL_AFTER_PROCESSING
Definition: test_service_impl.h:56
grpc::CompletionQueue::NextStatus
NextStatus
Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT.
Definition: include/grpcpp/impl/codegen/completion_queue.h:124
grpc::testing::DO_NOT_CANCEL
@ DO_NOT_CANCEL
Definition: test_service_impl.h:53
stub_
std::unique_ptr< grpc::testing::EchoTestService::Stub > stub_
Definition: async_end2end_test.cc:353
time.h
xds_manager.p
p
Definition: xds_manager.py:60
async_greeter_client.stub
stub
Definition: hellostreamingworld/async_greeter_client.py:26
setup.k
k
Definition: third_party/bloaty/third_party/capstone/bindings/python/setup.py:42
absl::base_internal::Next
static AllocList * Next(int i, AllocList *prev, LowLevelAlloc::Arena *arena)
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:453
T
#define T(upbtypeconst, upbtype, ctype, default_value)
scenario
Definition: test/core/fling/client.cc:135
grpc::testing::detag
int detag(void *p)
Definition: interceptors_util.h:232
testing::TestWithParam
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1883
message_content
const std::string message_content
Definition: async_end2end_test.cc:238
grpc::testing::Verifier::Verifier
Verifier()
Definition: interceptors_util.h:238
grpc::testing::kInsecureCredentialsType
const char kInsecureCredentialsType[]
Definition: test_credentials_provider.h:31
health_check_service
bool health_check_service
Definition: async_end2end_test.cc:236
grpc::testing::CredentialsProvider::GetServerCredentials
virtual std::shared_ptr< ServerCredentials > GetServerCredentials(const std::string &type)=0
profile_analyzer.builder
builder
Definition: profile_analyzer.py:159
channel
wrapped_grpc_channel * channel
Definition: src/php/ext/grpc/call.h:33
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
ev_posix.h
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
server_address_
std::ostringstream server_address_
Definition: async_end2end_test.cc:357
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
Verify
static int Verify(X509 *leaf, const std::vector< X509 * > &roots, const std::vector< X509 * > &intermediates, const std::vector< X509_CRL * > &crls, unsigned long flags=0, std::function< void(X509_VERIFY_PARAM *)> configure_callback=nullptr, int(*verify_callback)(int, X509_STORE_CTX *)=nullptr)
Definition: x509_test.cc:1111
grpc::testing::Verifier::ExpectMaybe
Verifier & ExpectMaybe(int i, bool expect_ok, bool *seen)
Definition: interceptors_util.h:254
gpr_sleep_until
GPRAPI void gpr_sleep_until(gpr_timespec until)
grpc.h
expectations_
std::map< void *, bool > expectations_
Definition: async_end2end_test.cc:201
grpc::testing::Verifier::GotTag
void GotTag(void *got_tag, bool ok, bool ignore_ok)
Definition: interceptors_util.h:319
grpc_test_slowdown_factor
int64_t grpc_test_slowdown_factor()
Definition: test/core/util/test_config.cc:76
channel.h
grpc::testing::ServerTryCancelRequestPhase
ServerTryCancelRequestPhase
Definition: test_service_impl.h:52
grpc::testing::INSTANTIATE_TEST_SUITE_P
INSTANTIATE_TEST_SUITE_P(HistogramTestCases, HistogramTest, ::testing::Range< int >(0, GRPC_STATS_HISTOGRAM_COUNT))
backup_poller.h
intptr_t
_W64 signed int intptr_t
Definition: stdint-msvc2008.h:118
grpc::Status::OK
static const Status & OK
An OK pre-defined instance.
Definition: include/grpcpp/impl/codegen/status.h:113
health_check_service_server_builder_option.h
scenarios
static const scenario scenarios[]
Definition: test/core/fling/client.cc:141
grpc::StatusCode
StatusCode
Definition: grpcpp/impl/codegen/status_code_enum.h:26
main
int main(int argc, char **argv)
Definition: async_end2end_test.cc:1947
RUN_ALL_TESTS
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2471
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
grpc::ServerBuilderOption
Interface to pass an option to a ServerBuilder.
Definition: grpcpp/impl/server_builder_option.h:31
grpc_pick_unused_port_or_die
int grpc_pick_unused_port_or_die(void)
grpc::testing::Verifier::MaybeExpect::seen
bool * seen
Definition: interceptors_util.h:345
gpr_now
GPRAPI gpr_timespec gpr_now(gpr_clock_type clock)
grpc_core::UniquePtr
std::unique_ptr< T, DefaultDeleteChar > UniquePtr
Definition: src/core/lib/gprpp/memory.h:43
GPR_GLOBAL_CONFIG_SET
#define GPR_GLOBAL_CONFIG_SET(name, value)
Definition: global_config_generic.h:26
grpc.StatusCode.UNIMPLEMENTED
tuple UNIMPLEMENTED
Definition: src/python/grpcio/grpc/__init__.py:276
msg
std::string msg
Definition: client_interceptors_end2end_test.cc:372
grpc::testing::tag
static void * tag(intptr_t t)
Definition: h2_ssl_cert_test.cc:263
GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH
#define GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH
Definition: grpc_types.h:504
test_config.h
server_
std::unique_ptr< Server > server_
Definition: async_end2end_test.cc:354
port_
int port_
Definition: async_end2end_test.cc:358
client_context.h
testing::InitGoogleTest
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
Definition: bloaty/third_party/googletest/googletest/src/gtest.cc:6106
grpc::testing::Verifier::DoOnceThenAsyncNext
CompletionQueue::NextStatus DoOnceThenAsyncNext(CompletionQueue *cq, void **got_tag, bool *ok, T deadline, std::function< void(void)> lambda)
Definition: interceptors_util.h:272
inproc
bool inproc
Definition: async_end2end_test.cc:235
gpr_time_add
GPRAPI gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:135
server
Definition: examples/python/async_streaming/server.py:1
grpc::testing::GetCredentialsProvider
CredentialsProvider * GetCredentialsProvider()
Definition: test_credentials_provider.cc:169
BuiltUnderMsan
bool BuiltUnderMsan()
Definition: build.cc:55
port.h
lambda_run_
bool lambda_run_
Definition: async_end2end_test.cc:203
absl::str_format_internal::LengthMod::t
@ t
ret
UniquePtr< SSL_SESSION > ret
Definition: ssl_x509.cc:1029
grpc::testing::ToString
std::string ToString(const grpc::string_ref &r)
Definition: string_ref_helper.cc:24
alloc.h
service_
std::unique_ptr< grpc::testing::EchoTestService::AsyncService > service_
Definition: async_end2end_test.cc:355
credentials_type
const std::string credentials_type
Definition: async_end2end_test.cc:237
server_context.h
grpc::CompletionQueue::TIMEOUT
@ TIMEOUT
deadline was reached.
Definition: include/grpcpp/impl/codegen/completion_queue.h:128
grpc::testing::TestEnvironment
Definition: test/core/util/test_config.h:54
grpc::testing::Verifier::MaybeExpect::ok
bool ok
Definition: interceptors_util.h:344
grpc::CreateCustomChannel
std::shared_ptr< Channel > CreateCustomChannel(const grpc::string &target, const std::shared_ptr< ChannelCredentials > &creds, const ChannelArguments &args)
grpc::protobuf::util::Status
GRPC_CUSTOM_UTIL_STATUS Status
Definition: include/grpcpp/impl/codegen/config_protobuf.h:93
health_check_
HealthCheck health_check_
Definition: async_end2end_test.cc:356
CreateTestScenarios
std::vector< std::string > CreateTestScenarios()
Definition: time_jump_test.cc:84
ok
bool ok
Definition: async_end2end_test.cc:197
tls.h
grpc::testing::EXPECT_EQ
EXPECT_EQ(options.token_exchange_service_uri, "https://foo/exchange")
grpc::testing::CredentialsProvider::GetSecureCredentialsTypeList
virtual std::vector< std::string > GetSecureCredentialsTypeList()=0
EXPECT_GE
#define EXPECT_GE(val1, val2)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2034
grpc::testing::CANCEL_DURING_PROCESSING
@ CANCEL_DURING_PROCESSING
Definition: test_service_impl.h:55
gpr_time_from_millis
GPRAPI gpr_timespec gpr_time_from_millis(int64_t ms, gpr_clock_type clock_type)
Definition: src/core/lib/gpr/time.cc:119
grpc.StatusCode.CANCELLED
tuple CANCELLED
Definition: src/python/grpcio/grpc/__init__.py:261
GPR_DEBUG
#define GPR_DEBUG
Definition: include/grpc/impl/codegen/log.h:55
maybe_expectations_
std::map< void *, MaybeExpect > maybe_expectations_
Definition: async_end2end_test.cc:202
grpc::testing::TEST_P
TEST_P(HistogramTest, IncHistogram)
Definition: stats_test.cc:87
grpc::testing::EXPECT_TRUE
EXPECT_TRUE(grpc::experimental::StsCredentialsOptionsFromJson(minimum_valid_json, &options) .ok())
testing::ValuesIn
internal::ParamGenerator< typename std::iterator_traits< ForwardIterator >::value_type > ValuesIn(ForwardIterator begin, ForwardIterator end)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest-param-test.h:297
server.h
grpc::ServerAsyncResponseWriter
Definition: grpcpp/impl/codegen/async_unary_call.h:295
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
grpc::operator<<
std::ostream & operator<<(std::ostream &out, const string_ref &string)
Definition: grpcpp/impl/codegen/string_ref.h:145
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
seen
bool * seen
Definition: async_end2end_test.cc:198
t1
Table t1
Definition: abseil-cpp/absl/container/internal/raw_hash_set_allocator_test.cc:185
string_ref_helper.h
to_string
static bool to_string(zval *from)
Definition: protobuf/php/ext/google/protobuf/convert.c:333
grpc::testing::SendRpc
static void SendRpc(grpc::testing::EchoTestService::Stub *stub, int num_rpcs, bool allow_exhaustion, gpr_atm *errors)
Definition: thread_stress_test.cc:277
grpc::testing::Verifier::ExpectUnless
Verifier & ExpectUnless(int i, bool expect_ok, bool seen)
Definition: interceptors_util.h:245
grpc::testing::CANCEL_BEFORE_PROCESSING
@ CANCEL_BEFORE_PROCESSING
Definition: test_service_impl.h:54
thread
static uv_thread_t thread
Definition: test-async-null-cb.c:29
server_builder.h
testing::internal::Expect
void Expect(bool condition, const char *file, int line, const std::string &msg)
Definition: bloaty/third_party/googletest/googlemock/include/gmock/internal/gmock-internal-utils.h:282
cq
static grpc_completion_queue * cq
Definition: test/core/fling/client.cc:37
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
create_channel.h
cq_
std::unique_ptr< ServerCompletionQueue > cq_
Definition: async_end2end_test.cc:352


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:35