client_test.cc
Go to the documentation of this file.
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <random>
16 #include <string>
17 #include <thread>
18 #include <vector>
19 
20 #include "absl/status/status.h"
21 #include "absl/strings/str_cat.h"
22 
25 #include <grpc/support/log.h>
26 
35 #include "test/core/util/port.h"
36 
38 
39 namespace {
40 
41 using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
42 using ::grpc_event_engine::experimental::EventEngine;
47 using namespace std::chrono_literals;
48 
// Bounds handed to the random distribution that picks the length of each
// generated test message (see GetNextSendMessage).
49 constexpr int kMinMessageSize = 1024;
50 constexpr int kMaxMessageSize = 4096;
// Number of messages each data-transfer loop sends in a given direction.
51 constexpr int kNumExchangedMessages = 100;
52 
53 // Returns a random message with bounded length.
54 std::string GetNextSendMessage() {
55  static const char alphanum[] =
56  "0123456789"
57  "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
58  "abcdefghijklmnopqrstuvwxyz";
59  static std::random_device rd;
60  static std::seed_seq seed{rd()};
61  static std::mt19937 gen(seed);
62  static std::uniform_real_distribution<> dis(kMinMessageSize, kMaxMessageSize);
63  static grpc_core::Mutex g_mu;
64  std::string tmp_s;
65  int len;
66  {
68  len = dis(gen);
69  }
70  tmp_s.reserve(len);
71  for (int i = 0; i < len; ++i) {
72  tmp_s += alphanum[rand() % (sizeof(alphanum) - 1)];
73  }
74  return tmp_s;
75 }
76 
77 } // namespace
78 
79 // Create a connection using the test EventEngine to a non-existent listener
80 // and verify that the connection fails.
81 TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) {
83  auto test_ee = this->NewEventEngine();
84  Promise<std::unique_ptr<EventEngine::Endpoint>> client_endpoint_promise;
85  auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
86  // Create a test EventEngine client endpoint and connect to a non existent
87  // listener.
88  test_ee->Connect(
89  [&client_endpoint_promise](
90  absl::StatusOr<std::unique_ptr<Endpoint>> status) {
91  // Connect should fail.
93  client_endpoint_promise.Set(nullptr);
94  },
95  URIToResolvedAddress("ipv6:[::1]:7000"),
96  ChannelArgsEndpointConfig(nullptr),
97  memory_quota->CreateMemoryAllocator("conn-1"), 24h);
98 
99  auto client_endpoint = std::move(client_endpoint_promise.Get());
100  EXPECT_EQ(client_endpoint, nullptr);
101 }
102 
103 // Create a connection using the test EventEngine to a listener created
104 // by the oracle EventEngine and exchange bi-di data over the connection.
105 // For each data transfer, verify that data written at one end of the stream
106 // equals data read at the other end of the stream.
107 TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) {
109  auto oracle_ee = this->NewOracleEventEngine();
110  auto test_ee = this->NewEventEngine();
111  auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
112  std::string target_addr = absl::StrCat(
113  "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
114  Promise<std::unique_ptr<EventEngine::Endpoint>> client_endpoint_promise;
115  Promise<std::unique_ptr<EventEngine::Endpoint>> server_endpoint_promise;
116 
117  Listener::AcceptCallback accept_cb =
118  [&server_endpoint_promise](
119  std::unique_ptr<Endpoint> ep,
120  grpc_core::MemoryAllocator /*memory_allocator*/) {
121  server_endpoint_promise.Set(std::move(ep));
122  };
123 
124  auto status = oracle_ee->CreateListener(
125  std::move(accept_cb),
126  [](absl::Status status) { GPR_ASSERT(status.ok()); },
127  ChannelArgsEndpointConfig(nullptr),
128  std::make_unique<grpc_core::MemoryQuota>("foo"));
129  EXPECT_TRUE(status.ok());
130 
131  std::unique_ptr<Listener> listener = std::move(*status);
132  EXPECT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok());
133  EXPECT_TRUE(listener->Start().ok());
134 
135  test_ee->Connect(
136  [&client_endpoint_promise](
137  absl::StatusOr<std::unique_ptr<Endpoint>> status) {
138  if (!status.ok()) {
139  gpr_log(GPR_ERROR, "Connect failed: %s",
140  status.status().ToString().c_str());
141  client_endpoint_promise.Set(nullptr);
142  } else {
143  client_endpoint_promise.Set(std::move(*status));
144  }
145  },
146  URIToResolvedAddress(target_addr), ChannelArgsEndpointConfig(nullptr),
147  memory_quota->CreateMemoryAllocator("conn-1"), 24h);
148 
149  auto client_endpoint = std::move(client_endpoint_promise.Get());
150  auto server_endpoint = std::move(server_endpoint_promise.Get());
151  EXPECT_TRUE(client_endpoint != nullptr);
152  EXPECT_TRUE(server_endpoint != nullptr);
153 
154  // Alternate message exchanges between client -- server and server -- client.
155  for (int i = 0; i < kNumExchangedMessages; i++) {
156  // Send from client to server and verify data read at the server.
157  EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint.get(),
158  server_endpoint.get())
159  .ok());
160 
161  // Send from server to client and verify data read at the client.
162  EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint.get(),
163  client_endpoint.get())
164  .ok());
165  }
166 }
167 
168 // Create 1 listener bound to N IPv6 addresses and M connections where M > N and
169 // exchange and verify random number of messages over each connection.
170 TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
172  static constexpr int kNumListenerAddresses = 10; // N
173  static constexpr int kNumConnections = 100; // M
174  auto oracle_ee = this->NewOracleEventEngine();
175  auto test_ee = this->NewEventEngine();
176  auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
177  Promise<std::unique_ptr<EventEngine::Endpoint>> client_endpoint_promise;
178  Promise<std::unique_ptr<EventEngine::Endpoint>> server_endpoint_promise;
179  std::vector<std::string> target_addrs;
180  std::vector<std::tuple<std::unique_ptr<Endpoint>, std::unique_ptr<Endpoint>>>
181  connections;
182 
183  Listener::AcceptCallback accept_cb =
184  [&server_endpoint_promise](
185  std::unique_ptr<Endpoint> ep,
186  grpc_core::MemoryAllocator /*memory_allocator*/) {
187  server_endpoint_promise.Set(std::move(ep));
188  };
189  auto status = oracle_ee->CreateListener(
190  std::move(accept_cb),
191  [](absl::Status status) { GPR_ASSERT(status.ok()); },
192  ChannelArgsEndpointConfig(nullptr),
193  std::make_unique<grpc_core::MemoryQuota>("foo"));
194  EXPECT_TRUE(status.ok());
195  std::unique_ptr<Listener> listener = std::move(*status);
196 
197  target_addrs.reserve(kNumListenerAddresses);
198  for (int i = 0; i < kNumListenerAddresses; i++) {
199  std::string target_addr = absl::StrCat(
200  "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
201  EXPECT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok());
202  target_addrs.push_back(target_addr);
203  }
204  EXPECT_TRUE(listener->Start().ok());
206  for (int i = 0; i < kNumConnections; i++) {
207  // Create a test EventEngine client endpoint and connect to a one of the
208  // addresses bound to the oracle listener. Verify that the connection
209  // succeeds.
210  test_ee->Connect(
211  [&client_endpoint_promise](
212  absl::StatusOr<std::unique_ptr<Endpoint>> status) {
213  if (!status.ok()) {
214  gpr_log(GPR_ERROR, "Connect failed: %s",
215  status.status().ToString().c_str());
216  client_endpoint_promise.Set(nullptr);
217  } else {
218  client_endpoint_promise.Set(std::move(*status));
219  }
220  },
221  URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]),
222  ChannelArgsEndpointConfig(nullptr),
223  memory_quota->CreateMemoryAllocator(
224  absl::StrCat("conn-", std::to_string(i))),
225  24h);
226 
227  auto client_endpoint = std::move(client_endpoint_promise.Get());
228  auto server_endpoint = std::move(server_endpoint_promise.Get());
229  EXPECT_TRUE(client_endpoint != nullptr);
230  EXPECT_TRUE(server_endpoint != nullptr);
231  connections.push_back(std::make_tuple(std::move(client_endpoint),
232  std::move(server_endpoint)));
233  client_endpoint_promise.Reset();
234  server_endpoint_promise.Reset();
235  }
236 
237  std::vector<std::thread> threads;
238  // Create one thread for each connection. For each connection, create
239  // 2 more worker threads: to exchange and verify bi-directional data transfer.
240  threads.reserve(kNumConnections);
241  for (int i = 0; i < kNumConnections; i++) {
242  // For each connection, simulate a parallel bi-directional data transfer.
243  // All bi-directional transfers are run in parallel across all connections.
244  // Each bi-directional data transfer uses a random number of messages.
245  threads.emplace_back([client_endpoint =
246  std::move(std::get<0>(connections[i])),
247  server_endpoint =
248  std::move(std::get<1>(connections[i]))]() {
249  std::vector<std::thread> workers;
250  workers.reserve(2);
251  auto worker = [client_endpoint = client_endpoint.get(),
252  server_endpoint =
253  server_endpoint.get()](bool client_to_server) {
254  grpc_core::ExecCtx ctx;
255  for (int i = 0; i < kNumExchangedMessages; i++) {
256  // If client_to_server is true, send from client to server and
257  // verify data read at the server. Otherwise send data from server
258  // to client and verify data read at client.
259  if (client_to_server) {
260  EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(),
261  client_endpoint, server_endpoint)
262  .ok());
263  } else {
264  EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(),
265  server_endpoint, client_endpoint)
266  .ok());
267  }
268  }
269  };
270  // worker[0] simulates a flow from client to server endpoint
271  workers.emplace_back([&worker]() { worker(true); });
272  // worker[1] simulates a flow from server to client endpoint
273  workers.emplace_back([&worker]() { worker(false); });
274  workers[0].join();
275  workers[1].join();
276  });
277  }
278  for (auto& t : threads) {
279  t.join();
280  }
281 }
282 
283 // TODO(vigneshbabu): Add more tests which create listeners bound to a
284 // mix of IPv6 and other types of addresses (UDS) in the same test.
EXPECT_FALSE
#define EXPECT_FALSE(condition)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1970
log.h
port.h
TEST_F
TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest)
Definition: client_test.cc:81
grpc_event_engine::experimental::EventEngine::Listener::AcceptCallback
std::function< void(std::unique_ptr< Endpoint >, MemoryAllocator memory_allocator)> AcceptCallback
Called when the listener has accepted a new client connection.
Definition: event_engine.h:232
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
grpc_event_engine::experimental::MemoryAllocator
Definition: memory_allocator.h:35
connections
static uv_pipe_t connections[NUM_CLIENTS]
Definition: test-pipe-connect-multiple.c:41
std::tr1::make_tuple
tuple make_tuple()
Definition: cares/cares/test/gmock-1.8.0/gtest/gtest.h:1619
Listener
::grpc_event_engine::experimental::EventEngine::Listener Listener
Definition: event_engine_test_utils.cc:42
event_engine.h
worker
static void worker(void *arg)
Definition: threadpool.c:57
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
grpc_event_engine::experimental::URIToResolvedAddress
EventEngine::ResolvedAddress URIToResolvedAddress(std::string address_str)
Definition: event_engine_test_utils.cc:47
Listener
Definition: transport_common.h:31
seed
static const uint8_t seed[20]
Definition: dsa_test.cc:79
channel_args_endpoint_config.h
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc_event_engine::experimental::EventEngine::Endpoint
Definition: event_engine.h:141
status
absl::Status status
Definition: rls.cc:251
ctx
static struct test_ctx ctx
Definition: test-ipc-send-recv.c:65
absl::SleepFor
void SleepFor(absl::Duration duration)
Definition: abseil-cpp/absl/time/clock.h:70
threads
static uv_thread_t * threads
Definition: threadpool.c:38
EXPECT_EQ
#define EXPECT_EQ(a, b)
Definition: iomgr/time_averaged_stats_test.cc:27
EventEngineClientTest
Definition: client_test.cc:37
event_engine_test_utils.h
absl::Milliseconds
constexpr Duration Milliseconds(T n)
Definition: third_party/abseil-cpp/absl/time/time.h:415
parse_address.h
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
worker
Definition: worker.py:1
promise.h
gen
OPENSSL_EXPORT GENERAL_NAME * gen
Definition: x509v3.h:495
event_engine_test.h
Endpoint
::grpc_event_engine::experimental::EventEngine::Endpoint Endpoint
Definition: event_engine_test_utils.cc:41
grpc_pick_unused_port_or_die
int grpc_pick_unused_port_or_die(void)
grpc_core::ExecCtx
Definition: exec_ctx.h:97
grpc_event_engine::experimental::SendValidatePayload
absl::Status SendValidatePayload(std::string data, Endpoint *send_endpoint, Endpoint *receive_endpoint)
Definition: event_engine_test_utils.cc:75
g_mu
static gpr_mu g_mu
Definition: iomgr.cc:55
grpc_core::Mutex
Definition: src/core/lib/gprpp/sync.h:61
grpc_core::Promise
std::function< Poll< T >()> Promise
Definition: promise/promise.h:37
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
absl::str_format_internal::LengthMod::t
@ t
ok
bool ok
Definition: async_end2end_test.cc:197
exec_ctx.h
absl::Status::ok
ABSL_MUST_USE_RESULT bool ok() const
Definition: third_party/abseil-cpp/absl/status/status.h:802
EXPECT_TRUE
#define EXPECT_TRUE(condition)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1967
workers
struct child_worker * workers
memory_allocator.h
EventEngineTest
Definition: event_engine_test.h:60
absl::StatusOr
Definition: abseil-cpp/absl/status/statusor.h:187
uri_parser.h
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
to_string
static bool to_string(zval *from)
Definition: protobuf/php/ext/google/protobuf/convert.c:333
sync.h
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:47