message_allocator_end2end_test.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <algorithm>
20 #include <atomic>
21 #include <condition_variable>
22 #include <functional>
23 #include <memory>
24 #include <mutex>
25 #include <sstream>
26 #include <thread>
27 
28 #include <google/protobuf/arena.h>
29 #include <gtest/gtest.h>
30 
31 #include <grpc/impl/codegen/log.h>
32 #include <grpcpp/channel.h>
33 #include <grpcpp/client_context.h>
34 #include <grpcpp/create_channel.h>
35 #include <grpcpp/server.h>
36 #include <grpcpp/server_builder.h>
37 #include <grpcpp/server_context.h>
40 
42 #include "src/proto/grpc/testing/echo.grpc.pb.h"
43 #include "test/core/util/port.h"
46 
47 namespace grpc {
48 namespace testing {
49 namespace {
50 
51 class CallbackTestServiceImpl : public EchoTestService::CallbackService {
52  public:
53  explicit CallbackTestServiceImpl() {}
54 
55  void SetAllocatorMutator(
56  std::function<void(RpcAllocatorState* allocator_state,
57  const EchoRequest* req, EchoResponse* resp)>
58  mutator) {
59  allocator_mutator_ = std::move(mutator);
60  }
61 
62  ServerUnaryReactor* Echo(CallbackServerContext* context,
63  const EchoRequest* request,
64  EchoResponse* response) override {
65  response->set_message(request->message());
66  if (allocator_mutator_) {
67  allocator_mutator_(context->GetRpcAllocatorState(), request, response);
68  }
69  auto* reactor = context->DefaultReactor();
70  reactor->Finish(Status::OK);
71  return reactor;
72  }
73 
74  private:
75  std::function<void(RpcAllocatorState* allocator_state, const EchoRequest* req,
76  EchoResponse* resp)>
78 };
79 
80 enum class Protocol { INPROC, TCP };
81 
82 class TestScenario {
83  public:
84  TestScenario(Protocol protocol, const std::string& creds_type)
85  : protocol(protocol), credentials_type(creds_type) {}
86  void Log() const;
87  Protocol protocol;
89 };
90 
91 std::ostream& operator<<(std::ostream& out, const TestScenario& scenario) {
92  return out << "TestScenario{protocol="
93  << (scenario.protocol == Protocol::INPROC ? "INPROC" : "TCP")
94  << "," << scenario.credentials_type << "}";
95 }
96 
97 void TestScenario::Log() const {
98  std::ostringstream out;
99  out << *this;
100  gpr_log(GPR_INFO, "%s", out.str().c_str());
101 }
102 
103 class MessageAllocatorEnd2endTestBase
104  : public ::testing::TestWithParam<TestScenario> {
105  protected:
106  MessageAllocatorEnd2endTestBase() { GetParam().Log(); }
107 
108  ~MessageAllocatorEnd2endTestBase() override = default;
109 
110  void CreateServer(MessageAllocator<EchoRequest, EchoResponse>* allocator) {
111  ServerBuilder builder;
112 
113  auto server_creds = GetCredentialsProvider()->GetServerCredentials(
114  GetParam().credentials_type);
115  if (GetParam().protocol == Protocol::TCP) {
117  server_address_ << "localhost:" << picked_port_;
118  builder.AddListeningPort(server_address_.str(), server_creds);
119  }
120  callback_service_.SetMessageAllocatorFor_Echo(allocator);
121  builder.RegisterService(&callback_service_);
122 
123  server_ = builder.BuildAndStart();
124  }
125 
126  void DestroyServer() {
127  if (server_) {
128  server_->Shutdown();
129  server_.reset();
130  }
131  }
132 
133  void ResetStub() {
134  ChannelArguments args;
135  auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
136  GetParam().credentials_type, &args);
137  switch (GetParam().protocol) {
138  case Protocol::TCP:
140  channel_creds, args);
141  break;
142  case Protocol::INPROC:
143  channel_ = server_->InProcessChannel(args);
144  break;
145  default:
146  assert(false);
147  }
148  stub_ = EchoTestService::NewStub(channel_);
149  }
150 
151  void TearDown() override {
152  DestroyServer();
153  if (picked_port_ > 0) {
155  }
156  }
157 
158  void SendRpcs(int num_rpcs) {
159  std::string test_string("");
160  for (int i = 0; i < num_rpcs; i++) {
161  EchoRequest request;
162  EchoResponse response;
163  ClientContext cli_ctx;
164 
165  test_string += std::string(1024, 'x');
166  request.set_message(test_string);
167  std::string val;
168  cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
169 
170  std::mutex mu;
171  std::condition_variable cv;
172  bool done = false;
173  stub_->async()->Echo(
174  &cli_ctx, &request, &response,
175  [&request, &response, &done, &mu, &cv, val](Status s) {
176  GPR_ASSERT(s.ok());
177 
178  EXPECT_EQ(request.message(), response.message());
179  std::lock_guard<std::mutex> l(mu);
180  done = true;
181  cv.notify_one();
182  });
183  std::unique_lock<std::mutex> l(mu);
184  while (!done) {
185  cv.wait(l);
186  }
187  }
188  }
189 
190  int picked_port_{0};
191  std::shared_ptr<Channel> channel_;
192  std::unique_ptr<EchoTestService::Stub> stub_;
193  CallbackTestServiceImpl callback_service_;
194  std::unique_ptr<Server> server_;
195  std::ostringstream server_address_;
196 };
197 
198 class NullAllocatorTest : public MessageAllocatorEnd2endTestBase {};
199 
200 TEST_P(NullAllocatorTest, SimpleRpc) {
201  CreateServer(nullptr);
202  ResetStub();
203  SendRpcs(1);
204 }
205 
206 class SimpleAllocatorTest : public MessageAllocatorEnd2endTestBase {
207  public:
208  class SimpleAllocator : public MessageAllocator<EchoRequest, EchoResponse> {
209  public:
210  class MessageHolderImpl : public MessageHolder<EchoRequest, EchoResponse> {
211  public:
212  MessageHolderImpl(std::atomic_int* request_deallocation_count,
213  std::atomic_int* messages_deallocation_count)
216  set_request(new EchoRequest);
217  set_response(new EchoResponse);
218  }
219  void Release() override {
220  (*messages_deallocation_count_)++;
221  delete request();
222  delete response();
223  delete this;
224  }
225  void FreeRequest() override {
226  (*request_deallocation_count_)++;
227  delete request();
228  set_request(nullptr);
229  }
230 
231  EchoRequest* ReleaseRequest() {
232  auto* ret = request();
233  set_request(nullptr);
234  return ret;
235  }
236 
237  private:
238  std::atomic_int* const request_deallocation_count_;
239  std::atomic_int* const messages_deallocation_count_;
240  };
241  MessageHolder<EchoRequest, EchoResponse>* AllocateMessages() override {
243  return new MessageHolderImpl(&request_deallocation_count,
245  }
247  std::atomic_int request_deallocation_count{0};
248  std::atomic_int messages_deallocation_count{0};
249  };
250 };
251 
252 TEST_P(SimpleAllocatorTest, SimpleRpc) {
253  const int kRpcCount = 10;
254  std::unique_ptr<SimpleAllocator> allocator(new SimpleAllocator);
255  CreateServer(allocator.get());
256  ResetStub();
257  SendRpcs(kRpcCount);
258  // messages_deallocaton_count is updated in Release after server side OnDone.
259  // Destroy server to make sure it has been updated.
260  DestroyServer();
261  EXPECT_EQ(kRpcCount, allocator->allocation_count);
262  EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count);
263  EXPECT_EQ(0, allocator->request_deallocation_count);
264 }
265 
266 TEST_P(SimpleAllocatorTest, RpcWithEarlyFreeRequest) {
267  const int kRpcCount = 10;
268  std::unique_ptr<SimpleAllocator> allocator(new SimpleAllocator);
269  auto mutator = [](RpcAllocatorState* allocator_state, const EchoRequest* req,
270  EchoResponse* resp) {
271  auto* info =
272  static_cast<SimpleAllocator::MessageHolderImpl*>(allocator_state);
273  EXPECT_EQ(req, info->request());
274  EXPECT_EQ(resp, info->response());
275  allocator_state->FreeRequest();
276  EXPECT_EQ(nullptr, info->request());
277  };
278  callback_service_.SetAllocatorMutator(mutator);
279  CreateServer(allocator.get());
280  ResetStub();
281  SendRpcs(kRpcCount);
282  // messages_deallocaton_count is updated in Release after server side OnDone.
283  // Destroy server to make sure it has been updated.
284  DestroyServer();
285  EXPECT_EQ(kRpcCount, allocator->allocation_count);
286  EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count);
287  EXPECT_EQ(kRpcCount, allocator->request_deallocation_count);
288 }
289 
290 TEST_P(SimpleAllocatorTest, RpcWithReleaseRequest) {
291  const int kRpcCount = 10;
292  std::unique_ptr<SimpleAllocator> allocator(new SimpleAllocator);
293  std::vector<EchoRequest*> released_requests;
294  auto mutator = [&released_requests](RpcAllocatorState* allocator_state,
295  const EchoRequest* req,
296  EchoResponse* resp) {
297  auto* info =
298  static_cast<SimpleAllocator::MessageHolderImpl*>(allocator_state);
299  EXPECT_EQ(req, info->request());
300  EXPECT_EQ(resp, info->response());
301  released_requests.push_back(info->ReleaseRequest());
302  EXPECT_EQ(nullptr, info->request());
303  };
304  callback_service_.SetAllocatorMutator(mutator);
305  CreateServer(allocator.get());
306  ResetStub();
307  SendRpcs(kRpcCount);
308  // messages_deallocaton_count is updated in Release after server side OnDone.
309  // Destroy server to make sure it has been updated.
310  DestroyServer();
311  EXPECT_EQ(kRpcCount, allocator->allocation_count);
312  EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count);
313  EXPECT_EQ(0, allocator->request_deallocation_count);
314  EXPECT_EQ(static_cast<unsigned>(kRpcCount), released_requests.size());
315  for (auto* req : released_requests) {
316  delete req;
317  }
318 }
319 
320 class ArenaAllocatorTest : public MessageAllocatorEnd2endTestBase {
321  public:
322  class ArenaAllocator : public MessageAllocator<EchoRequest, EchoResponse> {
323  public:
324  class MessageHolderImpl : public MessageHolder<EchoRequest, EchoResponse> {
325  public:
326  MessageHolderImpl() {
327  set_request(
328  google::protobuf::Arena::CreateMessage<EchoRequest>(&arena_));
329  set_response(
330  google::protobuf::Arena::CreateMessage<EchoResponse>(&arena_));
331  }
332  void Release() override { delete this; }
333  void FreeRequest() override { GPR_ASSERT(0); }
334 
335  private:
337  };
338  MessageHolder<EchoRequest, EchoResponse>* AllocateMessages() override {
340  return new MessageHolderImpl;
341  }
342  int allocation_count = 0;
343  };
344 };
345 
346 TEST_P(ArenaAllocatorTest, SimpleRpc) {
347  const int kRpcCount = 10;
348  std::unique_ptr<ArenaAllocator> allocator(new ArenaAllocator);
349  CreateServer(allocator.get());
350  ResetStub();
351  SendRpcs(kRpcCount);
352  EXPECT_EQ(kRpcCount, allocator->allocation_count);
353 }
354 
355 std::vector<TestScenario> CreateTestScenarios(bool test_insecure) {
356  std::vector<TestScenario> scenarios;
357  std::vector<std::string> credentials_types{
359  auto insec_ok = [] {
360  // Only allow insecure credentials type when it is registered with the
361  // provider. User may create providers that do not have insecure.
363  kInsecureCredentialsType, nullptr) != nullptr;
364  };
365  if (test_insecure && insec_ok()) {
366  credentials_types.push_back(kInsecureCredentialsType);
367  }
368  GPR_ASSERT(!credentials_types.empty());
369 
370  Protocol parr[]{Protocol::INPROC, Protocol::TCP};
371  for (Protocol p : parr) {
372  for (const auto& cred : credentials_types) {
373  // TODO(vjpai): Test inproc with secure credentials when feasible
374  if (p == Protocol::INPROC &&
375  (cred != kInsecureCredentialsType || !insec_ok())) {
376  continue;
377  }
378  scenarios.emplace_back(p, cred);
379  }
380  }
381  return scenarios;
382 }
383 
384 INSTANTIATE_TEST_SUITE_P(NullAllocatorTest, NullAllocatorTest,
386 INSTANTIATE_TEST_SUITE_P(SimpleAllocatorTest, SimpleAllocatorTest,
388 INSTANTIATE_TEST_SUITE_P(ArenaAllocatorTest, ArenaAllocatorTest,
390 
391 } // namespace
392 } // namespace testing
393 } // namespace grpc
394 
395 int main(int argc, char** argv) {
396  grpc::testing::TestEnvironment env(&argc, argv);
397  ::testing::InitGoogleTest(&argc, argv);
398  int ret = RUN_ALL_TESTS();
399  return ret;
400 }
test_credentials_provider.h
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
testing
Definition: aws_request_signer_test.cc:25
iomgr.h
gen_build_yaml.out
dictionary out
Definition: src/benchmark/gen_build_yaml.py:24
Arena
struct Arena Arena
Definition: third_party/bloaty/third_party/protobuf/src/google/protobuf/arena.h:189
grpc::testing::CredentialsProvider::GetChannelCredentials
virtual std::shared_ptr< ChannelCredentials > GetChannelCredentials(const std::string &type, ChannelArguments *args)=0
port.h
picked_port_
int picked_port_
Definition: message_allocator_end2end_test.cc:190
message_allocator.h
generate.env
env
Definition: generate.py:37
grpc
Definition: grpcpp/alarm.h:33
testing::internal::Log
GTEST_API_ void Log(LogSeverity severity, const std::string &message, int stack_frames_to_skip)
Definition: bloaty/third_party/googletest/googlemock/src/gmock-internal-utils.cc:149
mutex
static uv_mutex_t mutex
Definition: threadpool.c:34
server_address_
std::ostringstream server_address_
Definition: message_allocator_end2end_test.cc:195
benchmark.request
request
Definition: benchmark.py:77
grpc_recycle_unused_port
void grpc_recycle_unused_port(int port)
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc::testing::CallbackTestServiceImpl::CallbackTestServiceImpl
CallbackTestServiceImpl()
Definition: test_service_impl.h:461
log.h
absl::FormatConversionChar::s
@ s
main
int main(int argc, char **argv)
Definition: message_allocator_end2end_test.cc:395
channel_
std::shared_ptr< Channel > channel_
Definition: message_allocator_end2end_test.cc:191
request_deallocation_count
std::atomic_int request_deallocation_count
Definition: message_allocator_end2end_test.cc:247
scenario
Definition: test/core/fling/client.cc:135
testing::TestWithParam
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1883
grpc::testing::CallbackTestServiceImpl::Echo
ServerUnaryReactor * Echo(CallbackServerContext *context, const EchoRequest *request, EchoResponse *response) override
Definition: test_service_impl.cc:128
client_callback.h
grpc::testing::kInsecureCredentialsType
const char kInsecureCredentialsType[]
Definition: test_credentials_provider.h:31
messages_deallocation_count
std::atomic_int messages_deallocation_count
Definition: message_allocator_end2end_test.cc:248
grpc::testing::CredentialsProvider::GetServerCredentials
virtual std::shared_ptr< ServerCredentials > GetServerCredentials(const std::string &type)=0
allocation_count
int allocation_count
Definition: message_allocator_end2end_test.cc:246
profile_analyzer.builder
builder
Definition: profile_analyzer.py:159
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
allocator_mutator_
std::function< void(RpcAllocatorState *allocator_state, const EchoRequest *req, EchoResponse *resp)> allocator_mutator_
Definition: message_allocator_end2end_test.cc:77
req
static uv_connect_t req
Definition: test-connection-fail.c:30
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
http2_server_health_check.resp
resp
Definition: http2_server_health_check.py:31
credentials_type
const std::string credentials_type
Definition: message_allocator_end2end_test.cc:88
done
struct tab * done
Definition: bloaty/third_party/zlib/examples/enough.c:176
messages_deallocation_count_
std::atomic_int *const messages_deallocation_count_
Definition: message_allocator_end2end_test.cc:239
channel.h
protocol
Protocol protocol
Definition: message_allocator_end2end_test.cc:87
grpc::testing::INSTANTIATE_TEST_SUITE_P
INSTANTIATE_TEST_SUITE_P(HistogramTestCases, HistogramTest, ::testing::Range< int >(0, GRPC_STATS_HISTOGRAM_COUNT))
grpc::Status::OK
static const Status & OK
An OK pre-defined instance.
Definition: include/grpcpp/impl/codegen/status.h:113
scenarios
static const scenario scenarios[]
Definition: test/core/fling/client.cc:141
TCP
@ TCP
Definition: task.h:82
RUN_ALL_TESTS
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2471
arena_
google::protobuf::Arena arena_
Definition: message_allocator_end2end_test.cc:336
grpc_pick_unused_port_or_die
int grpc_pick_unused_port_or_die(void)
stub_
std::unique_ptr< EchoTestService::Stub > stub_
Definition: message_allocator_end2end_test.cc:192
test_config.h
client_context.h
testing::InitGoogleTest
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
Definition: bloaty/third_party/googletest/googletest/src/gtest.cc:6106
grpc::testing::cv
static gpr_cv cv
Definition: bm_cq.cc:163
grpc::testing::GetCredentialsProvider
CredentialsProvider * GetCredentialsProvider()
Definition: test_credentials_provider.cc:169
performance.scenario_config.INPROC
string INPROC
Definition: scenario_config.py:25
server_
std::unique_ptr< Server > server_
Definition: message_allocator_end2end_test.cc:194
ret
UniquePtr< SSL_SESSION > ret
Definition: ssl_x509.cc:1029
server_context.h
asyncio_get_stats.response
response
Definition: asyncio_get_stats.py:28
grpc::testing::TestEnvironment
Definition: test/core/util/test_config.h:54
GRPC_COMPRESS_GZIP
@ GRPC_COMPRESS_GZIP
Definition: compression_types.h:63
grpc::CreateCustomChannel
std::shared_ptr< Channel > CreateCustomChannel(const grpc::string &target, const std::shared_ptr< ChannelCredentials > &creds, const ChannelArguments &args)
grpc::protobuf::util::Status
GRPC_CUSTOM_UTIL_STATUS Status
Definition: include/grpcpp/impl/codegen/config_protobuf.h:93
CreateTestScenarios
std::vector< std::string > CreateTestScenarios()
Definition: time_jump_test.cc:84
grpc::testing::EXPECT_EQ
EXPECT_EQ(options.token_exchange_service_uri, "https://foo/exchange")
grpc::testing::CredentialsProvider::GetSecureCredentialsTypeList
virtual std::vector< std::string > GetSecureCredentialsTypeList()=0
request_deallocation_count_
std::atomic_int *const request_deallocation_count_
Definition: message_allocator_end2end_test.cc:238
context
grpc::ClientContext context
Definition: istio_echo_server_lib.cc:61
grpc::testing::TEST_P
TEST_P(HistogramTest, IncHistogram)
Definition: stats_test.cc:87
testing::ValuesIn
internal::ParamGenerator< typename std::iterator_traits< ForwardIterator >::value_type > ValuesIn(ForwardIterator begin, ForwardIterator end)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest-param-test.h:297
server.h
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
run_grpclb_interop_tests.l
dictionary l
Definition: run_grpclb_interop_tests.py:410
grpc::operator<<
std::ostream & operator<<(std::ostream &out, const string_ref &string)
Definition: grpcpp/impl/codegen/string_ref.h:145
grpc::testing::CreateServer
static std::unique_ptr< Server > CreateServer(const ServerConfig &config)
Definition: qps_worker.cc:72
callback_service_
CallbackTestServiceImpl callback_service_
Definition: message_allocator_end2end_test.cc:193
server_builder.h
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
create_channel.h
grpc::testing::mu
static gpr_mu mu
Definition: bm_cq.cc:162


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:37