25 #include <gtest/gtest.h>
43 #include "src/proto/grpc/testing/echo.grpc.pb.h"
51 using grpc::testing::EchoRequest;
52 using grpc::testing::EchoResponse;
53 using grpc::testing::RequestParams;
54 using std::chrono::system_clock;
72 ipv4_address_(
"10.0.0.1") {}
75 std::ostringstream
cmd;
78 <<
" ' | sudo tee -a /etc/hosts";
79 std::system(
cmd.str().c_str());
83 std::ostringstream
cmd;
86 std::system(
cmd.str().c_str());
90 std::ostringstream
cmd;
91 cmd <<
"sudo /sbin/ifconfig " << interface_ <<
" alias " << ipv4_address_;
92 std::system(
cmd.str().c_str());
95 void InterfaceDown() {
96 std::ostringstream
cmd;
97 cmd <<
"sudo /sbin/ifconfig " << interface_ <<
" -alias " << ipv4_address_;
98 std::system(
cmd.str().c_str());
113 void SetUp()
override {
119 void TearDown()
override {
// Stops the server by calling Shutdown() on the object server_ points to.
// server_ is dereferenced unconditionally, so it must be non-null when this
// is called.
130 void StopServer() {
server_->Shutdown(); }
132 std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
133 const std::shared_ptr<Channel>&
channel) {
134 return grpc::testing::EchoTestService::NewStub(
channel);
137 std::shared_ptr<Channel> BuildChannel() {
140 ChannelArguments
args;
147 const std::unique_ptr<grpc::testing::EchoTestService::Stub>&
stub,
148 bool expect_success =
false) {
149 auto response = std::unique_ptr<EchoResponse>(
new EchoResponse());
151 auto&
msg = GetParam().message_content;
161 if (expect_success) {
166 const std::unique_ptr<grpc::testing::EchoTestService::Stub>&
stub,
167 RequestParams param = RequestParams()) {
173 call->response_reader =
176 call->response_reader->StartCall();
// Calls Shutdown() on cq_.
// NOTE(review): cq_ is presumably the completion queue that CQNext() polls;
// confirm against the member declaration, which is not visible here.
180 void ShutdownCQ() {
cq_.Shutdown(); }
182 bool CQNext(
void**
tag,
bool*
ok) {
205 if (!
channel->WaitForStateChange(
state, deadline))
return false;
216 if (!
channel->WaitForStateChange(
state, deadline))
return false;
225 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader;
232 std::unique_ptr<Server>
server_;
234 std::unique_ptr<std::thread>
thread_;
235 bool server_ready_ =
false;
243 std::unique_lock<std::mutex> lock(
mu);
244 std::condition_variable
cond;
247 cond.wait(lock, [
this] {
return server_ready_; });
248 server_ready_ =
false;
253 std::condition_variable*
cond) {
262 std::lock_guard<std::mutex> lock(*
mu);
263 server_ready_ =
true;
277 std::unique_ptr<ServerData>
server_;
283 std::vector<std::string> credentials_types;
284 std::vector<std::string> messages;
288 for (
auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
289 credentials_types.push_back(*sec);
292 messages.push_back(
"🖖");
295 for (
size_t i = 0;
i <
k * 1024; ++
i) {
296 char c =
'a' + (
i % 26);
299 messages.push_back(big_msg);
301 for (
auto cred = credentials_types.begin(); cred != credentials_types.end();
303 for (
auto msg = messages.begin();
msg != messages.end();
msg++) {
316 TEST_P(CFStreamTest, NetworkTransition) {
323 std::atomic_bool shutdown{
false};
326 if (shutdown.load()) {
330 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
341 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
347 shutdown.store(
true);
352 TEST_P(CFStreamTest, NetworkFlapRpcsInFlight) {
355 std::atomic_int rpcs_sent{0};
358 for (
int i = 0;
i < 10; ++
i) {
360 param.set_skip_cancelled_check(
true);
361 SendAsyncRpc(
stub, param);
372 bool network_down =
true;
373 int total_completions = 0;
375 while (CQNext(&got_tag, &
ok)) {
379 if (!
call->status.ok()) {
381 call->status.error_message().c_str());
385 network_down =
false;
398 for (
int i = 0;
i < 100; ++
i) {
400 param.set_skip_cancelled_check(
true);
401 SendAsyncRpc(
stub, param);
402 std::this_thread::sleep_for(std::chrono::milliseconds(10));
413 TEST_P(CFStreamTest, ConcurrentRpc) {
416 std::atomic_int rpcs_sent{0};
420 int total_completions = 0;
422 while (CQNext(&got_tag, &
ok)) {
426 if (!
call->status.ok()) {
428 call->status.error_message().c_str());
441 for (
int i = 0;
i < 10; ++
i) {
444 ErrorStatus*
error = param.mutable_expected_error();
446 error->set_error_message(
"internal error");
447 SendAsyncRpc(
stub, param);
448 }
else if (i % 5 == 0) {
450 param.set_echo_metadata(
true);
451 DebugInfo* info = param.mutable_debug_info();
452 info->add_stack_entries(
"stack_entry1");
453 info->add_stack_entries(
"stack_entry2");
454 info->set_detail(
"detailed debug info");
455 SendAsyncRpc(
stub, param);
470 #endif // GRPC_CFSTREAM
472 int main(
int argc,
char** argv) {