19 #include <sys/socket.h>
26 #include "absl/status/status.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/synchronization/mutex.h"
29 #include "absl/time/clock.h"
30 #include "absl/time/time.h"
40 namespace experimental {
44 const char* kStopMessage =
"STOP";
47 const EventEngine::ResolvedAddress& ra) {
49 memcpy(grpc_addr.
addr, ra.address(), ra.size());
50 grpc_addr.
len = ra.size();
66 rv = poll(pfds, nfds, -1);
68 const int saved_errno = errno;
70 if (rv >= 0 || errno != EINTR) {
96 return PollFds(&pfd, 1,
timeout);
105 std::string TryReadBytes(
int sockfd,
int& saved_errno,
int num_expected_bytes) {
107 static constexpr
int kDefaultNumExpectedBytes = 1024;
108 if (num_expected_bytes <= 0) {
109 num_expected_bytes = kDefaultNumExpectedBytes;
112 char*
buffer =
const_cast<char*
>(read_data.c_str());
113 int pending_bytes = num_expected_bytes;
116 ret =
read(sockfd,
buffer + num_expected_bytes - pending_bytes,
119 pending_bytes -=
ret;
121 }
while (pending_bytes > 0 && ((
ret > 0) || (
ret < 0 && errno == EINTR)));
123 return read_data.substr(0, num_expected_bytes - pending_bytes);
135 read_data += TryReadBytes(sockfd, saved_errno,
136 num_expected_bytes - read_data.length());
137 if (saved_errno == EAGAIN &&
138 read_data.length() <
static_cast<size_t>(num_expected_bytes)) {
140 }
else if (saved_errno != 0 && num_expected_bytes > 0) {
144 }
while (read_data.length() <
static_cast<size_t>(num_expected_bytes));
150 int TryWriteBytes(
int sockfd,
int& saved_errno,
std::string write_bytes) {
152 int pending_bytes = write_bytes.length();
156 write_bytes.c_str() + write_bytes.length() - pending_bytes,
159 pending_bytes -=
ret;
161 }
while (pending_bytes > 0 && ((
ret > 0) || (
ret < 0 && errno == EINTR)));
163 return write_bytes.length() - pending_bytes;
171 int WriteBytes(
int sockfd,
int& saved_errno,
std::string write_bytes) {
173 int original_write_length = write_bytes.length();
176 ret = TryWriteBytes(sockfd, saved_errno, write_bytes);
177 if (saved_errno == EAGAIN &&
ret <
static_cast<int>(write_bytes.length())) {
180 }
else if (saved_errno != 0) {
184 write_bytes = write_bytes.substr(
ret, std::string::npos);
185 }
while (write_bytes.length() > 0);
186 return original_write_length;
191 : socket_fd_(socket_fd) {
221 return std::make_unique<PosixOracleEndpoint>(socket_fd);
232 int read_hint_bytes =
259 "Read failed with error = ",
260 std::strerror(saved_errno)))
280 "Write failed with error = ", std::strerror(saved_errno)))
289 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
291 on_shutdown_(
std::
move(on_shutdown)),
292 memory_allocator_factory_(
std::
move(memory_allocator_factory)) {
336 struct pollfd* pfds =
338 memset(pfds, 0,
sizeof(
struct pollfd) * nfds);
340 for (
int i = 0;
i < nfds;
i++) {
342 pfds[
i].events = POLLIN;
349 if ((pfds[nfds - 1].revents & POLLIN) &&
354 for (
int i = 0;
i < nfds - 1;
i++) {
355 if (!(pfds[
i].revents & POLLIN)) {
359 int client_sock_fd = accept(pfds[
i].fd,
nullptr,
nullptr);
360 if (client_sock_fd < 0) {
362 "Error accepting new connection: %s. Ignoring connection "
364 std::strerror(errno));
382 if (scheme ==
nullptr || strcmp(scheme,
"ipv6") != 0) {
384 "Unsupported bind address type. Only IPV6 addresses are supported "
385 "currently by the PosixOracleListener ...");
391 absl::StrCat(
"Error creating socket: ", std::strerror(errno)));
395 if (setsockopt(
new_socket, SOL_SOCKET, SO_REUSEADDR, &opt,
sizeof(opt))) {
397 absl::StrCat(
"Error setsockopt(SO_REUSEADDR): ", std::strerror(errno)));
399 if (setsockopt(
new_socket, SOL_SOCKET, SO_REUSEPORT, &opt,
sizeof(opt))) {
401 absl::StrCat(
"Error setsockopt(SO_REUSEPORT): ", std::strerror(errno)));
429 if (scheme ==
nullptr || strcmp(scheme,
"ipv6") != 0) {
432 "addresses are currently supported."));
437 "Connect failed: socket creation error: ", std::strerror(errno))));
442 static constexpr
int kMaxRetries = 5;
444 err = connect(client_sock_fd,
const_cast<struct sockaddr*
>(
addr.address()),
446 if (
err < 0 && (errno == EINPROGRESS || errno == EWOULDBLOCK)) {
447 auto status = BlockUntilWritableWithTimeout(
454 }
else if (
err < 0) {
455 if (errno != ECONNREFUSED || ++num_retries > kMaxRetries) {