event_engine_test_utils.cc
Go to the documentation of this file.
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <cstring>
18 #include <memory>
19 #include <string>
20 #include <utility>
21 
22 #include "absl/status/status.h"
23 #include "absl/status/statusor.h"
24 #include "absl/strings/str_cat.h"
25 #include "absl/synchronization/mutex.h"
26 #include "absl/time/time.h"
27 
32 #include <grpc/slice_buffer.h>
33 #include <grpc/support/alloc.h>
34 #include <grpc/support/log.h>
35 
40 
43 
44 namespace grpc_event_engine {
45 namespace experimental {
46 
// Fragment of URIToResolvedAddress. NOTE(review): the signature and the
// statements that declare `addr` and `uri` are not present in this listing.
//
// Error path: when `uri` does not hold a value, its status is logged at
// GPR_ERROR and GPR_ASSERT is then evaluated on uri.ok(), which is false on
// this path.
50  if (!uri.ok()) {
51  gpr_log(GPR_ERROR, "Failed to parse. Error: %s",
52  uri.status().ToString().c_str());
53  GPR_ASSERT(uri.ok());
54  }
// Tail of a call whose beginning is not shown here: it is given addr.addr
// reinterpreted as a const sockaddr* together with addr.len.
// NOTE(review): presumably this builds the returned ResolvedAddress - confirm
// against the full file.
57  reinterpret_cast<const sockaddr*>(addr.addr), addr.len);
58 }
59 
62 }
63 
// Body of ExtractSliceBufferIntoString(SliceBuffer* buf). NOTE(review): the
// signature line is not present in this listing.
//
// Returns the contents of `buf` as a std::string.
// An empty buffer short-circuits to an empty string.
65  if (!buf->Length()) {
66  return std::string();
67  }
// Pre-size the destination to exactly buf->Length() bytes so the C API below
// can write straight into the string's storage.
68  std::string tmp(buf->Length(), '\0');
// NOTE(review): this casts away the const of c_str() to obtain a writable
// pointer; `&tmp[0]` (or non-const data() in C++17) would avoid the
// const_cast.
69  char* bytes = const_cast<char*>(tmp.c_str());
// Hands the underlying C slice buffer, the full byte count and the string's
// storage to the C API. NOTE(review): going by its name this moves (not
// copies) the bytes, which would leave `buf` drained - confirm.
70  grpc_slice_buffer_move_first_into_buffer(buf->c_slice_buffer(), buf->Length(),
71  bytes);
72  return tmp;
73 }
74 
// Tail of the SendValidatePayload signature and its body. NOTE(review): the
// first signature line (declaring `data` and `send_endpoint`) is not present
// in this listing.
//
// Writes `data` on send_endpoint, reads it back on receive_endpoint, and
// returns OkStatus when the bytes read equal `data`, or a CancelledError when
// they differ. Both endpoints must be non-null (asserted).
76  Endpoint* receive_endpoint) {
77  GPR_ASSERT(receive_endpoint != nullptr && send_endpoint != nullptr);
// NOTE(review): data.size() is a size_t narrowed into an int here.
78  int num_bytes_written = data.size();
// One promise per direction; each is set to true by the matching completion
// callback and read with Get() further down.
79  Promise<bool> read_promise;
80  Promise<bool> write_promise;
81  SliceBuffer read_slice_buf;
82  SliceBuffer write_slice_buf;
83 
84  AppendStringToSliceBuffer(&write_slice_buf, data);
// The read hint starts out as the full payload size.
85  EventEngine::Endpoint::ReadArgs args = {num_bytes_written};
// Read completion callback. NOTE(review): the declaration of `read_cb`, the
// rest of the capture list and the lambda's parameter list are missing from
// this listing. The visible captures are by reference, and the body also uses
// `args` and `status`.
87  read_cb = [receive_endpoint, &read_slice_buf, &read_cb, &read_promise,
89  GPR_ASSERT(status.ok());
// Done once the buffered length matches the current read hint.
// NOTE(review): the hint is decremented below while Length() is that of the
// same buffer passed to every Read; if Read appends to that buffer the two
// sides of this comparison drift apart - confirm Read's buffer semantics.
90  if (read_slice_buf.Length() == static_cast<size_t>(args.read_hint_bytes)) {
91  read_promise.Set(true);
92  return;
93  }
// Otherwise lower the hint by what is buffered and issue another Read with
// this same callback.
// NOTE(review): read_cb is moved from while it is the callback being run.
94  args.read_hint_bytes -= read_slice_buf.Length();
95  receive_endpoint->Read(std::move(read_cb), &read_slice_buf, &args);
96  };
97  // Start asynchronous reading at the receive_endpoint.
98  receive_endpoint->Read(std::move(read_cb), &read_slice_buf, &args);
99  // Start asynchronous writing at the send_endpoint.
100  send_endpoint->Write(
101  [&write_promise](absl::Status status) {
102  GPR_ASSERT(status.ok());
103  write_promise.Set(true);
104  },
105  &write_slice_buf, nullptr);
// The callbacks reference locals of this function by reference/pointer.
// NOTE(review): this relies on Promise::Get() blocking until Set() has been
// called, so that the locals outlive the callbacks - confirm in promise.h.
106  // Wait for async write to complete.
107  GPR_ASSERT(write_promise.Get() == true);
108  // Wait for async read to complete.
109  GPR_ASSERT(read_promise.Get() == true);
110  // Check if data written == data read
111  if (data != ExtractSliceBufferIntoString(&read_slice_buf)) {
112  return absl::CancelledError("Data read != Data written");
113  }
114  return absl::OkStatus();
115 }
116 
// Parameter list and body of ConnectionManager::BindAndStartListener.
// NOTE(review): the line carrying the return type and function name is not
// present in this listing.
//
// Creates one listener, binds it to every address in `addrs`, starts it and
// records it in listeners_ under each address. `listener_type_oracle` selects
// oracle_event_engine_ (true) or test_event_engine_ (false) as the creating
// engine. Returns OkStatus on success, or the failing status from
// CreateListener / Bind.
118  std::vector<std::string> addrs, bool listener_type_oracle) {
// mu_ is held for the whole call.
119  grpc_core::MutexLock lock(&mu_);
// Reject an empty address list. NOTE(review): the line that opens this return
// statement is missing from the listing; only its message argument is visible.
120  if (addrs.empty()) {
122  "Atleast one bind address must be specified");
123  }
// Reject the request if any address already has a registered listener; this
// check runs before any listener is created.
124  for (auto& addr : addrs) {
125  if (listeners_.find(addr) != listeners_.end()) {
126  // There is already a listener at this address. Return error.
// NOTE(review): the line that opens this return statement is missing from the
// listing.
128  absl::StrCat("Listener already existis for address: ", addr));
129  }
130  }
// Accept callback handed to the listener; it captures `this` and ignores the
// MemoryAllocator argument. NOTE(review): its body (one line) is missing from
// this listing.
131  Listener::AcceptCallback accept_cb =
132  [this](std::unique_ptr<Endpoint> ep,
133  MemoryAllocator /*memory_allocator*/) {
135  };
136 
137  EventEngine* event_engine = listener_type_oracle ? oracle_event_engine_.get()
138  : test_event_engine_.get();
139 
// The second argument is the shutdown callback; it asserts that the status it
// receives is OK.
140  auto status = event_engine->CreateListener(
141  std::move(accept_cb),
142  [](absl::Status status) { GPR_ASSERT(status.ok()); },
143  ChannelArgsEndpointConfig(nullptr),
144  std::make_unique<grpc_core::MemoryQuota>("foo"));
145  if (!status.ok()) {
146  return status.status();
147  }
148 
// Transfer ownership from the returned unique_ptr into a shared_ptr so the
// same listener object can be stored under several addresses below.
149  std::shared_ptr<Listener> listener((*status).release());
// Bind every address; on the first failure the error is logged and returned.
// Nothing has been inserted into listeners_ at that point, so the local
// shared_ptr is the only owner and the listener is released on return.
150  for (auto& addr : addrs) {
151  auto bind_status = listener->Bind(URIToResolvedAddress(addr));
152  if (!bind_status.ok()) {
153  gpr_log(GPR_ERROR, "Binding listener failed: %s",
154  bind_status.status().ToString().c_str());
155  return bind_status.status();
156  }
157  }
// A Start() failure is treated as fatal (asserted) rather than returned.
158  GPR_ASSERT(listener->Start().ok());
159  // Insert same listener pointer for all bind addresses after the listener
160  // has started successfully.
161  for (auto& addr : addrs) {
162  listeners_.insert(std::make_pair(addr, listener));
163  }
164  return absl::OkStatus();
165 }
166 
// Return type, last parameter and body of ConnectionManager::CreateConnection.
// NOTE(review): the signature lines naming the function and declaring
// `target_addr` and `timeout` are not present in this listing.
//
// Connects to `target_addr` using oracle_event_engine_ (client_type_oracle ==
// true) or test_event_engine_ (false). On success returns the tuple
// (client endpoint, server endpoint); otherwise returns a CancelledError.
167 absl::StatusOr<std::tuple<std::unique_ptr<Endpoint>, std::unique_ptr<Endpoint>>>
170  bool client_type_oracle) {
171  // Only allow one CreateConnection call to proceed at a time.
172  grpc_core::MutexLock lock(&mu_);
// NOTE(review): the initializer of conn_name is missing from this listing.
173  std::string conn_name =
175  EventEngine* event_engine = client_type_oracle ? oracle_event_engine_.get()
176  : test_event_engine_.get();
// Start the connect. The completion callback stores the client endpoint in
// last_in_progress_connection_, or nullptr (after logging) when the connect
// failed. The value returned by Connect() is not used.
177  event_engine->Connect(
178  [this](absl::StatusOr<std::unique_ptr<Endpoint>> status) {
179  if (!status.ok()) {
180  gpr_log(GPR_ERROR, "Connect failed: %s",
181  status.status().ToString().c_str());
182  last_in_progress_connection_.SetClientEndpoint(nullptr);
183  } else {
184  last_in_progress_connection_.SetClientEndpoint(std::move(*status));
185  }
186  },
187  URIToResolvedAddress(target_addr), ChannelArgsEndpointConfig(nullptr),
188  memory_quota_->CreateMemoryAllocator(conn_name), timeout);
189 
// NOTE(review): presumably GetClientEndpoint() blocks until the callback
// above has called SetClientEndpoint - confirm in event_engine_test_utils.h.
190  auto client_endpoint = last_in_progress_connection_.GetClientEndpoint();
// Success requires both a non-null client endpoint and a listener registered
// in listeners_ under exactly `target_addr`.
191  if (client_endpoint != nullptr &&
192  listeners_.find(target_addr) != listeners_.end()) {
193  // There is a listener for the specified address. Wait until it
194  // creates a ServerEndpoint after accepting the connection.
195  auto server_endpoint = last_in_progress_connection_.GetServerEndpoint();
196  GPR_ASSERT(server_endpoint != nullptr);
// NOTE(review): no statement visible here performs the reset described by the
// comment below; presumably the Get*Endpoint() calls already move the
// endpoints out - confirm.
197  // Set last_in_progress_connection_ to nullptr
198  return std::make_tuple(std::move(client_endpoint),
199  std::move(server_endpoint));
200  }
// Reached when the connect failed or no listener is registered for
// `target_addr`.
201  return absl::CancelledError("Failed to create connection.");
202 }
203 
204 } // namespace experimental
205 } // namespace grpc_event_engine
absl::InvalidArgumentError
Status InvalidArgumentError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:351
grpc_event_engine::experimental::ConnectionManager::last_in_progress_connection_
Connection last_in_progress_connection_
Definition: event_engine_test_utils.h:117
log.h
grpc_event_engine::experimental::slice_detail::CopyConstructors< Slice >::FromCopiedString
static Slice FromCopiedString(const char *s)
Definition: include/grpc/event_engine/slice.h:159
grpc_event_engine::experimental::ConnectionManager::Connection::GetServerEndpoint
std::unique_ptr< EventEngine::Endpoint > GetServerEndpoint()
Definition: event_engine_test_utils.h:103
read_cb
static void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
Definition: benchmark-pound.c:138
grpc_event_engine::experimental::EventEngine::Listener::AcceptCallback
std::function< void(std::unique_ptr< Endpoint >, MemoryAllocator memory_allocator)> AcceptCallback
Called when the listener has accepted a new client connection.
Definition: event_engine.h:232
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
grpc_event_engine::experimental::MemoryAllocator
Definition: memory_allocator.h:35
std::tr1::make_tuple
tuple make_tuple()
Definition: cares/cares/test/gmock-1.8.0/gtest/gtest.h:1619
Listener
::grpc_event_engine::experimental::EventEngine::Listener Listener
Definition: event_engine_test_utils.cc:42
event_engine.h
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
absl::CancelledError
Status CancelledError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:331
grpc_event_engine::experimental::URIToResolvedAddress
EventEngine::ResolvedAddress URIToResolvedAddress(std::string address_str)
Definition: event_engine_test_utils.cc:47
Listener
Definition: transport_common.h:31
buf
voidpf void * buf
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
grpc_event_engine::experimental::ConnectionManager::CreateConnection
absl::StatusOr< std::tuple< std::unique_ptr< EventEngine::Endpoint >, std::unique_ptr< EventEngine::Endpoint > > > CreateConnection(std::string target_addr, EventEngine::Duration timeout, bool client_type_oracle)
Definition: event_engine_test_utils.cc:168
channel_args_endpoint_config.h
grpc_event_engine::experimental::EventEngine
Definition: event_engine.h:74
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc_slice_buffer_move_first_into_buffer
GPRAPI void grpc_slice_buffer_move_first_into_buffer(grpc_slice_buffer *src, size_t n, void *dst)
Definition: slice/slice_buffer.cc:358
grpc_event_engine::experimental::EventEngine::Duration
std::chrono::duration< int64_t, std::nano > Duration
Definition: event_engine.h:80
grpc_event_engine::experimental::ConnectionManager::memory_quota_
std::unique_ptr< grpc_core::MemoryQuota > memory_quota_
Definition: event_engine_test_utils.h:115
grpc_resolved_address
Definition: resolved_address.h:34
absl::OkStatus
Status OkStatus()
Definition: third_party/abseil-cpp/absl/status/status.h:882
grpc_event_engine::experimental::EventEngine::Endpoint
Definition: event_engine.h:141
status
absl::Status status
Definition: rls.cc:251
grpc_event_engine::experimental::ConnectionManager::BindAndStartListener
absl::Status BindAndStartListener(std::vector< std::string > addrs, bool listener_type_oracle=true)
Definition: event_engine_test_utils.cc:117
grpc_event_engine::experimental::ConnectionManager::listeners_
std::map< std::string, std::shared_ptr< EventEngine::Listener > > listeners_
Definition: event_engine_test_utils.h:118
grpc_event_engine::experimental::ChannelArgsEndpointConfig
Definition: channel_args_endpoint_config.h:30
grpc_core::URI::Parse
static absl::StatusOr< URI > Parse(absl::string_view uri_text)
Definition: uri_parser.cc:209
grpc_event_engine::experimental::ConnectionManager::test_event_engine_
std::unique_ptr< EventEngine > test_event_engine_
Definition: event_engine_test_utils.h:119
grpc_event_engine::experimental::ConnectionManager::mu_
grpc_core::Mutex mu_
Definition: event_engine_test_utils.h:114
grpc_event_engine::experimental::EventEngine::Endpoint::Write
virtual void Write(std::function< void(absl::Status)> on_writable, SliceBuffer *data, const WriteArgs *args)=0
grpc_event_engine::experimental::EventEngine::ResolvedAddress
Definition: event_engine.h:118
grpc_event_engine::experimental::ConnectionManager::Connection::SetServerEndpoint
void SetServerEndpoint(std::unique_ptr< EventEngine::Endpoint > &&server_endpoint)
Definition: event_engine_test_utils.h:94
grpc_event_engine::experimental::ConnectionManager::num_processed_connections_
int num_processed_connections_
Definition: event_engine_test_utils.h:116
grpc_parse_uri
bool grpc_parse_uri(const grpc_core::URI &uri, grpc_resolved_address *resolved_addr)
Definition: parse_address.cc:293
event_engine_test_utils.h
parse_address.h
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4, 5)
grpc_event_engine::experimental::ConnectionManager::Connection::GetClientEndpoint
std::unique_ptr< EventEngine::Endpoint > GetClientEndpoint()
Definition: event_engine_test_utils.h:98
grpc_event_engine::experimental::Promise
Definition: event_engine/promise.h:31
slice_buffer.h
grpc_event_engine::experimental::AppendStringToSliceBuffer
void AppendStringToSliceBuffer(SliceBuffer *buf, std::string data)
Definition: event_engine_test_utils.cc:60
data
char data[kBufferLength]
Definition: abseil-cpp/absl/strings/internal/str_format/float_conversion.cc:1006
grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs
Definition: event_engine.h:150
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
Endpoint
::grpc_event_engine::experimental::EventEngine::Endpoint Endpoint
Definition: event_engine_test_utils.cc:41
grpc_event_engine::experimental::ConnectionManager::oracle_event_engine_
std::unique_ptr< EventEngine > oracle_event_engine_
Definition: event_engine_test_utils.h:120
grpc_event_engine::experimental::SendValidatePayload
absl::Status SendValidatePayload(std::string data, Endpoint *send_endpoint, Endpoint *receive_endpoint)
Definition: event_engine_test_utils.cc:75
absl::StatusOr::ok
ABSL_MUST_USE_RESULT bool ok() const
Definition: abseil-cpp/absl/status/statusor.h:491
slice_buffer.h
bytes
uint8 bytes[10]
Definition: bloaty/third_party/protobuf/src/google/protobuf/io/coded_stream_unittest.cc:153
grpc_event_engine::experimental::Promise::Get
T & Get()
Definition: event_engine/promise.h:35
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
alloc.h
grpc_event_engine::experimental::SliceBuffer
Definition: include/grpc/event_engine/slice_buffer.h:51
grpc_event_engine
Definition: endpoint_config.h:24
grpc_event_engine::experimental::Promise::Set
void Set(T &&val)
Definition: event_engine/promise.h:44
grpc_event_engine::experimental::EventEngine::Connect
virtual ConnectionHandle Connect(OnConnectCallback on_connect, const ResolvedAddress &addr, const EndpointConfig &args, MemoryAllocator memory_allocator, Duration timeout)=0
absl::Status::ok
ABSL_MUST_USE_RESULT bool ok() const
Definition: third_party/abseil-cpp/absl/status/status.h:802
grpc_event_engine::experimental::ExtractSliceBufferIntoString
std::string ExtractSliceBufferIntoString(SliceBuffer *buf)
Definition: event_engine_test_utils.cc:64
memory_quota.h
memory_allocator.h
absl::StatusOr
Definition: abseil-cpp/absl/status/statusor.h:187
uri_parser.h
autogen_x86imm.tmp
tmp
Definition: autogen_x86imm.py:12
endpoint_config.h
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
to_string
static bool to_string(zval *from)
Definition: protobuf/php/ext/google/protobuf/convert.c:333
addr
struct sockaddr_in addr
Definition: libuv/docs/code/tcp-echo-server/main.c:10
timeout
uv_timer_t timeout
Definition: libuv/docs/code/uvwget/main.c:9
absl::AlreadyExistsError
Status AlreadyExistsError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:327
absl::StatusOr::status
const Status & status() const &
Definition: abseil-cpp/absl/status/statusor.h:678
grpc_event_engine::experimental::EventEngine::Endpoint::Read
virtual void Read(std::function< void(absl::Status)> on_read, SliceBuffer *buffer, const ReadArgs *args)=0
grpc_event_engine::experimental::EventEngine::CreateListener
virtual absl::StatusOr< std::unique_ptr< Listener > > CreateListener(Listener::AcceptCallback on_accept, std::function< void(absl::Status)> on_shutdown, const EndpointConfig &config, std::unique_ptr< MemoryAllocatorFactory > memory_allocator_factory)=0


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:59:16