oracle_event_engine_posix.cc
Go to the documentation of this file.
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <poll.h>
18 #include <sys/poll.h>
19 #include <sys/socket.h>
20 
21 #include <algorithm>
22 #include <cerrno>
23 #include <cstring>
24 #include <memory>
25 
26 #include "absl/status/status.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/synchronization/mutex.h"
29 #include "absl/time/clock.h"
30 #include "absl/time/time.h"
31 
33 #include <grpc/support/alloc.h>
34 #include <grpc/support/log.h>
35 
38 
39 namespace grpc_event_engine {
40 namespace experimental {
41 
42 namespace {
43 
44 const char* kStopMessage = "STOP";
45 
46 grpc_resolved_address CreateGRPCResolvedAddress(
47  const EventEngine::ResolvedAddress& ra) {
48  grpc_resolved_address grpc_addr;
49  memcpy(grpc_addr.addr, ra.address(), ra.size());
50  grpc_addr.len = ra.size();
51  return grpc_addr;
52 }
53 
54 // Blocks until poll(2) indicates that one of the fds has pending I/O
55 // the deadline is reached whichever comes first. Returns an OK
56 // status a valid I/O event is available for at least one of the fds, a Status
57 // with canonical code DEADLINE_EXCEEDED if the deadline expired and a non-OK
58 // Status if any other error occurred.
59 absl::Status PollFds(struct pollfd* pfds, int nfds, absl::Duration timeout) {
60  int rv;
61  while (true) {
63  rv = poll(pfds, nfds,
64  static_cast<int>(absl::ToInt64Milliseconds(timeout)));
65  } else {
66  rv = poll(pfds, nfds, /* timeout = */ -1);
67  }
68  const int saved_errno = errno;
69  errno = saved_errno;
70  if (rv >= 0 || errno != EINTR) {
71  break;
72  }
73  }
74  if (rv < 0) {
75  return absl::UnknownError(std::strerror(errno));
76  }
77  if (rv == 0) {
78  return absl::CancelledError("Deadline exceeded");
79  }
80  return absl::OkStatus();
81 }
82 
83 absl::Status BlockUntilReadable(int fd) {
84  struct pollfd pfd;
85  pfd.fd = fd;
86  pfd.events = POLLIN;
87  pfd.revents = 0;
88  return PollFds(&pfd, 1, absl::InfiniteDuration());
89 }
90 
91 absl::Status BlockUntilWritableWithTimeout(int fd, absl::Duration timeout) {
92  struct pollfd pfd;
93  pfd.fd = fd;
94  pfd.events = POLLOUT;
95  pfd.revents = 0;
96  return PollFds(&pfd, 1, timeout);
97 }
98 
99 absl::Status BlockUntilWritable(int fd) {
100  return BlockUntilWritableWithTimeout(fd, absl::InfiniteDuration());
101 }
102 
103 // Tries to read upto num_expected_bytes from the socket. It returns early if
104 // specified data is not yet available.
105 std::string TryReadBytes(int sockfd, int& saved_errno, int num_expected_bytes) {
106  int ret = 0;
107  static constexpr int kDefaultNumExpectedBytes = 1024;
108  if (num_expected_bytes <= 0) {
109  num_expected_bytes = kDefaultNumExpectedBytes;
110  }
111  std::string read_data = std::string(num_expected_bytes, '\0');
112  char* buffer = const_cast<char*>(read_data.c_str());
113  int pending_bytes = num_expected_bytes;
114  do {
115  errno = 0;
116  ret = read(sockfd, buffer + num_expected_bytes - pending_bytes,
117  pending_bytes);
118  if (ret > 0) {
119  pending_bytes -= ret;
120  }
121  } while (pending_bytes > 0 && ((ret > 0) || (ret < 0 && errno == EINTR)));
122  saved_errno = errno;
123  return read_data.substr(0, num_expected_bytes - pending_bytes);
124 }
125 
126 // Blocks calling thread until the specified number of bytes have been
127 // read from the provided socket or it encounters an unrecoverable error. It
128 // puts the read bytes into a string and returns the string. If it encounters an
129 // error, it returns an empty string and updates saved_errno with the
130 // appropriate errno.
131 std::string ReadBytes(int sockfd, int& saved_errno, int num_expected_bytes) {
132  std::string read_data;
133  do {
134  saved_errno = 0;
135  read_data += TryReadBytes(sockfd, saved_errno,
136  num_expected_bytes - read_data.length());
137  if (saved_errno == EAGAIN &&
138  read_data.length() < static_cast<size_t>(num_expected_bytes)) {
139  GPR_ASSERT(BlockUntilReadable(sockfd).ok());
140  } else if (saved_errno != 0 && num_expected_bytes > 0) {
141  read_data.clear();
142  break;
143  }
144  } while (read_data.length() < static_cast<size_t>(num_expected_bytes));
145  return read_data;
146 }
147 
148 // Tries to write the specified bytes over the socket. It returns the number of
149 // bytes actually written.
150 int TryWriteBytes(int sockfd, int& saved_errno, std::string write_bytes) {
151  int ret = 0;
152  int pending_bytes = write_bytes.length();
153  do {
154  errno = 0;
155  ret = write(sockfd,
156  write_bytes.c_str() + write_bytes.length() - pending_bytes,
157  pending_bytes);
158  if (ret > 0) {
159  pending_bytes -= ret;
160  }
161  } while (pending_bytes > 0 && ((ret > 0) || (ret < 0 && errno == EINTR)));
162  saved_errno = errno;
163  return write_bytes.length() - pending_bytes;
164 }
165 
166 // Blocks calling thread until the specified number of bytes have been
167 // written over the provided socket or it encounters an unrecoverable error. The
168 // bytes to write are specified as a string. If it encounters an error, it
169 // returns an empty string and updates saved_errno with the appropriate errno
170 // and returns a value less than zero.
171 int WriteBytes(int sockfd, int& saved_errno, std::string write_bytes) {
172  int ret = 0;
173  int original_write_length = write_bytes.length();
174  do {
175  saved_errno = 0;
176  ret = TryWriteBytes(sockfd, saved_errno, write_bytes);
177  if (saved_errno == EAGAIN && ret < static_cast<int>(write_bytes.length())) {
178  GPR_ASSERT(ret >= 0);
179  GPR_ASSERT(BlockUntilWritable(sockfd).ok());
180  } else if (saved_errno != 0) {
181  GPR_ASSERT(ret < 0);
182  return ret;
183  }
184  write_bytes = write_bytes.substr(ret, std::string::npos);
185  } while (write_bytes.length() > 0);
186  return original_write_length;
187 }
188 } // namespace
189 
191  : socket_fd_(socket_fd) {
192  read_ops_ = grpc_core::Thread(
193  "read_ops_thread",
194  [](void* arg) {
196  },
197  this);
198  write_ops_ = grpc_core::Thread(
199  "write_ops_thread",
200  [](void* arg) {
202  },
203  this);
204  read_ops_.Start();
205  write_ops_.Start();
206 }
207 
209  absl::MutexLock lock(&mu_);
210  if (absl::exchange(is_shutdown_, true)) {
211  return;
212  }
215  read_ops_.Join();
216  write_ops_.Join();
217 }
218 
219 std::unique_ptr<PosixOracleEndpoint> PosixOracleEndpoint::Create(
220  int socket_fd) {
221  return std::make_unique<PosixOracleEndpoint>(socket_fd);
222 }
223 
225  Shutdown();
226  close(socket_fd_);
227 }
228 
230  SliceBuffer* buffer, const ReadArgs* args) {
231  GPR_ASSERT(buffer != nullptr);
232  int read_hint_bytes =
233  args != nullptr ? std::max(1, static_cast<int>(args->read_hint_bytes))
234  : 0;
235  read_ops_channel_.Set(
236  ReadOperation(read_hint_bytes, buffer, std::move(on_read)));
237 }
238 
240  SliceBuffer* data, const WriteArgs* /*args*/) {
241  GPR_ASSERT(data != nullptr);
242  write_ops_channel_.Set(WriteOperation(data, std::move(on_writable)));
243 }
244 
246  gpr_log(GPR_INFO, "Starting thread to process read ops ...");
247  while (true) {
249  read_op = read_ops_channel_.Get();
250  read_ops_channel_.Reset();
251  if (!read_op.IsValid()) {
253  break;
254  }
255  int saved_errno;
256  std::string read_data =
257  ReadBytes(socket_fd_, saved_errno, read_op.GetNumBytesToRead());
258  read_op(read_data, read_data.empty() ? absl::CancelledError(absl::StrCat(
259  "Read failed with error = ",
260  std::strerror(saved_errno)))
261  : absl::OkStatus());
262  }
263  gpr_log(GPR_INFO, "Shutting down read ops thread ...");
264 }
265 
267  gpr_log(GPR_INFO, "Starting thread to process write ops ...");
268  while (true) {
271  write_ops_channel_.Reset();
272  if (!write_op.IsValid()) {
273  write_op(absl::CancelledError("Closed"));
274  break;
275  }
276  int saved_errno;
277  int ret = WriteBytes(socket_fd_, saved_errno, write_op.GetBytesToWrite());
278  write_op(
280  "Write failed with error = ", std::strerror(saved_errno)))
281  : absl::OkStatus());
282  }
283  gpr_log(GPR_INFO, "Shutting down write ops thread ...");
284 }
285 
288  std::function<void(absl::Status)> on_shutdown,
289  std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
290  : on_accept_(std::move(on_accept)),
291  on_shutdown_(std::move(on_shutdown)),
292  memory_allocator_factory_(std::move(memory_allocator_factory)) {
293  if (pipe(pipefd_) == -1) {
294  gpr_log(GPR_ERROR, "Error creating pipe: %s", std::strerror(errno));
295  abort();
296  }
297 }
298 
300  absl::MutexLock lock(&mu_);
301  GPR_ASSERT(!listener_fds_.empty());
302  if (absl::exchange(is_started_, true)) {
303  return absl::InternalError("Cannot start listener more than once ...");
304  }
306  "accept_thread",
307  [](void* arg) {
309  },
310  this);
311  serve_.Start();
312  return absl::OkStatus();
313 }
314 
316  absl::MutexLock lock(&mu_);
317  if (!is_started_) {
318  serve_.Join();
319  return;
320  }
321  for (int i = 0; i < static_cast<int>(listener_fds_.size()); i++) {
322  shutdown(listener_fds_[i], SHUT_RDWR);
323  }
324  // Send a STOP message over the pipe.
325  write(pipefd_[1], kStopMessage, strlen(kStopMessage));
326  serve_.Join();
328 }
329 
331  gpr_log(GPR_INFO, "Starting accept thread ...");
332  GPR_ASSERT(!listener_fds_.empty());
333  int nfds = listener_fds_.size();
334  // Add one extra file descriptor to poll the pipe fd.
335  ++nfds;
336  struct pollfd* pfds =
337  static_cast<struct pollfd*>(gpr_malloc(sizeof(struct pollfd) * nfds));
338  memset(pfds, 0, sizeof(struct pollfd) * nfds);
339  while (true) {
340  for (int i = 0; i < nfds; i++) {
341  pfds[i].fd = i == nfds - 1 ? pipefd_[0] : listener_fds_[i];
342  pfds[i].events = POLLIN;
343  pfds[i].revents = 0;
344  }
345  if (!PollFds(pfds, nfds, absl::InfiniteDuration()).ok()) {
346  break;
347  }
348  int saved_errno = 0;
349  if ((pfds[nfds - 1].revents & POLLIN) &&
350  ReadBytes(pipefd_[0], saved_errno, strlen(kStopMessage)) ==
351  std::string(kStopMessage)) {
352  break;
353  }
354  for (int i = 0; i < nfds - 1; i++) {
355  if (!(pfds[i].revents & POLLIN)) {
356  continue;
357  }
358  // pfds[i].fd has a readable event.
359  int client_sock_fd = accept(pfds[i].fd, nullptr, nullptr);
360  if (client_sock_fd < 0) {
362  "Error accepting new connection: %s. Ignoring connection "
363  "attempt ...",
364  std::strerror(errno));
365  continue;
366  }
367  on_accept_(PosixOracleEndpoint::Create(client_sock_fd),
368  memory_allocator_factory_->CreateMemoryAllocator("test"));
369  }
370  }
371  gpr_log(GPR_INFO, "Shutting down accept thread ...");
372  gpr_free(pfds);
373 }
374 
377  absl::MutexLock lock(&mu_);
378  int new_socket;
379  int opt = -1;
380  grpc_resolved_address address = CreateGRPCResolvedAddress(addr);
381  const char* scheme = grpc_sockaddr_get_uri_scheme(&address);
382  if (scheme == nullptr || strcmp(scheme, "ipv6") != 0) {
384  "Unsupported bind address type. Only IPV6 addresses are supported "
385  "currently by the PosixOracleListener ...");
386  }
387 
388  // Creating a new socket file descriptor.
389  if ((new_socket = socket(AF_INET6, SOCK_STREAM, 0)) <= 0) {
390  return absl::UnknownError(
391  absl::StrCat("Error creating socket: ", std::strerror(errno)));
392  }
393  // MacOS biulds fail if SO_REUSEADDR and SO_REUSEPORT are set in the same
394  // setsockopt syscall. So they are set separately one after the other.
395  if (setsockopt(new_socket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
396  return absl::UnknownError(
397  absl::StrCat("Error setsockopt(SO_REUSEADDR): ", std::strerror(errno)));
398  }
399  if (setsockopt(new_socket, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt))) {
400  return absl::UnknownError(
401  absl::StrCat("Error setsockopt(SO_REUSEPORT): ", std::strerror(errno)));
402  }
403 
404  // Forcefully bind the new socket.
405  if (bind(new_socket, reinterpret_cast<const struct sockaddr*>(addr.address()),
406  address.len) < 0) {
407  return absl::UnknownError(
408  absl::StrCat("Error bind: ", std::strerror(errno)));
409  }
410  // Set the new socket to listen for one active connection at a time.
411  if (listen(new_socket, 1) < 0) {
412  return absl::UnknownError(
413  absl::StrCat("Error listen: ", std::strerror(errno)));
414  }
415  listener_fds_.push_back(new_socket);
416  return 0;
417 }
418 
419 // PosixOracleEventEngine implements blocking connect. It blocks the calling
420 // thread until either connect succeeds or fails with timeout.
423  const EndpointConfig& /*args*/, MemoryAllocator /*memory_allocator*/,
425  int client_sock_fd;
427  grpc_resolved_address address = CreateGRPCResolvedAddress(addr);
428  const char* scheme = grpc_sockaddr_get_uri_scheme(&address);
429  if (scheme == nullptr || strcmp(scheme, "ipv6") != 0) {
430  on_connect(
431  absl::CancelledError("Unsupported bind address type. Only ipv6 "
432  "addresses are currently supported."));
433  return {};
434  }
435  if ((client_sock_fd = socket(AF_INET6, SOCK_STREAM, 0)) < 0) {
437  "Connect failed: socket creation error: ", std::strerror(errno))));
438  return {};
439  }
440  int err;
441  int num_retries = 0;
442  static constexpr int kMaxRetries = 5;
443  do {
444  err = connect(client_sock_fd, const_cast<struct sockaddr*>(addr.address()),
445  address.len);
446  if (err < 0 && (errno == EINPROGRESS || errno == EWOULDBLOCK)) {
447  auto status = BlockUntilWritableWithTimeout(
448  client_sock_fd,
449  std::max(deadline - absl::Now(), absl::ZeroDuration()));
450  if (!status.ok()) {
452  return {};
453  }
454  } else if (err < 0) {
455  if (errno != ECONNREFUSED || ++num_retries > kMaxRetries) {
456  on_connect(absl::CancelledError("Connect failed."));
457  return {};
458  }
459  // If ECONNREFUSED && num_retries < kMaxRetries, wait a while and try
460  // again.
462  }
463  } while (err < 0 && absl::Now() < deadline);
464  if (err < 0 && absl::Now() >= deadline) {
465  on_connect(absl::CancelledError("Deadline exceeded"));
466  } else {
467  on_connect(PosixOracleEndpoint::Create(client_sock_fd));
468  }
469  return {};
470 }
471 
472 } // namespace experimental
473 } // namespace grpc_event_engine
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
absl::FromChrono
Time FromChrono(const std::chrono::system_clock::time_point &tp)
Definition: third_party/abseil-cpp/absl/time/time.cc:334
log.h
sockaddr_utils.h
AF_INET6
#define AF_INET6
Definition: ares_setup.h:208
grpc_event_engine::experimental::EventEngine::Listener::AcceptCallback
std::function< void(std::unique_ptr< Endpoint >, MemoryAllocator memory_allocator)> AcceptCallback
Called when the listener has accepted a new client connection.
Definition: event_engine.h:232
oracle_event_engine_posix.h
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
absl::ZeroDuration
constexpr Duration ZeroDuration()
Definition: third_party/abseil-cpp/absl/time/time.h:308
memset
return memset(p, 0, total)
grpc_event_engine::experimental::MemoryAllocator
Definition: memory_allocator.h:35
absl::Time
Definition: third_party/abseil-cpp/absl/time/time.h:642
grpc_event_engine::experimental::PosixOracleListener::serve_
grpc_core::Thread serve_
Definition: oracle_event_engine_posix.h:135
grpc_event_engine::experimental::PosixOracleEndpoint::Shutdown
void Shutdown()
Definition: oracle_event_engine_posix.cc:208
write
#define write
Definition: test-fs.c:47
grpc_event_engine::experimental::PosixOracleListener::Start
absl::Status Start() override
Definition: oracle_event_engine_posix.cc:299
event_engine.h
grpc_event_engine::experimental::PosixOracleEndpoint::mu_
absl::Mutex mu_
Definition: oracle_event_engine_posix.h:109
grpc_event_engine::experimental::PosixOracleEndpoint::socket_fd_
int socket_fd_
Definition: oracle_event_engine_posix.h:111
absl::CancelledError
Status CancelledError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:331
grpc_event_engine::experimental::PosixOracleEndpoint::Write
void Write(std::function< void(absl::Status)> on_writable, SliceBuffer *data, const WriteArgs *args) override
Definition: oracle_event_engine_posix.cc:239
gpr_free
GPRAPI void gpr_free(void *ptr)
Definition: alloc.cc:51
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc_event_engine::experimental::PosixOracleListener
Definition: oracle_event_engine_posix.h:118
error_ref_leak.err
err
Definition: error_ref_leak.py:35
grpc_event_engine::experimental::EventEngine::Duration
std::chrono::duration< int64_t, std::nano > Duration
Definition: event_engine.h:80
grpc_event_engine::experimental::PosixOracleListener::on_shutdown_
std::function< void(absl::Status)> on_shutdown_
Definition: oracle_event_engine_posix.h:133
grpc_resolved_address
Definition: resolved_address.h:34
absl::OkStatus
Status OkStatus()
Definition: third_party/abseil-cpp/absl/status/status.h:882
gpr_malloc
GPRAPI void * gpr_malloc(size_t size)
Definition: alloc.cc:29
grpc_event_engine::experimental::PosixOracleEventEngine::Connect
ConnectionHandle Connect(OnConnectCallback on_connect, const ResolvedAddress &addr, const EndpointConfig &args, MemoryAllocator memory_allocator, EventEngine::Duration timeout) override
Definition: oracle_event_engine_posix.cc:421
status
absl::Status status
Definition: rls.cc:251
absl::InternalError
Status InternalError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:347
absl::SleepFor
void SleepFor(absl::Duration duration)
Definition: abseil-cpp/absl/time/clock.h:70
resolved_address.h
grpc_event_engine::experimental::PosixOracleListener::mu_
absl::Mutex mu_
Definition: oracle_event_engine_posix.h:131
grpc_event_engine::experimental::PosixOracleListener::listener_fds_
std::vector< int > listener_fds_
Definition: oracle_event_engine_posix.h:138
grpc_event_engine::experimental::PosixOracleListener::is_started_
bool is_started_
Definition: oracle_event_engine_posix.h:137
new_socket
static int new_socket(uv_tcp_t *handle, int domain, unsigned long flags)
Definition: unix/tcp.c:31
gen_build_yaml.struct
def struct(**kwargs)
Definition: test/core/end2end/gen_build_yaml.py:30
grpc_event_engine::experimental::EndpointConfig
Definition: endpoint_config.h:31
grpc_event_engine::experimental::EventEngine::ResolvedAddress
Definition: event_engine.h:118
absl::ToInt64Milliseconds
int64_t ToInt64Milliseconds(Duration d)
Definition: abseil-cpp/absl/time/duration.cc:560
absl::Milliseconds
constexpr Duration Milliseconds(T n)
Definition: third_party/abseil-cpp/absl/time/time.h:415
absl::MutexLock
Definition: abseil-cpp/absl/synchronization/mutex.h:525
grpc_event_engine::experimental::PosixOracleEndpoint::PosixOracleEndpoint
PosixOracleEndpoint(int socket_fd)
Definition: oracle_event_engine_posix.cc:190
memcpy
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))
grpc_event_engine::experimental::PosixOracleListener::Bind
absl::StatusOr< int > Bind(const EventEngine::ResolvedAddress &addr) override
Definition: oracle_event_engine_posix.cc:375
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
on_read
static grpc_closure on_read
Definition: bad_server_response_test.cc:88
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs
Definition: event_engine.h:181
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
absl::UnknownError
Status UnknownError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:383
grpc_event_engine::experimental::PosixOracleListener::pipefd_
int pipefd_[2]
Definition: oracle_event_engine_posix.h:136
grpc_event_engine::experimental::EventEngine::ConnectionHandle
Definition: event_engine.h:108
grpc_event_engine::experimental::PosixOracleEndpoint::is_shutdown_
bool is_shutdown_
Definition: oracle_event_engine_posix.h:110
grpc_event_engine::experimental::PosixOracleEndpoint::ReadOperation
Definition: oracle_event_engine_posix.h:60
arg
Definition: cmdline.cc:40
absl::Duration
Definition: third_party/abseil-cpp/absl/time/time.h:159
close
#define close
Definition: test-fs.c:48
grpc_core::Thread::Join
void Join()
Definition: thd.h:141
write_op
static grpc_op write_op
Definition: test/core/fling/server.cc:56
grpc_resolved_address::len
socklen_t len
Definition: resolved_address.h:36
data
char data[kBufferLength]
Definition: abseil-cpp/absl/strings/internal/str_format/float_conversion.cc:1006
grpc_event_engine::experimental::PosixOracleEndpoint::Create
static std::unique_ptr< PosixOracleEndpoint > Create(int socket_fd)
Definition: oracle_event_engine_posix.cc:219
buffer
char buffer[1024]
Definition: libuv/docs/code/idle-compute/main.c:8
grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs
Definition: event_engine.h:150
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
grpc_event_engine::experimental::PosixOracleEndpoint
Definition: oracle_event_engine_posix.h:40
grpc_core::Thread::Start
void Start()
Definition: thd.h:125
on_connect
void on_connect(uv_connect_t *req, int status)
Definition: libuv/docs/code/dns/main.c:32
grpc_event_engine::experimental::PosixOracleEndpoint::ProcessWriteOperations
void ProcessWriteOperations()
Definition: oracle_event_engine_posix.cc:266
grpc_event_engine::experimental::PosixOracleListener::HandleIncomingConnections
void HandleIncomingConnections()
Definition: oracle_event_engine_posix.cc:330
grpc_event_engine::experimental::EventEngine::OnConnectCallback
std::function< void(absl::StatusOr< std::unique_ptr< Endpoint > >)> OnConnectCallback
Definition: event_engine.h:224
absl::Now
ABSL_NAMESPACE_BEGIN Time Now()
Definition: abseil-cpp/absl/time/clock.cc:39
read_op
static grpc_op read_op
Definition: test/core/fling/server.cc:54
read
int read(izstream &zs, T *x, Items items)
Definition: bloaty/third_party/zlib/contrib/iostream2/zstream.h:115
grpc_event_engine::experimental::PosixOracleEndpoint::ProcessReadOperations
void ProcessReadOperations()
Definition: oracle_event_engine_posix.cc:245
grpc_event_engine::experimental::PosixOracleListener::PosixOracleListener
PosixOracleListener(EventEngine::Listener::AcceptCallback on_accept, std::function< void(absl::Status)> on_shutdown, std::unique_ptr< MemoryAllocatorFactory > memory_allocator_factory)
Definition: oracle_event_engine_posix.cc:286
http2_test_server.listen
def listen(endpoint, test_case)
Definition: http2_test_server.py:87
poll.h
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
grpc_event_engine::experimental::PosixOracleListener::memory_allocator_factory_
std::unique_ptr< MemoryAllocatorFactory > memory_allocator_factory_
Definition: oracle_event_engine_posix.h:134
ret
UniquePtr< SSL_SESSION > ret
Definition: ssl_x509.cc:1029
alloc.h
on_accept
static void on_accept(void *arg, grpc_endpoint *endpoint, grpc_pollset *, grpc_tcp_server_acceptor *acceptor)
Definition: http_proxy_fixture.cc:567
grpc_event_engine::experimental::SliceBuffer
Definition: include/grpc/event_engine/slice_buffer.h:51
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc_event_engine
Definition: endpoint_config.h:24
grpc_event_engine::experimental::PosixOracleListener::on_accept_
EventEngine::Listener::AcceptCallback on_accept_
Definition: oracle_event_engine_posix.h:132
ok
bool ok
Definition: async_end2end_test.cc:197
arg
struct arg arg
absl::Status::ok
ABSL_MUST_USE_RESULT bool ok() const
Definition: third_party/abseil-cpp/absl/status/status.h:802
grpc_core::Thread
Definition: thd.h:43
absl::UnimplementedError
Status UnimplementedError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:379
test_server.socket
socket
Definition: test_server.py:65
grpc_sockaddr_get_uri_scheme
const char * grpc_sockaddr_get_uri_scheme(const grpc_resolved_address *resolved_addr)
Definition: sockaddr_utils.cc:282
grpc_event_engine::experimental::PosixOracleEndpoint::read_ops_channel_
Promise< ReadOperation > read_ops_channel_
Definition: oracle_event_engine_posix.h:112
grpc_event_engine::experimental::PosixOracleEndpoint::write_ops_channel_
Promise< WriteOperation > write_ops_channel_
Definition: oracle_event_engine_posix.h:113
absl::StatusOr
Definition: abseil-cpp/absl/status/statusor.h:187
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
grpc_event_engine::experimental::PosixOracleEndpoint::Read
void Read(std::function< void(absl::Status)> on_read, SliceBuffer *buffer, const ReadArgs *args) override
Definition: oracle_event_engine_posix.cc:229
absl::InfiniteDuration
constexpr Duration InfiniteDuration()
Definition: third_party/abseil-cpp/absl/time/time.h:1573
grpc_resolved_address::addr
char addr[GRPC_MAX_SOCKADDR_SIZE]
Definition: resolved_address.h:35
grpc_event_engine::experimental::PosixOracleEndpoint::WriteOperation
Definition: oracle_event_engine_posix.h:86
bloaty::ReadBytes
absl::string_view ReadBytes(size_t bytes, absl::string_view *data)
Definition: bloaty/src/util.h:165
addr
struct sockaddr_in addr
Definition: libuv/docs/code/tcp-echo-server/main.c:10
grpc_event_engine::experimental::PosixOracleEndpoint::~PosixOracleEndpoint
~PosixOracleEndpoint() override
Definition: oracle_event_engine_posix.cc:224
timeout
uv_timer_t timeout
Definition: libuv/docs/code/uvwget/main.c:9
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
absl::exchange
T exchange(T &obj, U &&new_value)
Definition: abseil-cpp/absl/utility/utility.h:314
grpc_event_engine::experimental::PosixOracleListener::~PosixOracleListener
~PosixOracleListener() override
Definition: oracle_event_engine_posix.cc:315


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:46