27 #include "absl/memory/memory.h"
28 #include "absl/strings/str_cat.h"
50 #include "src/proto/grpc/lb/v1/load_balancer.grpc.pb.h"
51 #include "src/proto/grpc/testing/echo.grpc.pb.h"
56 using grpc::lb::v1::LoadBalancer;
57 using grpc::lb::v1::LoadBalanceRequest;
58 using grpc::lb::v1::LoadBalanceResponse;
// Tuning knobs for the stress test. Counts size the server/thread pools;
// the *Ms / *Sec values are interpreted through std::chrono where they are used.

// Number of BackendServiceImpl instances created and stored in backends_.
64 const size_t kNumBackends = 10;
// Number of BalancerServiceImpl instances created and stored in balancers_.
65 const size_t kNumBalancers = 5;
// Number of std::thread objects started on
// ClientChannelStressTest::KeepSendingRequests.
66 const size_t kNumClientThreads = 100;
// Sleep, in milliseconds, taken after each SetNextResolution(addresses) call
// (used as wait_duration).
67 const int kResolutionUpdateIntervalMs = 50;
// Sleep, in milliseconds, taken by the balancer after each
// stream->Write(BuildRandomResponseForBackends()).
68 const int kServerlistUpdateIntervalMs = 10;
// Threshold, in seconds, that an elapsed-time count is compared against with
// `>`. NOTE(review): presumably this ends the resolution-update loop — the
// branch body is not visible here, confirm.
69 const int kTestDurationSec = 30;
73 class BalancerServiceImpl :
public LoadBalancer::Service {
75 using Stream = ServerReaderWriter<LoadBalanceResponse, LoadBalanceRequest>;
77 explicit BalancerServiceImpl(
const std::vector<int>& all_backend_ports)
85 stream->Write(BuildRandomResponseForBackends());
86 std::this_thread::sleep_for(
87 std::chrono::milliseconds(kServerlistUpdateIntervalMs));
98 GPR_ASSERT(inet_pton(AF_INET, ip_str, &ip4) == 1);
99 return std::string(
reinterpret_cast<const char*
>(&ip4),
sizeof(ip4));
102 LoadBalanceResponse BuildRandomResponseForBackends() {
106 size_t num_non_drop_entry =
109 std::vector<int> random_backend_indices;
110 for (
size_t i = 0;
i < num_non_drop_entry; ++
i) {
113 for (
size_t i = 0;
i < num_drop_entry; ++
i) {
114 random_backend_indices.push_back(-1);
116 std::shuffle(random_backend_indices.begin(), random_backend_indices.end(),
117 std::mt19937(std::random_device()()));
120 for (
int index : random_backend_indices) {
124 server->set_load_balance_token(
"load_balancing");
126 server->set_ip_address(Ip4ToPackedString(
"127.0.0.1"));
137 class ClientChannelStressTest {
143 const auto wait_duration =
144 std::chrono::milliseconds(kResolutionUpdateIntervalMs);
145 std::vector<AddressData> addresses;
148 if (std::chrono::duration_cast<std::chrono::seconds>(
150 .
count() > kTestDurationSec) {
157 if (std::rand() % 10 < 8) {
158 addresses.emplace_back(AddressData{balancer_server.port_,
""});
161 std::shuffle(addresses.begin(), addresses.end(),
162 std::mt19937(std::random_device()()));
163 SetNextResolution(addresses);
164 std::this_thread::sleep_for(wait_duration);
171 template <
typename T>
172 struct ServerThread {
183 thread_ = absl::make_unique<std::thread>(
224 const std::vector<AddressData>& address_data) {
226 for (
const auto&
addr : address_data) {
234 const_cast<char*
>(
addr.balancer_name.c_str()));
237 addresses.emplace_back(address.
addr, address.
len,
args);
243 const std::vector<AddressData>& balancer_address_data) {
247 nullptr,
"{\"loadBalancingConfig\":[{\"grpclb\":{}}]}", &
error);
250 CreateAddressListFromAddressDataList(balancer_address_data);
256 void SetNextResolution(
const std::vector<AddressData>& address_data) {
262 void KeepSendingRequests() {
279 ChannelArguments
args;
281 grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
284 std::ostringstream uri;
285 uri <<
"fake:///servername_not_used";
293 std::vector<int> backend_ports;
294 for (
size_t i = 0;
i < kNumBackends; ++
i) {
295 backends_.emplace_back(
new BackendServiceImpl());
301 for (
size_t i = 0;
i < kNumBalancers; ++
i) {
302 balancers_.emplace_back(
new BalancerServiceImpl(backend_ports));
308 for (
size_t i = 0;
i < kNumClientThreads; ++
i) {
310 std::thread(&ClientChannelStressTest::KeepSendingRequests,
this));
331 std::unique_ptr<grpc::testing::EchoTestService::Stub>
stub_;
333 std::vector<std::unique_ptr<BackendServiceImpl>>
backends_;
334 std::vector<std::unique_ptr<BalancerServiceImpl>>
balancers_;
346 int main(
int argc,
char** argv) {
348 grpc::testing::ClientChannelStressTest
test;