reconnect_interop_server.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 // Test description at doc/connection-backoff-interop-test-description.md
20 
21 #include <signal.h>
22 
23 #include <condition_variable>
24 #include <memory>
25 #include <mutex>
26 #include <sstream>
27 
28 #include "absl/flags/flag.h"
29 
30 #include <grpc/grpc.h>
31 #include <grpc/support/log.h>
32 #include <grpcpp/server.h>
33 #include <grpcpp/server_builder.h>
34 #include <grpcpp/server_context.h>
35 
36 #include "src/proto/grpc/testing/empty.pb.h"
37 #include "src/proto/grpc/testing/messages.pb.h"
38 #include "src/proto/grpc/testing/test.grpc.pb.h"
41 
42 ABSL_FLAG(int32_t, control_port, 0, "Server port for controlling the server.");
43 ABSL_FLAG(int32_t, retry_port, 0,
44  "Server port for raw tcp connections. All incoming "
45  "connections will be closed immediately.");
46 
47 using grpc::Server;
50 using grpc::Status;
51 using grpc::testing::Empty;
54 using grpc::testing::ReconnectService;
55 
56 static bool got_sigint = false;
57 
58 class ReconnectServiceImpl : public ReconnectService::Service {
59  public:
60  explicit ReconnectServiceImpl(int retry_port)
61  : retry_port_(retry_port),
62  serving_(false),
64  shutdown_(false) {
66  }
67 
68  ~ReconnectServiceImpl() override {
69  if (server_started_) {
71  }
72  }
73 
75 
77  Empty* /*response*/) override {
78  bool start_server = true;
79  std::unique_lock<std::mutex> lock(mu_);
80  while (serving_ && !shutdown_) {
81  cv_.wait(lock);
82  }
83  if (shutdown_) {
84  return Status(grpc::StatusCode::UNAVAILABLE, "shutting down");
85  }
86  serving_ = true;
87  if (server_started_) {
88  start_server = false;
89  } else {
91  request->max_reconnect_backoff_ms();
92  server_started_ = true;
93  }
94  lock.unlock();
95 
96  if (start_server) {
98  } else {
100  }
101  return Status::OK;
102  }
103 
104  Status Stop(ServerContext* /*context*/, const Empty* /*request*/,
105  ReconnectInfo* response) override {
106  // extract timestamps and set response
107  Verify(response);
109  std::lock_guard<std::mutex> lock(mu_);
110  serving_ = false;
111  cv_.notify_one();
112  return Status::OK;
113  }
114 
116  double expected_backoff = 1000.0;
117  const double kTransmissionDelay = 100.0;
118  const double kBackoffMultiplier = 1.6;
119  const double kJitterFactor = 0.2;
120  const int kMaxBackoffMs = tcp_server_.max_reconnect_backoff_ms
122  : 120 * 1000;
123  bool passed = true;
124  for (timestamp_list* cur = tcp_server_.head; cur && cur->next;
125  cur = cur->next) {
126  double backoff = gpr_time_to_millis(
127  gpr_time_sub(cur->next->timestamp, cur->timestamp));
128  double min_backoff = expected_backoff * (1 - kJitterFactor);
129  double max_backoff = expected_backoff * (1 + kJitterFactor);
130  if (backoff < min_backoff - kTransmissionDelay ||
131  backoff > max_backoff + kTransmissionDelay) {
132  passed = false;
133  }
134  response->add_backoff_ms(static_cast<int32_t>(backoff));
135  expected_backoff *= kBackoffMultiplier;
136  expected_backoff =
137  expected_backoff > kMaxBackoffMs ? kMaxBackoffMs : expected_backoff;
138  }
139  response->set_passed(passed);
140  }
141 
142  void Shutdown() {
143  std::lock_guard<std::mutex> lock(mu_);
144  shutdown_ = true;
145  cv_.notify_all();
146  }
147 
148  private:
151  bool serving_;
153  bool shutdown_;
155  std::condition_variable cv_;
156 };
157 
158 void RunServer() {
159  std::ostringstream server_address;
160  server_address << "0.0.0.0:" << absl::GetFlag(FLAGS_control_port);
161  ReconnectServiceImpl service(absl::GetFlag(FLAGS_retry_port));
162 
164  builder.RegisterService(&service);
165  builder.AddListeningPort(server_address.str(),
167  std::unique_ptr<Server> server(builder.BuildAndStart());
168  gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str());
169  while (!got_sigint) {
170  service.Poll(5);
171  }
172  service.Shutdown();
173 }
174 
175 static void sigint_handler(int /*x*/) { got_sigint = true; }
176 
177 int main(int argc, char** argv) {
178  grpc::testing::InitTest(&argc, &argv, true);
179  signal(SIGINT, sigint_handler);
180 
181  GPR_ASSERT(absl::GetFlag(FLAGS_control_port) != 0);
182  GPR_ASSERT(absl::GetFlag(FLAGS_retry_port) != 0);
183  RunServer();
184 
185  return 0;
186 }
reconnect_server::head
timestamp_list * head
Definition: reconnect_server.h:34
grpc::testing::InitTest
void InitTest(int *argc, char ***argv, bool remove_flags)
Definition: test_config_cc.cc:28
absl::time_internal::cctz::seconds
std::chrono::duration< std::int_fast64_t > seconds
Definition: abseil-cpp/absl/time/internal/cctz/include/cctz/time_zone.h:40
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
reconnect_server_destroy
void reconnect_server_destroy(reconnect_server *server)
Definition: reconnect_server.cc:128
grpc::ServerContext
Definition: grpcpp/impl/codegen/server_context.h:566
log.h
main
int main(int argc, char **argv)
Definition: reconnect_interop_server.cc:177
ReconnectServiceImpl::Shutdown
void Shutdown()
Definition: reconnect_interop_server.cc:142
false
#define false
Definition: setup_once.h:323
mutex
static uv_mutex_t mutex
Definition: threadpool.c:34
benchmark.request
request
Definition: benchmark.py:77
reconnect_server_clear_timestamps
void reconnect_server_clear_timestamps(reconnect_server *server)
Definition: reconnect_server.cc:116
ReconnectServiceImpl::shutdown_
bool shutdown_
Definition: reconnect_interop_server.cc:153
framework.rpc.grpc_channelz.Server
Server
Definition: grpc_channelz.py:42
start_server
static void start_server(void)
Definition: test-delayed-accept.c:100
OK
@ OK
Definition: cronet_status.h:43
got_sigint
static bool got_sigint
Definition: reconnect_interop_server.cc:56
ReconnectServiceImpl::mu_
std::mutex mu_
Definition: reconnect_interop_server.cc:154
server_address
std::string server_address("0.0.0.0:10000")
ReconnectServiceImpl::tcp_server_
reconnect_server tcp_server_
Definition: reconnect_interop_server.cc:150
server
std::unique_ptr< Server > server
Definition: channelz_service_test.cc:330
profile_analyzer.builder
builder
Definition: profile_analyzer.py:159
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
signal
static void signal(notification *n)
Definition: alts_tsi_handshaker_test.cc:107
grpc::ServerBuilder
A builder class for the creation and startup of grpc::Server instances.
Definition: grpcpp/server_builder.h:86
gpr_time_sub
GPRAPI gpr_timespec gpr_time_sub(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:168
ReconnectServiceImpl::Verify
void Verify(ReconnectInfo *response)
Definition: reconnect_interop_server.cc:115
ReconnectServiceImpl::server_started_
bool server_started_
Definition: reconnect_interop_server.cc:152
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
ABSL_FLAG
ABSL_FLAG(int32_t, control_port, 0, "Server port for controlling the server.")
reconnect_server::max_reconnect_backoff_ms
int max_reconnect_backoff_ms
Definition: reconnect_server.h:37
grpc.h
reconnect_server
Definition: reconnect_server.h:32
timestamp_list
Definition: reconnect_server.h:27
gpr_time_to_millis
GPRAPI int32_t gpr_time_to_millis(gpr_timespec timespec)
Definition: src/core/lib/gpr/time.cc:221
ReconnectServiceImpl
Definition: reconnect_interop_server.cc:58
ReconnectServiceImpl::Start
Status Start(ServerContext *, const ReconnectParams *request, Empty *) override
Definition: reconnect_interop_server.cc:76
absl::GetFlag
ABSL_MUST_USE_RESULT T GetFlag(const absl::Flag< T > &flag)
Definition: abseil-cpp/absl/flags/flag.h:98
messages_pb2.ReconnectParams
ReconnectParams
Definition: messages_pb2.py:646
RunServer
void RunServer()
Definition: reconnect_interop_server.cc:158
sigint_handler
static void sigint_handler(int)
Definition: reconnect_interop_server.cc:175
Empty
Definition: abseil-cpp/absl/container/internal/compressed_tuple_test.cc:33
ReconnectServiceImpl::serving_
bool serving_
Definition: reconnect_interop_server.cc:151
memory_diff.cur
def cur
Definition: memory_diff.py:83
reconnect_server.h
server_context.h
asyncio_get_stats.response
response
Definition: asyncio_get_stats.py:28
reconnect_server_poll
void reconnect_server_poll(reconnect_server *server, int seconds)
Definition: reconnect_server.cc:112
ReconnectServiceImpl::Stop
Status Stop(ServerContext *, const Empty *, ReconnectInfo *response) override
Definition: reconnect_interop_server.cc:104
grpc::protobuf::util::Status
GRPC_CUSTOM_UTIL_STATUS Status
Definition: include/grpcpp/impl/codegen/config_protobuf.h:93
ReconnectServiceImpl::retry_port_
int retry_port_
Definition: reconnect_interop_server.cc:149
grpc::Status
Definition: include/grpcpp/impl/codegen/status.h:35
reconnect_server_start
void reconnect_server_start(reconnect_server *server, int port)
Definition: reconnect_server.cc:108
test_config.h
ReconnectServiceImpl::Poll
void Poll(int seconds)
Definition: reconnect_interop_server.cc:74
grpc.StatusCode.UNAVAILABLE
tuple UNAVAILABLE
Definition: src/python/grpcio/grpc/__init__.py:278
ReconnectServiceImpl::~ReconnectServiceImpl
~ReconnectServiceImpl() override
Definition: reconnect_interop_server.cc:68
grpc::InsecureServerCredentials
std::shared_ptr< ServerCredentials > InsecureServerCredentials()
Definition: insecure_server_credentials.cc:52
ReconnectServiceImpl::ReconnectServiceImpl
ReconnectServiceImpl(int retry_port)
Definition: reconnect_interop_server.cc:60
reconnect_server_init
void reconnect_server_init(reconnect_server *server)
Definition: reconnect_server.cc:100
server.h
service
__attribute__((deprecated("Please use GRPCProtoMethod."))) @interface ProtoMethod NSString * service
Definition: ProtoMethod.h:25
int32_t
signed int int32_t
Definition: stdint-msvc2008.h:77
ReconnectServiceImpl::cv_
std::condition_variable cv_
Definition: reconnect_interop_server.cc:155
server_builder.h
messages_pb2.ReconnectInfo
ReconnectInfo
Definition: messages_pb2.py:653


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:00