24 #include <gtest/gtest.h>
31 #include "src/proto/grpc/testing/echo.grpc.pb.h"
34 using std::chrono::system_clock;
54 const std::string& expected_transport_security_type,
57 std::vector<grpc::string_ref> tst =
58 auth_ctx->FindPropertyValues(
"transport_security_type");
61 if (expected_client_identity.empty()) {
62 EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty());
66 auto identity = auth_ctx->GetPeerIdentity();
69 EXPECT_EQ(expected_client_identity, identity[0]);
76 const std::multimap<grpc::string_ref, grpc::string_ref>&
metadata,
79 for (
const auto& metadatum :
metadata) {
90 const std::multimap<grpc::string_ref, grpc::string_ref>&
metadata,
103 const std::multimap<grpc::string_ref, grpc::string_ref>&
metadata,
113 while (!
context->IsCancelled()) {
123 "Server called TryCancelNonblocking() to cancel the request");
140 async_cancel_check_ =
std::thread([
this] { (void)
ctx_->IsCancelled(); });
145 request->param().server_notify_client_when_started()) {
146 service->signaller_.SignalClientThatRpcStarted();
151 service_->signaller_.ServerWaitToContinue();
160 if (
req_->has_param() &&
req_->param().server_sleep_us() > 0) {
166 [
this](
bool ok) { NonDelayed(ok); });
171 void OnSendInitialMetadataDone(
bool ok)
override {
173 initial_metadata_sent_ =
true;
175 void OnCancel()
override {
178 on_cancel_invoked_ =
true;
179 std::lock_guard<std::mutex> l(cancel_mu_);
180 cancel_cv_.notify_one();
182 void OnDone()
override {
183 if (
req_->has_param() &&
req_->param().echo_metadata_initially()) {
188 if (
req_->has_param() &&
req_->param().has_expected_error()) {
191 async_cancel_check_.join();
192 if (rpc_wait_thread_.joinable()) {
193 rpc_wait_thread_.join();
195 if (finish_when_cancelled_.joinable()) {
196 finish_when_cancelled_.join();
202 void NonDelayed(
bool ok) {
208 if (
req_->has_param() &&
req_->param().server_die()) {
212 if (
req_->has_param() &&
req_->param().has_expected_error()) {
213 const auto&
error =
req_->param().expected_error();
215 error.error_message(),
error.binary_error_details()));
228 FinishWhenCancelledAsync();
235 }
else if (
req_->has_param() &&
236 req_->param().echo_host_from_authority_header()) {
237 auto authority =
ctx_->ExperimentalGetAuthority();
238 std::string authority_str(authority.data(), authority.size());
241 if (
req_->has_param() &&
req_->param().client_cancel_after_us()) {
243 std::unique_lock<std::mutex> lock(
service_->mu_);
246 FinishWhenCancelledAsync();
248 }
else if (
req_->has_param() &&
req_->param().server_cancel_after_us()) {
251 req_->param().server_cancel_after_us(),
253 [
this](
bool) { Finish(Status::CANCELLED); });
255 }
else if (!
req_->has_param() || !
req_->param().skip_cancelled_check()) {
259 if (
req_->has_param() &&
req_->param().echo_metadata_initially()) {
260 const std::multimap<grpc::string_ref, grpc::string_ref>&
261 client_metadata =
ctx_->client_metadata();
262 for (
const auto& metadatum : client_metadata) {
266 StartSendInitialMetadata();
269 if (
req_->has_param() &&
req_->param().echo_metadata()) {
270 const std::multimap<grpc::string_ref, grpc::string_ref>&
271 client_metadata =
ctx_->client_metadata();
272 for (
const auto& metadatum : client_metadata) {
277 if (
req_->param().debug_info().stack_entries_size() ||
278 !
req_->param().debug_info().detail().empty()) {
280 req_->param().debug_info().SerializeAsString();
282 serialized_debug_info);
287 if (
req_->has_param() &&
288 (
req_->param().expected_client_identity().length() > 0 ||
289 req_->param().check_auth_context())) {
291 ctx_,
req_->param().expected_transport_security_type(),
292 req_->param().expected_client_identity());
294 if (
req_->has_param() &&
req_->param().response_message_length() > 0) {
298 if (
req_->has_param() &&
req_->param().echo_peer()) {
299 resp_->mutable_param()->set_peer(
ctx_->peer());
303 void FinishWhenCancelledAsync() {
305 std::unique_lock<std::mutex> l(cancel_mu_);
306 cancel_cv_.wait(l, [
this] {
return ctx_->IsCancelled(); });
311 CallbackTestServiceImpl*
const service_;
313 const EchoRequest*
const req_;
314 EchoResponse*
const resp_;
317 std::condition_variable cancel_cv_;
318 bool initial_metadata_sent_ =
false;
320 bool on_cancel_invoked_ =
false;
342 void OnDone()
override {
delete this; }
369 int server_try_cancel)
383 void OnDone()
override {
delete this; }
384 void OnCancel()
override {
389 void OnReadDone(
bool ok)
override {
410 void FinishOnce(
const Status& s) {
411 std::lock_guard<std::mutex> l(finish_mu_);
421 int num_msgs_read_{0};
424 bool finished_{
false};
425 bool setup_done_{
false};
452 int server_try_cancel)
469 void OnDone()
override {
delete this; }
470 void OnCancel()
override {
475 void OnWriteDone(
bool )
override {
478 }
else if (server_coalescing_api_ != 0) {
490 void FinishOnce(
const Status& s) {
491 std::lock_guard<std::mutex> l(finish_mu_);
502 server_coalescing_api_ != 0) {
504 std::lock_guard<std::mutex> l(finish_mu_);
513 std::lock_guard<std::mutex> l(finish_mu_);
525 int server_coalescing_api_;
526 int server_responses_to_send_;
528 bool finished_{
false};
529 bool setup_done_{
false};
563 void OnDone()
override {
566 std::lock_guard<std::mutex> l(finish_mu_);
568 finish_thread_.join();
572 void OnCancel()
override {
577 void OnReadDone(
bool ok)
override {
581 std::lock_guard<std::mutex> l(finish_mu_);
583 if (num_msgs_read_ == server_write_last_) {
591 }
else if (client_try_cancel_) {
603 void OnWriteDone(
bool )
override {
604 std::lock_guard<std::mutex> l(finish_mu_);
611 void FinishOnce(
const Status& s) {
612 std::lock_guard<std::mutex> l(finish_mu_);
617 std::lock_guard<std::mutex> l(finish_mu_);
626 int num_msgs_read_{0};
628 int server_write_last_;
630 bool finished_{
false};
631 bool setup_done_{
false};
633 bool client_try_cancel_ =
false;