fake_udp_and_tcp_server.cc
Go to the documentation of this file.
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
18 
20 
21 #include "absl/strings/match.h"
22 #include "absl/strings/str_cat.h"
23 
25 #include "test/core/util/port.h"
26 
27 #ifdef GPR_WINDOWS
31 #define BAD_SOCKET_RETURN_VAL INVALID_SOCKET
32 #define CLOSE_SOCKET closesocket
33 #define ERRNO WSAGetLastError()
34 #else
35 #include <fcntl.h>
36 
38 #define BAD_SOCKET_RETURN_VAL (-1)
39 #define CLOSE_SOCKET close
40 #define ERRNO errno
41 #endif
42 
43 namespace grpc_core {
44 namespace testing {
45 
46 namespace {
47 
// Reports whether `error` is one of the socket error codes this file treats as
// transient for its non-blocking sockets, i.e. the call should simply be
// attempted again later instead of being treated as a failure.
//   - Windows: WSAEWOULDBLOCK or WSAEINPROGRESS.
//   - Elsewhere: EAGAIN or EWOULDBLOCK.
bool ErrorIsRetryable(int error) {
#ifdef GPR_WINDOWS
  if (error == WSAEINPROGRESS) {
    return true;
  }
  return error == WSAEWOULDBLOCK;
#else
  // Both codes are compared explicitly: they may or may not share a value
  // depending on the platform.
  if (error == EAGAIN) {
    return true;
  }
  return error == EWOULDBLOCK;
#endif
}
55 
56 } // namespace
57 
59  AcceptMode accept_mode,
61  process_read_cb)
62  : accept_mode_(accept_mode), process_read_cb_(std::move(process_read_cb)) {
64  udp_socket_ = socket(AF_INET6, SOCK_DGRAM, 0);
66  gpr_log(GPR_ERROR, "Failed to create UDP ipv6 socket: %d", ERRNO);
67  GPR_ASSERT(0);
68  }
69  accept_socket_ = socket(AF_INET6, SOCK_STREAM, 0);
70  address_ = absl::StrCat("[::1]:", port_);
72  gpr_log(GPR_ERROR, "Failed to create TCP IPv6 socket: %d", ERRNO);
73  GPR_ASSERT(0);
74  }
75 #ifdef GPR_WINDOWS
76  char val = 1;
77  if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) ==
78  SOCKET_ERROR) {
80  "Failed to set SO_REUSEADDR on TCP ipv6 socket to [::1]:%d, "
81  "errno: %d",
82  port_, ERRNO);
83  GPR_ASSERT(0);
84  }
85  grpc_error_handle set_non_block_error;
86  set_non_block_error = grpc_tcp_set_non_block(udp_socket_);
87  if (!GRPC_ERROR_IS_NONE(set_non_block_error)) {
88  gpr_log(GPR_ERROR, "Failed to configure non-blocking socket: %s",
89  grpc_error_std_string(set_non_block_error).c_str());
90  GPR_ASSERT(0);
91  }
92  set_non_block_error = grpc_tcp_set_non_block(accept_socket_);
93  if (!GRPC_ERROR_IS_NONE(set_non_block_error)) {
94  gpr_log(GPR_ERROR, "Failed to configure non-blocking socket: %s",
95  grpc_error_std_string(set_non_block_error).c_str());
96  GPR_ASSERT(0);
97  }
98 #else
99  int val = 1;
100  if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) !=
101  0) {
102  gpr_log(GPR_ERROR, "Failed to set SO_REUSEADDR on socket [::1]:%d", port_);
103  GPR_ASSERT(0);
104  }
105  if (fcntl(udp_socket_, F_SETFL, O_NONBLOCK) != 0) {
106  gpr_log(GPR_ERROR, "Failed to set O_NONBLOCK on socket: %d", ERRNO);
107  GPR_ASSERT(0);
108  }
109  if (fcntl(accept_socket_, F_SETFL, O_NONBLOCK) != 0) {
110  gpr_log(GPR_ERROR, "Failed to set O_NONBLOCK on socket: %d", ERRNO);
111  GPR_ASSERT(0);
112  }
113 #endif
115  memset(&addr, 0, sizeof(addr));
116  addr.sin6_family = AF_INET6;
117  addr.sin6_port = htons(port_);
118  (reinterpret_cast<char*>(&addr.sin6_addr))[15] = 1;
119  grpc_resolved_address resolved_addr;
120  memcpy(resolved_addr.addr, &addr, sizeof(addr));
121  resolved_addr.len = sizeof(addr);
122  std::string addr_str = grpc_sockaddr_to_string(&resolved_addr, false).value();
123  gpr_log(GPR_INFO, "Fake UDP and TCP server listening on %s",
124  addr_str.c_str());
125  if (bind(udp_socket_, reinterpret_cast<const sockaddr*>(&addr),
126  sizeof(addr)) != 0) {
127  gpr_log(GPR_ERROR, "Failed to bind UDP socket to [::1]:%d", port_);
128  GPR_ASSERT(0);
129  }
130  if (bind(accept_socket_, reinterpret_cast<const sockaddr*>(&addr),
131  sizeof(addr)) != 0) {
132  gpr_log(GPR_ERROR, "Failed to bind TCP socket to [::1]:%d : %d", port_,
133  ERRNO);
134  GPR_ASSERT(0);
135  }
136  if (listen(accept_socket_, 100)) {
137  gpr_log(GPR_ERROR, "Failed to listen on socket bound to [::1]:%d : %d",
138  port_, ERRNO);
139  GPR_ASSERT(0);
140  }
142  run_server_loop_thd_ = absl::make_unique<std::thread>(
143  std::bind(&FakeUdpAndTcpServer::RunServerLoop, this));
144 }
145 
148  "FakeUdpAndTcpServer stop and "
149  "join server thread");
150  gpr_event_set(&stop_ev_, reinterpret_cast<void*>(1));
151  run_server_loop_thd_->join();
153  "FakeUdpAndTcpServer join server "
154  "thread complete");
157 }
158 
161  int bytes_received_size, int read_error, int s) {
162  if (bytes_received_size < 0 && !ErrorIsRetryable(read_error)) {
163  gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
164  read_error);
165  GPR_ASSERT(0);
166  }
167  if (bytes_received_size >= 0) {
169  "Fake TCP server received %d bytes from peer socket: %d. Close "
170  "the "
171  "connection.",
172  bytes_received_size, s);
174  }
176 }
177 
180  int read_error, int s) {
181  if (bytes_received_size < 0 && !ErrorIsRetryable(read_error)) {
182  gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
183  read_error);
184  GPR_ASSERT(0);
185  }
186  if (bytes_received_size == 0) {
187  // The peer has shut down the connection.
189  "Fake TCP server received 0 bytes from peer socket: %d. Close "
190  "the "
191  "connection.",
192  s);
194  }
196 }
197 
199  : fd_(fd) {}
200 
202  CLOSE_SOCKET(fd_);
203 }
204 
207  // https://tools.ietf.org/html/rfc7540#section-4.1
208  const std::vector<char> kEmptyHttp2SettingsFrame = {
209  0x00, 0x00, 0x00, // length
210  0x04, // settings type
211  0x00, // flags
212  0x00, 0x00, 0x00, 0x00 // stream identifier
213  };
214  if (total_bytes_sent_ < int(kEmptyHttp2SettingsFrame.size())) {
215  int bytes_to_send = kEmptyHttp2SettingsFrame.size() - total_bytes_sent_;
216  int bytes_sent =
217  send(fd_, kEmptyHttp2SettingsFrame.data() + total_bytes_sent_,
218  bytes_to_send, 0);
219  if (bytes_sent < 0 && !ErrorIsRetryable(ERRNO)) {
221  "Fake TCP server encountered unexpected error:%d "
222  "sending %d bytes on fd:%d",
223  ERRNO, bytes_to_send, fd_);
224  GPR_ASSERT(0);
225  } else if (bytes_sent > 0) {
226  total_bytes_sent_ += bytes_sent;
227  GPR_ASSERT(total_bytes_sent_ <= int(kEmptyHttp2SettingsFrame.size()));
228  }
229  }
230 }
231 
233  char buf[100];
234  recvfrom(udp_socket_, buf, sizeof(buf), 0, nullptr, nullptr);
235 }
236 
238  std::set<std::unique_ptr<FakeUdpAndTcpServerPeer>> peers;
239  while (!gpr_event_get(&stop_ev_)) {
240  // handle TCP connections
241  int p = accept(accept_socket_, nullptr, nullptr);
242  if (p != BAD_SOCKET_RETURN_VAL) {
243  gpr_log(GPR_DEBUG, "accepted peer socket: %d", p);
244 #ifdef GPR_WINDOWS
245  grpc_error_handle set_non_block_error;
246  set_non_block_error = grpc_tcp_set_non_block(p);
247  if (!GRPC_ERROR_IS_NONE(set_non_block_error)) {
248  gpr_log(GPR_ERROR, "Failed to configure non-blocking socket: %s",
249  grpc_error_std_string(set_non_block_error).c_str());
250  GPR_ASSERT(0);
251  }
252 #else
253  if (fcntl(p, F_SETFL, O_NONBLOCK) != 0) {
254  gpr_log(GPR_ERROR, "Failed to configure non-blocking socket, errno: %d",
255  ERRNO);
256  GPR_ASSERT(0);
257  }
258 #endif
259  peers.insert(absl::make_unique<FakeUdpAndTcpServerPeer>(p));
260  }
261  auto it = peers.begin();
262  while (it != peers.end()) {
263  FakeUdpAndTcpServerPeer* peer = (*it).get();
266  }
267  char buf[100];
268  int bytes_received_size = recv(peer->fd(), buf, 100, 0);
270  process_read_cb_(bytes_received_size, ERRNO, peer->fd());
272  it = peers.erase(it);
273  } else {
274  GPR_ASSERT(r ==
276  it++;
277  }
278  }
279  // read from the UDP socket
283  }
284 }
285 
286 } // namespace testing
287 } // namespace grpc_core
GPR_TIMESPAN
@ GPR_TIMESPAN
Definition: gpr_types.h:45
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
testing
Definition: aws_request_signer_test.cc:25
regen-readme.it
it
Definition: regen-readme.py:15
port.h
sockaddr_utils.h
AF_INET6
#define AF_INET6
Definition: ares_setup.h:208
gpr_event_get
GPRAPI void * gpr_event_get(gpr_event *ev)
Definition: sync.cc:69
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
grpc_core::testing::FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::MaybeContinueSendingSettings
void MaybeContinueSendingSettings()
Definition: fake_udp_and_tcp_server.cc:206
BAD_SOCKET_RETURN_VAL
#define BAD_SOCKET_RETURN_VAL
Definition: fake_udp_and_tcp_server.cc:38
memset
return memset(p, 0, total)
grpc_core::testing::FakeUdpAndTcpServer::address_
std::string address_
Definition: fake_udp_and_tcp_server.h:130
grpc_core::testing::FakeUdpAndTcpServer::ReadFromUdpSocket
void ReadFromUdpSocket()
Definition: fake_udp_and_tcp_server.cc:232
grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponReceivingBytesFromPeer
static ProcessReadResult CloseSocketUponReceivingBytesFromPeer(int bytes_received_size, int read_error, int s)
Definition: fake_udp_and_tcp_server.cc:160
grpc_core::testing::FakeUdpAndTcpServer::process_read_cb_
std::function< ProcessReadResult(int, int, int)> process_read_cb_
Definition: fake_udp_and_tcp_server.h:133
grpc_core
Definition: call_metric_recorder.h:31
buf
voidpf void * buf
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
gpr_event_set
GPRAPI void gpr_event_set(gpr_event *ev, void *value)
Definition: sync.cc:59
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc_core::testing::FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::fd
int fd()
Definition: fake_udp_and_tcp_server.h:119
CLOSE_SOCKET
#define CLOSE_SOCKET
Definition: fake_udp_and_tcp_server.cc:39
grpc_resolved_address
Definition: resolved_address.h:34
socket_windows.h
grpc_sockaddr_to_string
absl::StatusOr< std::string > grpc_sockaddr_to_string(const grpc_resolved_address *resolved_addr, bool normalize)
Definition: sockaddr_utils.cc:194
grpc_core::testing::FakeUdpAndTcpServer::ProcessReadResult
ProcessReadResult
Definition: fake_udp_and_tcp_server.h:74
memcpy
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))
grpc_core::testing::FakeUdpAndTcpServer::accept_mode_
const AcceptMode accept_mode_
Definition: fake_udp_and_tcp_server.h:132
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
sockaddr_in6
Definition: ares_ipv6.h:25
gen_stats_data.c_str
def c_str(s, encoding='ascii')
Definition: gen_stats_data.py:38
grpc_core::testing::FakeUdpAndTcpServer::udp_socket_
int udp_socket_
Definition: fake_udp_and_tcp_server.h:127
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc_core::testing::FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::~FakeUdpAndTcpServerPeer
~FakeUdpAndTcpServerPeer()
Definition: fake_udp_and_tcp_server.cc:201
gpr_sleep_until
GPRAPI void gpr_sleep_until(gpr_timespec until)
grpc_core::testing::FakeUdpAndTcpServer::accept_socket_
int accept_socket_
Definition: fake_udp_and_tcp_server.h:126
grpc_core::testing::FakeUdpAndTcpServer::run_server_loop_thd_
std::unique_ptr< std::thread > run_server_loop_thd_
Definition: fake_udp_and_tcp_server.h:131
grpc_core::testing::FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket
@ kCloseSocket
grpc_core::testing::FakeUdpAndTcpServer::stop_ev_
gpr_event stop_ev_
Definition: fake_udp_and_tcp_server.h:129
GPR_CLOCK_MONOTONIC
@ GPR_CLOCK_MONOTONIC
Definition: gpr_types.h:36
ERRNO
#define ERRNO
Definition: fake_udp_and_tcp_server.cc:40
grpc_resolved_address::len
socklen_t len
Definition: resolved_address.h:36
gpr_event_init
GPRAPI void gpr_event_init(gpr_event *ev)
Definition: sync.cc:54
grpc_core::testing::FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::FakeUdpAndTcpServerPeer
FakeUdpAndTcpServerPeer(int fd)
Definition: fake_udp_and_tcp_server.cc:198
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
grpc_core::testing::FakeUdpAndTcpServer::AcceptMode
AcceptMode
Definition: fake_udp_and_tcp_server.h:79
grpc_pick_unused_port_or_die
int grpc_pick_unused_port_or_die(void)
fake_udp_and_tcp_server.h
gpr_now
GPRAPI gpr_timespec gpr_now(gpr_clock_type clock)
tcp_windows.h
grpc_core::testing::FakeUdpAndTcpServer::port_
int port_
Definition: fake_udp_and_tcp_server.h:128
gpr_time_add
GPRAPI gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:135
grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::kEagerlySendSettings
@ kEagerlySendSettings
grpc_core::testing::FakeUdpAndTcpServer::ProcessReadResult::kContinueReading
@ kContinueReading
grpc_error_std_string
std::string grpc_error_std_string(grpc_error_handle error)
Definition: error.cc:944
http2_test_server.listen
def listen(endpoint, test_case)
Definition: http2_test_server.py:87
grpc_core::testing::FakeUdpAndTcpServer::FakeUdpAndTcpServer
FakeUdpAndTcpServer(AcceptMode accept_mode, std::function< ProcessReadResult(int, int, int)> process_read_cb)
Definition: fake_udp_and_tcp_server.cc:58
fix_build_deps.r
r
Definition: fix_build_deps.py:491
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc_core::testing::FakeUdpAndTcpServer::~FakeUdpAndTcpServer
~FakeUdpAndTcpServer()
Definition: fake_udp_and_tcp_server.cc:146
sockaddr_windows.h
grpc_core::testing::FakeUdpAndTcpServer::RunServerLoop
void RunServerLoop()
Definition: fake_udp_and_tcp_server.cc:237
gpr_time_from_millis
GPRAPI gpr_timespec gpr_time_from_millis(int64_t ms, gpr_clock_type clock_type)
Definition: src/core/lib/gpr/time.cc:119
grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer
static ProcessReadResult CloseSocketUponCloseFromPeer(int bytes_received_size, int read_error, int s)
Definition: fake_udp_and_tcp_server.cc:179
GPR_DEBUG
#define GPR_DEBUG
Definition: include/grpc/impl/codegen/log.h:55
test_server.socket
socket
Definition: test_server.py:65
absl::StatusOr::value
const T & value() const &ABSL_ATTRIBUTE_LIFETIME_BOUND
Definition: abseil-cpp/absl/status/statusor.h:687
grpc_error
Definition: error_internal.h:42
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
sockaddr_posix.h
grpc_resolved_address::addr
char addr[GRPC_MAX_SOCKADDR_SIZE]
Definition: resolved_address.h:35
bytes_sent
static size_t bytes_sent
Definition: test-tcp-writealot.c:44
addr
struct sockaddr_in addr
Definition: libuv/docs/code/tcp-echo-server/main.c:10
grpc_core::testing::FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer
Definition: fake_udp_and_tcp_server.h:111
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 02:58:22