nonblocking_test.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <memory>
20 
21 #include "absl/memory/memory.h"
22 
23 #include <grpcpp/channel.h>
24 #include <grpcpp/client_context.h>
25 #include <grpcpp/create_channel.h>
26 #include <grpcpp/server.h>
27 #include <grpcpp/server_builder.h>
28 #include <grpcpp/server_context.h>
29 
30 #include "src/core/lib/gpr/tls.h"
32 #include "src/proto/grpc/testing/echo.grpc.pb.h"
33 #include "test/core/util/port.h"
35 
36 #ifdef GRPC_POSIX_SOCKET
38 #endif // GRPC_POSIX_SOCKET
39 
40 #include <gtest/gtest.h>
41 
42 #ifdef GRPC_POSIX_SOCKET
43 // Thread-local variable to so that only polls from this test assert
44 // non-blocking (not polls from resolver, timer thread, etc), and only when the
45 // thread is waiting on polls caused by CompletionQueue::AsyncNext (not for
46 // picking a port or other reasons).
47 static GPR_THREAD_LOCAL(bool) g_is_nonblocking_poll;
48 
49 namespace {
50 
51 int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
52  int timeout) {
53  // Only assert that this poll should have zero timeout if we're in the
54  // middle of a zero-timeout CQ Next.
55  if (g_is_nonblocking_poll) {
56  GPR_ASSERT(timeout == 0);
57  }
58  return poll(pfds, nfds, timeout);
59 }
60 
61 } // namespace
62 
63 namespace grpc {
64 namespace testing {
65 namespace {
66 
67 void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }
68 int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
69 
70 class NonblockingTest : public ::testing::Test {
71  protected:
72  NonblockingTest() {}
73 
74  void SetUp() override {
76  server_address_ << "localhost:" << port_;
77 
78  // Setup server
79  BuildAndStartServer();
80  }
81 
82  bool LoopForTag(void** tag, bool* ok) {
83  // Temporarily set the thread-local nonblocking poll flag so that the polls
84  // caused by this loop are indeed sent by the library with zero timeout.
85  bool orig_val = g_is_nonblocking_poll;
86  g_is_nonblocking_poll = true;
87  for (;;) {
88  auto r = cq_->AsyncNext(tag, ok, gpr_time_0(GPR_CLOCK_REALTIME));
90  g_is_nonblocking_poll = orig_val;
91  return false;
92  } else if (r == CompletionQueue::GOT_EVENT) {
93  g_is_nonblocking_poll = orig_val;
94  return true;
95  }
96  }
97  }
98 
99  void TearDown() override {
100  server_->Shutdown();
101  void* ignored_tag;
102  bool ignored_ok;
103  cq_->Shutdown();
104  while (LoopForTag(&ignored_tag, &ignored_ok)) {
105  }
106  stub_.reset();
108  }
109 
110  void BuildAndStartServer() {
111  ServerBuilder builder;
112  builder.AddListeningPort(server_address_.str(),
114  service_ =
115  absl::make_unique<grpc::testing::EchoTestService::AsyncService>();
116  builder.RegisterService(service_.get());
117  cq_ = builder.AddCompletionQueue();
118  server_ = builder.BuildAndStart();
119  }
120 
121  void ResetStub() {
122  std::shared_ptr<Channel> channel = grpc::CreateChannel(
124  stub_ = grpc::testing::EchoTestService::NewStub(channel);
125  }
126 
127  void SendRpc(int num_rpcs) {
128  for (int i = 0; i < num_rpcs; i++) {
129  EchoRequest send_request;
130  EchoRequest recv_request;
131  EchoResponse send_response;
132  EchoResponse recv_response;
133  Status recv_status;
134 
135  ClientContext cli_ctx;
136  ServerContext srv_ctx;
137  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
138 
139  send_request.set_message("hello non-blocking world");
140  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
141  stub_->PrepareAsyncEcho(&cli_ctx, send_request, cq_.get()));
142 
143  response_reader->StartCall();
144  response_reader->Finish(&recv_response, &recv_status, tag(4));
145 
146  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
147  cq_.get(), cq_.get(), tag(2));
148 
149  void* got_tag;
150  bool ok;
151  EXPECT_TRUE(LoopForTag(&got_tag, &ok));
152  EXPECT_TRUE(ok);
153  EXPECT_EQ(detag(got_tag), 2);
154  EXPECT_EQ(send_request.message(), recv_request.message());
155 
156  send_response.set_message(recv_request.message());
157  response_writer.Finish(send_response, Status::OK, tag(3));
158 
159  int tagsum = 0;
160  int tagprod = 1;
161  EXPECT_TRUE(LoopForTag(&got_tag, &ok));
162  EXPECT_TRUE(ok);
163  tagsum += detag(got_tag);
164  tagprod *= detag(got_tag);
165 
166  EXPECT_TRUE(LoopForTag(&got_tag, &ok));
167  EXPECT_TRUE(ok);
168  tagsum += detag(got_tag);
169  tagprod *= detag(got_tag);
170 
171  EXPECT_EQ(tagsum, 7);
172  EXPECT_EQ(tagprod, 12);
173  EXPECT_EQ(send_response.message(), recv_response.message());
174  EXPECT_TRUE(recv_status.ok());
175  }
176  }
177 
178  std::unique_ptr<ServerCompletionQueue> cq_;
179  std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
180  std::unique_ptr<Server> server_;
181  std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;
182  std::ostringstream server_address_;
183  int port_;
184 };
185 
186 TEST_F(NonblockingTest, SimpleRpc) {
187  ResetStub();
188  SendRpc(10);
189 }
190 
191 } // namespace
192 } // namespace testing
193 } // namespace grpc
194 
195 #endif // GRPC_POSIX_SOCKET
196 
197 int main(int argc, char** argv) {
198 #ifdef GRPC_POSIX_SOCKET
199  // Override the poll function before anything else can happen
200  grpc_poll_function = maybe_assert_non_blocking_poll;
201 
202  grpc::testing::TestEnvironment env(&argc, argv);
203  ::testing::InitGoogleTest(&argc, argv);
204 
205  // Start the nonblocking poll thread-local variable as false because the
206  // thread that issues RPCs starts by picking a port (which has non-zero
207  // timeout).
208  g_is_nonblocking_poll = false;
209 
210  int ret = RUN_ALL_TESTS();
211 
212  return ret;
213 #else // GRPC_POSIX_SOCKET
214  (void)argc;
215  (void)argv;
216  return 0;
217 #endif // GRPC_POSIX_SOCKET
218 }
cq_
grpc_completion_queue * cq_
Definition: channel_connectivity.cc:210
testing
Definition: aws_request_signer_test.cc:25
grpc::CompletionQueue::SHUTDOWN
@ SHUTDOWN
The completion queue has been shutdown and fully-drained.
Definition: include/grpcpp/impl/codegen/completion_queue.h:125
stub_
std::unique_ptr< grpc::testing::EchoTestService::Stub > stub_
Definition: client_channel_stress_test.cc:331
port.h
generate.env
env
Definition: generate.py:37
grpc
Definition: grpcpp/alarm.h:33
grpc::CompletionQueue::GOT_EVENT
@ GOT_EVENT
Definition: include/grpcpp/impl/codegen/completion_queue.h:126
gpr_time_0
GPRAPI gpr_timespec gpr_time_0(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:47
send_request
Definition: ares_private.h:147
grpc_recycle_unused_port
void grpc_recycle_unused_port(int port)
xds_manager.p
p
Definition: xds_manager.py:60
testing::Test
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:402
server_
Server *const server_
Definition: chttp2_server.cc:260
grpc::testing::detag
int detag(void *p)
Definition: interceptors_util.h:232
profile_analyzer.builder
builder
Definition: profile_analyzer.py:159
channel
wrapped_grpc_channel * channel
Definition: src/php/ext/grpc/call.h:33
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
ev_posix.h
channel.h
intptr_t
_W64 signed int intptr_t
Definition: stdint-msvc2008.h:118
grpc::Status::OK
static const Status & OK
An OK pre-defined instance.
Definition: include/grpcpp/impl/codegen/status.h:113
grpc::CreateChannel
std::shared_ptr< Channel > CreateChannel(const grpc::string &target, const std::shared_ptr< ChannelCredentials > &creds)
RUN_ALL_TESTS
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2471
GPR_THREAD_LOCAL
#define GPR_THREAD_LOCAL(type)
Definition: tls.h:151
grpc_pick_unused_port_or_die
int grpc_pick_unused_port_or_die(void)
grpc::testing::tag
static void * tag(intptr_t t)
Definition: h2_ssl_cert_test.cc:263
main
int main(int argc, char **argv)
Definition: nonblocking_test.cc:197
test_config.h
client_context.h
testing::InitGoogleTest
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
Definition: bloaty/third_party/googletest/googletest/src/gtest.cc:6106
grpc::testing::TEST_F
TEST_F(ChannelArgumentsTest, SetInt)
Definition: channel_arguments_test.cc:134
port.h
ret
UniquePtr< SSL_SESSION > ret
Definition: ssl_x509.cc:1029
server_context.h
fix_build_deps.r
r
Definition: fix_build_deps.py:491
grpc::testing::TestEnvironment
Definition: test/core/util/test_config.h:54
grpc::protobuf::util::Status
GRPC_CUSTOM_UTIL_STATUS Status
Definition: include/grpcpp/impl/codegen/config_protobuf.h:93
ok
bool ok
Definition: async_end2end_test.cc:197
server_address_
const char * server_address_
Definition: settings_timeout_test.cc:231
tls.h
grpc::testing::EXPECT_EQ
EXPECT_EQ(options.token_exchange_service_uri, "https://foo/exchange")
grpc::InsecureServerCredentials
std::shared_ptr< ServerCredentials > InsecureServerCredentials()
Definition: insecure_server_credentials.cc:52
grpc::testing::EXPECT_TRUE
EXPECT_TRUE(grpc::experimental::StsCredentialsOptionsFromJson(minimum_valid_json, &options) .ok())
server.h
grpc::ServerAsyncResponseWriter
Definition: grpcpp/impl/codegen/async_unary_call.h:295
grpc::InsecureChannelCredentials
std::shared_ptr< ChannelCredentials > InsecureChannelCredentials()
Credentials for an unencrypted, unauthenticated channel.
Definition: cpp/client/insecure_credentials.cc:69
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
port_
int port_
Definition: streams_not_seen_test.cc:377
grpc_poll_function
grpc_poll_function_type grpc_poll_function
grpc::testing::SendRpc
static void SendRpc(grpc::testing::EchoTestService::Stub *stub, int num_rpcs, bool allow_exhaustion, gpr_atm *errors)
Definition: thread_stress_test.cc:277
server_builder.h
timeout
uv_timer_t timeout
Definition: libuv/docs/code/uvwget/main.c:9
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
create_channel.h
service_
std::unique_ptr< grpc::testing::TestServiceImpl > service_
Definition: end2end_binder_transport_test.cc:71


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:43