20 #include "absl/status/status.h"
21 #include "absl/strings/str_cat.h"
41 using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
42 using ::grpc_event_engine::experimental::EventEngine;
47 using namespace std::chrono_literals;
49 constexpr
int kMinMessageSize = 1024;
50 constexpr
int kMaxMessageSize = 4096;
51 constexpr
int kNumExchangedMessages = 100;
55 static const char alphanum[] =
57 "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
58 "abcdefghijklmnopqrstuvwxyz";
59 static std::random_device rd;
60 static std::seed_seq
seed{rd()};
62 static std::uniform_real_distribution<> dis(kMinMessageSize, kMaxMessageSize);
71 for (
int i = 0;
i <
len; ++
i) {
72 tmp_s += alphanum[rand() % (
sizeof(alphanum) - 1)];
83 auto test_ee = this->NewEventEngine();
84 Promise<std::unique_ptr<EventEngine::Endpoint>> client_endpoint_promise;
85 auto memory_quota = std::make_unique<grpc_core::MemoryQuota>(
"bar");
89 [&client_endpoint_promise](
93 client_endpoint_promise.Set(
nullptr);
96 ChannelArgsEndpointConfig(
nullptr),
97 memory_quota->CreateMemoryAllocator(
"conn-1"), 24h);
99 auto client_endpoint =
std::move(client_endpoint_promise.Get());
109 auto oracle_ee = this->NewOracleEventEngine();
110 auto test_ee = this->NewEventEngine();
111 auto memory_quota = std::make_unique<grpc_core::MemoryQuota>(
"bar");
114 Promise<std::unique_ptr<EventEngine::Endpoint>> client_endpoint_promise;
115 Promise<std::unique_ptr<EventEngine::Endpoint>> server_endpoint_promise;
118 [&server_endpoint_promise](
119 std::unique_ptr<Endpoint> ep,
121 server_endpoint_promise.Set(
std::move(ep));
124 auto status = oracle_ee->CreateListener(
127 ChannelArgsEndpointConfig(
nullptr),
128 std::make_unique<grpc_core::MemoryQuota>(
"foo"));
136 [&client_endpoint_promise](
139 gpr_log(GPR_ERROR,
"Connect failed: %s",
140 status.status().ToString().c_str());
141 client_endpoint_promise.Set(nullptr);
143 client_endpoint_promise.Set(std::move(*status));
147 memory_quota->CreateMemoryAllocator(
"conn-1"), 24h);
149 auto client_endpoint =
std::move(client_endpoint_promise.Get());
150 auto server_endpoint =
std::move(server_endpoint_promise.Get());
155 for (
int i = 0;
i < kNumExchangedMessages;
i++) {
158 server_endpoint.get())
163 client_endpoint.get())
172 static constexpr
int kNumListenerAddresses = 10;
173 static constexpr
int kNumConnections = 100;
174 auto oracle_ee = this->NewOracleEventEngine();
175 auto test_ee = this->NewEventEngine();
176 auto memory_quota = std::make_unique<grpc_core::MemoryQuota>(
"bar");
177 Promise<std::unique_ptr<EventEngine::Endpoint>> client_endpoint_promise;
178 Promise<std::unique_ptr<EventEngine::Endpoint>> server_endpoint_promise;
179 std::vector<std::string> target_addrs;
180 std::vector<std::tuple<std::unique_ptr<Endpoint>, std::unique_ptr<Endpoint>>>
184 [&server_endpoint_promise](
185 std::unique_ptr<Endpoint> ep,
187 server_endpoint_promise.Set(
std::move(ep));
189 auto status = oracle_ee->CreateListener(
192 ChannelArgsEndpointConfig(
nullptr),
193 std::make_unique<grpc_core::MemoryQuota>(
"foo"));
197 target_addrs.reserve(kNumListenerAddresses);
198 for (
int i = 0;
i < kNumListenerAddresses;
i++) {
202 target_addrs.push_back(target_addr);
206 for (
int i = 0;
i < kNumConnections;
i++) {
211 [&client_endpoint_promise](
214 gpr_log(GPR_ERROR,
"Connect failed: %s",
215 status.status().ToString().c_str());
216 client_endpoint_promise.Set(nullptr);
218 client_endpoint_promise.Set(std::move(*status));
222 ChannelArgsEndpointConfig(
nullptr),
223 memory_quota->CreateMemoryAllocator(
227 auto client_endpoint =
std::move(client_endpoint_promise.Get());
228 auto server_endpoint =
std::move(server_endpoint_promise.Get());
233 client_endpoint_promise.Reset();
234 server_endpoint_promise.Reset();
237 std::vector<std::thread>
threads;
240 threads.reserve(kNumConnections);
241 for (
int i = 0;
i < kNumConnections;
i++) {
245 threads.emplace_back([client_endpoint =
249 std::vector<std::thread>
workers;
251 auto worker = [client_endpoint = client_endpoint.get(),
253 server_endpoint.get()](
bool client_to_server) {
254 grpc_core::ExecCtx ctx;
255 for (int i = 0; i < kNumExchangedMessages; i++) {
259 if (client_to_server) {
260 EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(),
261 client_endpoint, server_endpoint)
264 EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(),
265 server_endpoint, client_endpoint)