22 #include <condition_variable>
28 #include <gtest/gtest.h>
30 #include "absl/memory/memory.h"
47 #include "src/proto/grpc/testing/echo.grpc.pb.h"
54 using grpc::testing::EchoRequest;
55 using grpc::testing::EchoResponse;
73 ipv4_address_(
"10.0.0.1"),
77 std::ostringstream
cmd;
79 cmd <<
"ip addr add " << ipv4_address_ << netmask_ <<
" dev " << interface_;
80 std::system(
cmd.str().c_str());
83 void InterfaceDown() {
84 std::ostringstream
cmd;
86 cmd <<
"ip addr del " << ipv4_address_ << netmask_ <<
" dev " << interface_;
87 std::system(
cmd.str().c_str());
91 std::ostringstream
cmd;
95 std::system(
cmd.str().c_str());
99 std::ostringstream
cmd;
106 std::system(
cmd.str().c_str());
111 cmd <<
"cat /etc/hosts.orig > /etc/hosts";
112 std::system(
cmd.str().c_str());
116 std::ostringstream
cmd;
118 cmd <<
"iptables -A INPUT -s " << ipv4_address_ <<
" -j DROP";
120 std::system(
cmd.str().c_str());
125 cmd <<
"iptables -A INPUT -d " << ipv4_address_ <<
" -j DROP";
128 void RestoreNetwork() {
129 std::ostringstream
cmd;
131 cmd <<
"iptables -D INPUT -s " << ipv4_address_ <<
" -j DROP";
132 std::system(
cmd.str().c_str());
136 cmd <<
"iptables -D INPUT -d " << ipv4_address_ <<
" -j DROP";
139 void FlakeNetwork() {
140 std::ostringstream
cmd;
143 cmd <<
"tc qdisc replace dev " << interface_
144 <<
" root netem delay 100ms 20ms distribution normal loss 0.1% "
146 "0.1% corrupt 0.01% ";
147 std::system(
cmd.str().c_str());
150 void UnflakeNetwork() {
152 std::ostringstream
cmd;
153 cmd <<
"tc qdisc del dev " << interface_ <<
" root netem";
154 std::system(
cmd.str().c_str());
167 void SetUp()
override {
173 void TearDown()
override {
// Stops the test server by forwarding to server_->Shutdown().
// server_ is dereferenced without a null check, so a server object must
// already exist when this is called.
// NOTE(review): presumably this is the fixture's ServerData instance created
// by the matching start helper -- confirm against the full file.
189 void StopServer() {
server_->Shutdown(); }
191 std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
192 const std::shared_ptr<Channel>&
channel) {
193 return grpc::testing::EchoTestService::NewStub(
channel);
196 std::shared_ptr<Channel> BuildChannel(
198 ChannelArguments
args = ChannelArguments()) {
199 if (!lb_policy_name.empty()) {
200 args.SetLoadBalancingPolicyName(lb_policy_name);
210 const std::unique_ptr<grpc::testing::EchoTestService::Stub>&
stub,
212 auto response = absl::make_unique<EchoResponse>();
214 auto&
msg = GetParam().message_content;
221 request.mutable_param()->set_skip_cancelled_check(
true);
241 std::unique_ptr<Server>
server_;
243 std::unique_ptr<std::thread>
thread_;
244 bool server_ready_ =
false;
252 std::unique_lock<std::mutex> lock(
mu);
253 std::condition_variable
cond;
254 thread_ = absl::make_unique<std::thread>(
256 cond.wait(lock, [
this] {
return server_ready_; });
257 server_ready_ =
false;
262 std::condition_variable*
cond) {
271 std::lock_guard<std::mutex> lock(*
mu);
272 server_ready_ =
true;
288 if (!
channel->WaitForStateChange(
state, deadline))
return false;
299 if (!
channel->WaitForStateChange(
state, deadline))
return false;
309 std::unique_ptr<grpc::testing::EchoTestService::Stub>
stub_;
310 std::unique_ptr<ServerData>
server_;
317 std::vector<std::string> credentials_types;
318 std::vector<std::string> messages;
322 for (
auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
323 credentials_types.push_back(*sec);
326 messages.push_back(
"🖖");
329 for (
size_t i = 0;
i <
k * 1024; ++
i) {
330 char c =
'a' + (
i % 26);
333 messages.push_back(big_msg);
335 for (
auto cred = credentials_types.begin(); cred != credentials_types.end();
337 for (
auto msg = messages.begin();
msg != messages.end();
msg++) {
349 TEST_P(FlakyNetworkTest, NetworkTransition) {
350 const int kKeepAliveTimeMs = 1000;
351 const int kKeepAliveTimeoutMs = 1000;
352 ChannelArguments
args;
364 std::atomic_bool shutdown{
false};
367 if (shutdown.load()) {
371 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
380 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
385 shutdown.store(
true);
390 TEST_P(FlakyNetworkTest, ServerUnreachableWithKeepalive) {
391 const int kKeepAliveTimeMs = 1000;
392 const int kKeepAliveTimeoutMs = 1000;
393 const int kReconnectBackoffMs = 1000;
394 ChannelArguments
args;
404 gpr_log(
GPR_DEBUG,
"FlakyNetworkTest.ServerUnreachableWithKeepalive start");
411 std::atomic_bool shutdown{
false};
414 if (shutdown.load()) {
418 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
425 std::this_thread::sleep_for(std::chrono::milliseconds(10000));
432 shutdown.store(
true);
439 TEST_P(FlakyNetworkTest, ServerUnreachableNoKeepalive) {
440 auto channel = BuildChannel(
"pick_first", ChannelArguments());
456 std::this_thread::sleep_for(std::chrono::milliseconds(2000));
465 TEST_P(FlakyNetworkTest, FlakyNetwork) {
466 const int kKeepAliveTimeMs = 1000;
467 const int kKeepAliveTimeoutMs = 1000;
468 const int kMessageCount = 100;
469 ChannelArguments
args;
483 for (
int i = 0;
i < kMessageCount; ++
i) {
492 TEST_P(FlakyNetworkTest, ServerRestartKeepaliveEnabled) {
493 const int kKeepAliveTimeMs = 1000;
494 const int kKeepAliveTimeoutMs = 1000;
495 ChannelArguments
args;
513 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
522 TEST_P(FlakyNetworkTest, ServerRestartKeepaliveDisabled) {
523 auto channel = BuildChannel(
"pick_first", ChannelArguments());
531 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
535 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
547 int main(
int argc,
char** argv) {