oracle_event_engine_posix.h
Go to the documentation of this file.
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_ORACLE_EVENT_ENGINE_POSIX_H_
16 #define GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_ORACLE_EVENT_ENGINE_POSIX_H_
17 
18 #include <functional>
19 #include <memory>
20 #include <string>
21 #include <unordered_map>
22 #include <utility>
23 
24 #include "absl/status/status.h"
25 #include "absl/status/statusor.h"
26 #include "absl/time/time.h"
27 
30 #include <grpc/support/log.h>
31 
33 #include "src/core/lib/gprpp/thd.h"
36 
37 namespace grpc_event_engine {
38 namespace experimental {
39 
41  public:
42  explicit PosixOracleEndpoint(int socket_fd);
43  static std::unique_ptr<PosixOracleEndpoint> Create(int socket_fd);
44  ~PosixOracleEndpoint() override;
46  const ReadArgs* args) override;
47  void Write(std::function<void(absl::Status)> on_writable, SliceBuffer* data,
48  const WriteArgs* args) override;
49  void Shutdown();
51  GPR_ASSERT(false && "unimplemented");
52  }
54  GPR_ASSERT(false && "unimplemented");
55  }
56 
57  private:
58  // An internal helper class definition of Read operations to be performed
59  // by the TCPServerEndpoint.
60  class ReadOperation {
61  public:
63  : num_bytes_to_read_(-1), buffer_(nullptr), on_complete_(nullptr) {}
64  ReadOperation(int num_bytes_to_read, SliceBuffer* buffer,
65  std::function<void(absl::Status)>&& on_complete)
66  : num_bytes_to_read_(num_bytes_to_read),
67  buffer_(buffer),
68  on_complete_(std::move(on_complete)) {}
69  bool IsValid() { return num_bytes_to_read_ >= 0 && buffer_ != nullptr; }
70  int GetNumBytesToRead() const { return num_bytes_to_read_; }
72  if (on_complete_ != nullptr) {
73  AppendStringToSliceBuffer(absl::exchange(buffer_, nullptr), read_data);
75  }
76  }
77 
78  private:
82  };
83 
84  // An internal helper class definition of Write operations to be performed
85  // by the TCPServerEndpoint.
87  public:
90  std::function<void(absl::Status)>&& on_complete)
92  on_complete_(std::move(on_complete)) {}
93  bool IsValid() { return bytes_to_write_.length() > 0; }
96  if (on_complete_ != nullptr) {
98  }
99  }
100 
101  private:
104  };
105 
106  void ProcessReadOperations();
107  void ProcessWriteOperations();
108 
109  mutable absl::Mutex mu_;
110  bool is_shutdown_ = false;
116 };
117 
119  public:
122  std::function<void(absl::Status)> on_shutdown,
123  std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory);
124  ~PosixOracleListener() override;
126  absl::Status Start() override;
127 
128  private:
130 
131  mutable absl::Mutex mu_;
134  std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory_;
136  int pipefd_[2];
137  bool is_started_ = false;
138  std::vector<int> listener_fds_;
139 };
140 
141 // A posix based oracle event engine.
142 class PosixOracleEventEngine final : public EventEngine {
143  public:
144  PosixOracleEventEngine() = default;
145  ~PosixOracleEventEngine() override = default;
146 
149  std::function<void(absl::Status)> on_shutdown,
150  const EndpointConfig& /*config*/,
151  std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
152  override {
153  return std::make_unique<PosixOracleListener>(
154  std::move(on_accept), std::move(on_shutdown),
155  std::move(memory_allocator_factory));
156  }
157 
158  ConnectionHandle Connect(OnConnectCallback on_connect,
159  const ResolvedAddress& addr,
160  const EndpointConfig& args,
161  MemoryAllocator memory_allocator,
162  EventEngine::Duration timeout) override;
163 
164  bool CancelConnect(ConnectionHandle /*handle*/) override {
165  GPR_ASSERT(false && "unimplemented");
166  }
167  bool IsWorkerThread() override { return false; };
168  std::unique_ptr<DNSResolver> GetDNSResolver(
169  const DNSResolver::ResolverOptions& /*options*/) override {
170  GPR_ASSERT(false && "unimplemented");
171  }
172  void Run(Closure* /*closure*/) override {
173  GPR_ASSERT(false && "unimplemented");
174  }
175  void Run(std::function<void()> /*closure*/) override {
176  GPR_ASSERT(false && "unimplemented");
177  }
179  Closure* /*closure*/) override {
180  GPR_ASSERT(false && "unimplemented");
181  }
183  std::function<void()> /*closure*/) override {
184  GPR_ASSERT(false && "unimplemented");
185  }
186  bool Cancel(TaskHandle /*handle*/) override {
187  GPR_ASSERT(false && "unimplemented");
188  }
189 };
190 
191 } // namespace experimental
192 } // namespace grpc_event_engine
193 
194 #endif // GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_ORACLE_EVENT_ENGINE_POSIX_H_
grpc_event_engine::experimental::PosixOracleEndpoint::WriteOperation::on_complete_
std::function< void(absl::Status)> on_complete_
Definition: oracle_event_engine_posix.h:103
grpc_event_engine::experimental::PosixOracleEndpoint::ReadOperation::GetNumBytesToRead
int GetNumBytesToRead() const
Definition: oracle_event_engine_posix.h:70
log.h
grpc_event_engine::experimental::EventEngine::Listener::AcceptCallback
std::function< void(std::unique_ptr< Endpoint >, MemoryAllocator memory_allocator)> AcceptCallback
Called when the listener has accepted a new client connection.
Definition: event_engine.h:232
grpc_event_engine::experimental::PosixOracleEndpoint::WriteOperation::bytes_to_write_
std::string bytes_to_write_
Definition: oracle_event_engine_posix.h:102
grpc_event_engine::experimental::MemoryAllocator
Definition: memory_allocator.h:35
grpc_event_engine::experimental::PosixOracleListener::serve_
grpc_core::Thread serve_
Definition: oracle_event_engine_posix.h:135
grpc_event_engine::experimental::PosixOracleEndpoint::Shutdown
void Shutdown()
Definition: oracle_event_engine_posix.cc:208
grpc_event_engine::experimental::PosixOracleEndpoint::ReadOperation::ReadOperation
ReadOperation()
Definition: oracle_event_engine_posix.h:62
absl::Mutex
Definition: abseil-cpp/absl/synchronization/mutex.h:131
grpc_event_engine::experimental::PosixOracleListener::Start
absl::Status Start() override
Definition: oracle_event_engine_posix.cc:299
grpc_event_engine::experimental::PosixOracleEventEngine::CreateListener
absl::StatusOr< std::unique_ptr< Listener > > CreateListener(Listener::AcceptCallback on_accept, std::function< void(absl::Status)> on_shutdown, const EndpointConfig &, std::unique_ptr< MemoryAllocatorFactory > memory_allocator_factory) override
Definition: oracle_event_engine_posix.h:147
event_engine.h
grpc_event_engine::experimental::PosixOracleEndpoint::mu_
absl::Mutex mu_
Definition: oracle_event_engine_posix.h:109
grpc_event_engine::experimental::PosixOracleEndpoint::socket_fd_
int socket_fd_
Definition: oracle_event_engine_posix.h:111
grpc_event_engine::experimental::PosixOracleEndpoint::Write
void Write(std::function< void(absl::Status)> on_writable, SliceBuffer *data, const WriteArgs *args) override
Definition: oracle_event_engine_posix.cc:239
grpc_event_engine::experimental::EventEngine
Definition: event_engine.h:74
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc_event_engine::experimental::PosixOracleEndpoint::WriteOperation::operator()
void operator()(absl::Status status)
Definition: oracle_event_engine_posix.h:95
grpc_event_engine::experimental::PosixOracleListener
Definition: oracle_event_engine_posix.h:118
grpc_event_engine::experimental::EventEngine::Duration
std::chrono::duration< int64_t, std::nano > Duration
Definition: event_engine.h:80
grpc_event_engine::experimental::PosixOracleListener::on_shutdown_
std::function< void(absl::Status)> on_shutdown_
Definition: oracle_event_engine_posix.h:133
grpc_event_engine::experimental::PosixOracleEndpoint::ABSL_GUARDED_BY
grpc_core::Thread read_ops_ ABSL_GUARDED_BY(mu_)
grpc_event_engine::experimental::PosixOracleEventEngine::Connect
ConnectionHandle Connect(OnConnectCallback on_connect, const ResolvedAddress &addr, const EndpointConfig &args, MemoryAllocator memory_allocator, EventEngine::Duration timeout) override
Definition: oracle_event_engine_posix.cc:421
grpc_event_engine::experimental::EventEngine::Endpoint
Definition: event_engine.h:141
status
absl::Status status
Definition: rls.cc:251
grpc_event_engine::experimental::PosixOracleEndpoint::ReadOperation::on_complete_
std::function< void(absl::Status)> on_complete_
Definition: oracle_event_engine_posix.h:81
grpc_event_engine::experimental::PosixOracleListener::mu_
absl::Mutex mu_
Definition: oracle_event_engine_posix.h:131
grpc_event_engine::experimental::PosixOracleListener::listener_fds_
std::vector< int > listener_fds_
Definition: oracle_event_engine_posix.h:138
grpc_event_engine::experimental::PosixOracleListener::is_started_
bool is_started_
Definition: oracle_event_engine_posix.h:137
grpc_event_engine::experimental::EventEngine::TaskHandle
Definition: event_engine.h:102
grpc_event_engine::experimental::PosixOracleEventEngine::CancelConnect
bool CancelConnect(ConnectionHandle) override
Definition: oracle_event_engine_posix.h:164
grpc_event_engine::experimental::EndpointConfig
Definition: endpoint_config.h:31
grpc_event_engine::experimental::EventEngine::ResolvedAddress
Definition: event_engine.h:118
event_engine_test_utils.h
grpc_event_engine::experimental::PosixOracleEndpoint::PosixOracleEndpoint
PosixOracleEndpoint(int socket_fd)
Definition: oracle_event_engine_posix.cc:190
grpc_event_engine::experimental::PosixOracleListener::Bind
absl::StatusOr< int > Bind(const EventEngine::ResolvedAddress &addr) override
Definition: oracle_event_engine_posix.cc:375
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
on_read
static grpc_closure on_read
Definition: bad_server_response_test.cc:88
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc_event_engine::experimental::PosixOracleEndpoint::GetPeerAddress
EventEngine::ResolvedAddress & GetPeerAddress() const override
Definition: oracle_event_engine_posix.h:50
grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs
Definition: event_engine.h:181
grpc_event_engine::experimental::PosixOracleEndpoint::ReadOperation::operator()
void operator()(std::string read_data, absl::Status status)
Definition: oracle_event_engine_posix.h:71
promise.h
grpc_event_engine::experimental::PosixOracleEventEngine::RunAfter
TaskHandle RunAfter(EventEngine::Duration, std::function< void()>) override
Definition: oracle_event_engine_posix.h:182
grpc_event_engine::experimental::PosixOracleListener::pipefd_
int pipefd_[2]
Definition: oracle_event_engine_posix.h:136
grpc_event_engine::experimental::EventEngine::ConnectionHandle
Definition: event_engine.h:108
grpc_event_engine::experimental::Promise
Definition: event_engine/promise.h:31
grpc_event_engine::experimental::PosixOracleEndpoint::is_shutdown_
bool is_shutdown_
Definition: oracle_event_engine_posix.h:110
grpc_event_engine::experimental::PosixOracleEndpoint::ReadOperation
Definition: oracle_event_engine_posix.h:60
grpc_event_engine::experimental::PosixOracleEventEngine::Run
void Run(Closure *) override
Definition: oracle_event_engine_posix.h:172
grpc_event_engine::experimental::AppendStringToSliceBuffer
void AppendStringToSliceBuffer(SliceBuffer *buf, std::string data)
Definition: event_engine_test_utils.cc:60
data
char data[kBufferLength]
Definition: abseil-cpp/absl/strings/internal/str_format/float_conversion.cc:1006
grpc_event_engine::experimental::PosixOracleEndpoint::Create
static std::unique_ptr< PosixOracleEndpoint > Create(int socket_fd)
Definition: oracle_event_engine_posix.cc:219
grpc_event_engine::experimental::PosixOracleEndpoint::ReadOperation::IsValid
bool IsValid()
Definition: oracle_event_engine_posix.h:69
buffer
char buffer[1024]
Definition: libuv/docs/code/idle-compute/main.c:8
grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs
Definition: event_engine.h:150
grpc_event_engine::experimental::PosixOracleEventEngine::Cancel
bool Cancel(TaskHandle) override
Definition: oracle_event_engine_posix.h:186
grpc_event_engine::experimental::PosixOracleEndpoint
Definition: oracle_event_engine_posix.h:40
grpc_event_engine::experimental::PosixOracleEndpoint::ReadOperation::buffer_
SliceBuffer * buffer_
Definition: oracle_event_engine_posix.h:80
grpc_event_engine::experimental::EventEngine::Listener
Definition: event_engine.h:228
on_connect
void on_connect(uv_connect_t *req, int status)
Definition: libuv/docs/code/dns/main.c:32
grpc_event_engine::experimental::PosixOracleEventEngine::GetDNSResolver
std::unique_ptr< DNSResolver > GetDNSResolver(const DNSResolver::ResolverOptions &) override
Definition: oracle_event_engine_posix.h:168
grpc_event_engine::experimental::PosixOracleEndpoint::ProcessWriteOperations
void ProcessWriteOperations()
Definition: oracle_event_engine_posix.cc:266
grpc_event_engine::experimental::PosixOracleListener::HandleIncomingConnections
void HandleIncomingConnections()
Definition: oracle_event_engine_posix.cc:330
grpc_event_engine::experimental::EventEngine::OnConnectCallback
std::function< void(absl::StatusOr< std::unique_ptr< Endpoint > >)> OnConnectCallback
Definition: event_engine.h:224
grpc_event_engine::experimental::PosixOracleEndpoint::ReadOperation::ReadOperation
ReadOperation(int num_bytes_to_read, SliceBuffer *buffer, std::function< void(absl::Status)> &&on_complete)
Definition: oracle_event_engine_posix.h:64
grpc_event_engine::experimental::PosixOracleEndpoint::WriteOperation::IsValid
bool IsValid()
Definition: oracle_event_engine_posix.h:93
grpc_event_engine::experimental::EventEngine::Closure
Definition: event_engine.h:87
grpc_event_engine::experimental::PosixOracleEndpoint::ProcessReadOperations
void ProcessReadOperations()
Definition: oracle_event_engine_posix.cc:245
slice_buffer.h
grpc_event_engine::experimental::PosixOracleListener::PosixOracleListener
PosixOracleListener(EventEngine::Listener::AcceptCallback on_accept, std::function< void(absl::Status)> on_shutdown, std::unique_ptr< MemoryAllocatorFactory > memory_allocator_factory)
Definition: oracle_event_engine_posix.cc:286
grpc_event_engine::experimental::PosixOracleEndpoint::WriteOperation::WriteOperation
WriteOperation()
Definition: oracle_event_engine_posix.h:88
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
grpc_event_engine::experimental::PosixOracleListener::memory_allocator_factory_
std::unique_ptr< MemoryAllocatorFactory > memory_allocator_factory_
Definition: oracle_event_engine_posix.h:134
on_accept
static void on_accept(void *arg, grpc_endpoint *endpoint, grpc_pollset *, grpc_tcp_server_acceptor *acceptor)
Definition: http_proxy_fixture.cc:567
grpc_event_engine::experimental::SliceBuffer
Definition: include/grpc/event_engine/slice_buffer.h:51
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc_event_engine
Definition: endpoint_config.h:24
grpc_event_engine::experimental::PosixOracleListener::on_accept_
EventEngine::Listener::AcceptCallback on_accept_
Definition: oracle_event_engine_posix.h:132
thd.h
grpc_event_engine::experimental::PosixOracleEventEngine
Definition: oracle_event_engine_posix.h:142
grpc_event_engine::experimental::PosixOracleEventEngine::PosixOracleEventEngine
PosixOracleEventEngine()=default
grpc_core::Thread
Definition: thd.h:43
grpc_event_engine::experimental::ExtractSliceBufferIntoString
std::string ExtractSliceBufferIntoString(SliceBuffer *buf)
Definition: event_engine_test_utils.cc:64
memory_quota.h
grpc_event_engine::experimental::PosixOracleEventEngine::RunAfter
TaskHandle RunAfter(EventEngine::Duration, Closure *) override
Definition: oracle_event_engine_posix.h:178
grpc_event_engine::experimental::PosixOracleEndpoint::WriteOperation::WriteOperation
WriteOperation(SliceBuffer *buffer, std::function< void(absl::Status)> &&on_complete)
Definition: oracle_event_engine_posix.h:89
grpc_event_engine::experimental::PosixOracleEventEngine::IsWorkerThread
bool IsWorkerThread() override
Definition: oracle_event_engine_posix.h:167
grpc_event_engine::experimental::PosixOracleEndpoint::WriteOperation::GetBytesToWrite
std::string GetBytesToWrite() const
Definition: oracle_event_engine_posix.h:94
grpc_event_engine::experimental::PosixOracleEndpoint::read_ops_channel_
Promise< ReadOperation > read_ops_channel_
Definition: oracle_event_engine_posix.h:112
grpc_event_engine::experimental::PosixOracleEndpoint::write_ops_channel_
Promise< WriteOperation > write_ops_channel_
Definition: oracle_event_engine_posix.h:113
absl::StatusOr
Definition: abseil-cpp/absl/status/statusor.h:187
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
grpc_event_engine::experimental::PosixOracleEndpoint::ReadOperation::num_bytes_to_read_
int num_bytes_to_read_
Definition: oracle_event_engine_posix.h:79
grpc_event_engine::experimental::PosixOracleEndpoint::Read
void Read(std::function< void(absl::Status)> on_read, SliceBuffer *buffer, const ReadArgs *args) override
Definition: oracle_event_engine_posix.cc:229
grpc_event_engine::experimental::PosixOracleEndpoint::GetLocalAddress
EventEngine::ResolvedAddress & GetLocalAddress() const override
Definition: oracle_event_engine_posix.h:53
grpc_event_engine::experimental::PosixOracleEventEngine::Run
void Run(std::function< void()>) override
Definition: oracle_event_engine_posix.h:175
grpc_event_engine::experimental::PosixOracleEndpoint::WriteOperation
Definition: oracle_event_engine_posix.h:86
addr
struct sockaddr_in addr
Definition: libuv/docs/code/tcp-echo-server/main.c:10
grpc_event_engine::experimental::PosixOracleEndpoint::~PosixOracleEndpoint
~PosixOracleEndpoint() override
Definition: oracle_event_engine_posix.cc:224
timeout
uv_timer_t timeout
Definition: libuv/docs/code/uvwget/main.c:9
grpc_event_engine::experimental::PosixOracleEventEngine::~PosixOracleEventEngine
~PosixOracleEventEngine() override=default
absl::exchange
T exchange(T &obj, U &&new_value)
Definition: abseil-cpp/absl/utility/utility.h:314
grpc_event_engine::experimental::PosixOracleListener::~PosixOracleListener
~PosixOracleListener() override
Definition: oracle_event_engine_posix.cc:315


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:46