iomgr_engine.cc
Go to the documentation of this file.
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
15 
17 
18 #include <algorithm>
19 #include <string>
20 #include <utility>
21 
22 #include "absl/container/flat_hash_set.h"
23 #include "absl/strings/str_cat.h"
24 
26 #include <grpc/support/log.h>
27 
32 
33 namespace grpc_event_engine {
34 namespace experimental {
35 
36 namespace {
37 
38 std::string HandleToString(EventEngine::TaskHandle handle) {
39  return absl::StrCat("{", handle.keys[0], ",", handle.keys[1], "}");
40 }
41 
42 } // namespace
43 
45  return timer_manager_.Now() +
49 }
50 
52  std::function<void()> cb;
56 
57  void Run() override {
58  GRPC_EVENT_ENGINE_TRACE("IomgrEventEngine:%p executing callback:%s", engine,
59  HandleToString(handle).c_str());
60  {
62  engine->known_handles_.erase(handle);
63  }
64  cb();
65  delete this;
66  }
67 };
68 
70 
74  for (auto handle : known_handles_) {
76  "(event_engine) IomgrEventEngine:%p uncleared TaskHandle at "
77  "shutdown:%s",
78  this, HandleToString(handle).c_str());
79  }
80  }
81  GPR_ASSERT(GPR_LIKELY(known_handles_.empty()));
82 }
83 
86  if (!known_handles_.contains(handle)) return false;
87  auto* cd = reinterpret_cast<ClosureData*>(handle.keys[0]);
88  bool r = timer_manager_.TimerCancel(&cd->timer);
89  known_handles_.erase(handle);
90  if (r) delete cd;
91  return r;
92 }
93 
95  Duration when, std::function<void()> closure) {
96  return RunAfterInternal(when, std::move(closure));
97 }
98 
101  return RunAfterInternal(when, [closure]() { closure->Run(); });
102 }
103 
106 }
107 
109  thread_pool_.Add([closure]() { closure->Run(); });
110 }
111 
113  Duration when, std::function<void()> cb) {
114  auto when_ts = ToTimestamp(when);
115  auto* cd = new ClosureData;
116  cd->cb = std::move(cb);
117  cd->engine = this;
118  EventEngine::TaskHandle handle{reinterpret_cast<intptr_t>(cd),
119  aba_token_.fetch_add(1)};
120  grpc_core::MutexLock lock(&mu_);
121  known_handles_.insert(handle);
122  cd->handle = handle;
123  GRPC_EVENT_ENGINE_TRACE("IomgrEventEngine:%p scheduling callback:%s", this,
124  HandleToString(handle).c_str());
125  timer_manager_.TimerInit(&cd->timer, when_ts, cd);
126  return handle;
127 }
128 
129 std::unique_ptr<EventEngine::DNSResolver> IomgrEventEngine::GetDNSResolver(
130  EventEngine::DNSResolver::ResolverOptions const& /*options*/) {
131  GPR_ASSERT(false && "unimplemented");
132 }
133 
135  GPR_ASSERT(false && "unimplemented");
136 }
137 
139  GPR_ASSERT(false && "unimplemented");
140 }
141 
143  OnConnectCallback /*on_connect*/, const ResolvedAddress& /*addr*/,
144  const EndpointConfig& /*args*/, MemoryAllocator /*memory_allocator*/,
145  Duration /*deadline*/) {
146  GPR_ASSERT(false && "unimplemented");
147 }
148 
151  Listener::AcceptCallback /*on_accept*/,
152  std::function<void(absl::Status)> /*on_shutdown*/,
153  const EndpointConfig& /*config*/,
154  std::unique_ptr<MemoryAllocatorFactory> /*memory_allocator_factory*/) {
155  GPR_ASSERT(false && "unimplemented");
156 }
157 
158 } // namespace experimental
159 } // namespace grpc_event_engine
trace.h
grpc_event_engine::experimental::IomgrEventEngine::~IomgrEventEngine
~IomgrEventEngine() override
Definition: iomgr_engine.cc:71
grpc_event_engine::experimental::IomgrEventEngine::IsWorkerThread
bool IsWorkerThread() override
Definition: iomgr_engine.cc:134
log.h
grpc_event_engine::iomgr_engine::TimerManager::Now
grpc_core::Timestamp Now()
Definition: event_engine/iomgr_engine/timer_manager.h:52
grpc_event_engine::experimental::IomgrEventEngine::ClosureData::timer
iomgr_engine::Timer timer
Definition: iomgr_engine.cc:53
grpc_event_engine::experimental::IomgrEventEngine::aba_token_
std::atomic< intptr_t > aba_token_
Definition: iomgr_engine.h:116
grpc_event_engine::experimental::EventEngine::Listener::AcceptCallback
std::function< void(std::unique_ptr< Endpoint >, MemoryAllocator memory_allocator)> AcceptCallback
Called when the listener has accepted a new client connection.
Definition: event_engine.h:232
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
iomgr_engine.h
grpc_event_engine::experimental::MemoryAllocator
Definition: memory_allocator.h:35
grpc_event_engine::iomgr_engine::ThreadPool::Add
void Add(const std::function< void()> &callback)
Definition: thread_pool.cc:112
grpc_event_engine::experimental::IomgrEventEngine::IomgrEventEngine
IomgrEventEngine()
Definition: iomgr_engine.cc:69
grpc_event_engine::experimental::IomgrEventEngine::ClosureData::handle
EventEngine::TaskHandle handle
Definition: iomgr_engine.cc:55
event_engine.h
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
grpc_event_engine::experimental::IomgrEventEngine::RunAfter
TaskHandle RunAfter(Duration when, Closure *closure) override
Definition: iomgr_engine.cc:99
GPR_LIKELY
#define GPR_LIKELY(x)
Definition: impl/codegen/port_platform.h:769
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
grpc_event_engine::iomgr_engine::TimerManager::TimerInit
void TimerInit(Timer *timer, grpc_core::Timestamp deadline, experimental::EventEngine::Closure *closure)
Definition: event_engine/iomgr_engine/timer_manager.cc:220
grpc_event_engine::experimental::IomgrEventEngine::CreateListener
absl::StatusOr< std::unique_ptr< Listener > > CreateListener(Listener::AcceptCallback on_accept, std::function< void(absl::Status)> on_shutdown, const EndpointConfig &config, std::unique_ptr< MemoryAllocatorFactory > memory_allocator_factory) override
Definition: iomgr_engine.cc:150
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc_event_engine::experimental::EventEngine::Duration
std::chrono::duration< int64_t, std::nano > Duration
Definition: event_engine.h:80
grpc_event_engine::experimental::IomgrEventEngine::GetDNSResolver
std::unique_ptr< DNSResolver > GetDNSResolver(const DNSResolver::ResolverOptions &options) override
Definition: iomgr_engine.cc:129
grpc_event_engine::experimental::EventEngine::TaskHandle
Definition: event_engine.h:102
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
grpc_event_engine::experimental::IomgrEventEngine::ToTimestamp
grpc_core::Timestamp ToTimestamp(EventEngine::Duration when)
Definition: iomgr_engine.cc:44
grpc_event_engine::experimental::EndpointConfig
Definition: endpoint_config.h:31
grpc_event_engine::experimental::EventEngine::ResolvedAddress
Definition: event_engine.h:118
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
gen_stats_data.c_str
def c_str(s, encoding='ascii')
Definition: gen_stats_data.py:38
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
grpc_event_engine::experimental::IomgrEventEngine::ClosureData::Run
void Run() override
Definition: iomgr_engine.cc:57
grpc_event_engine::experimental::IomgrEventEngine::timer_manager_
iomgr_engine::TimerManager timer_manager_
Definition: iomgr_engine.h:111
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc_event_engine::experimental::EventEngine::ConnectionHandle
Definition: event_engine.h:108
time.h
intptr_t
_W64 signed int intptr_t
Definition: stdint-msvc2008.h:118
grpc_event_engine::experimental::IomgrEventEngine::Cancel
bool Cancel(TaskHandle handle) override
Definition: iomgr_engine.cc:84
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
grpc_event_engine_trace
grpc_core::TraceFlag grpc_event_engine_trace(false, "event_engine")
grpc_event_engine::iomgr_engine::Timer
Definition: event_engine/iomgr_engine/timer.h:44
grpc_event_engine::experimental::IomgrEventEngine::RunAfterInternal
EventEngine::TaskHandle RunAfterInternal(Duration when, std::function< void()> cb)
Definition: iomgr_engine.cc:112
grpc_event_engine::experimental::IomgrEventEngine::Connect
ConnectionHandle Connect(OnConnectCallback on_connect, const ResolvedAddress &addr, const EndpointConfig &args, MemoryAllocator memory_allocator, Duration timeout) override
Definition: iomgr_engine.cc:142
grpc_event_engine::experimental::EventEngine::OnConnectCallback
std::function< void(absl::StatusOr< std::unique_ptr< Endpoint > >)> OnConnectCallback
Definition: event_engine.h:224
grpc_core::Duration::Milliseconds
static constexpr Duration Milliseconds(int64_t millis)
Definition: src/core/lib/gprpp/time.h:155
grpc_event_engine::experimental::EventEngine::Closure
Definition: event_engine.h:87
grpc_event_engine::experimental::IomgrEventEngine::ClosureData::cb
std::function< void()> cb
Definition: iomgr_engine.cc:52
grpc_event_engine::experimental::IomgrEventEngine::thread_pool_
iomgr_engine::ThreadPool thread_pool_
Definition: iomgr_engine.h:112
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
fix_build_deps.r
r
Definition: fix_build_deps.py:491
grpc_event_engine
Definition: endpoint_config.h:24
closure
Definition: proxy.cc:59
grpc_core::Duration::NanosecondsRoundUp
static constexpr Duration NanosecondsRoundUp(int64_t nanos)
Definition: src/core/lib/gprpp/time.h:171
grpc_event_engine::experimental::IomgrEventEngine::mu_
grpc_core::Mutex mu_
Definition: iomgr_engine.h:114
handle
static csh handle
Definition: test_arm_regression.c:16
Duration
Definition: bloaty/third_party/protobuf/src/google/protobuf/duration.pb.h:69
trace.h
grpc_event_engine::experimental::IomgrEventEngine
Definition: iomgr_engine.h:45
absl::StatusOr
Definition: abseil-cpp/absl/status/statusor.h:187
GRPC_EVENT_ENGINE_TRACE
#define GRPC_EVENT_ENGINE_TRACE(format,...)
Definition: event_engine/trace.h:25
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
grpc_event_engine::experimental::IomgrEventEngine::ClosureData::engine
IomgrEventEngine * engine
Definition: iomgr_engine.cc:54
grpc_event_engine::experimental::IomgrEventEngine::ClosureData
Definition: iomgr_engine.cc:51
grpc_event_engine::experimental::EventEngine::DNSResolver::ResolverOptions
Optional configuration for DNSResolvers.
Definition: event_engine.h:296
grpc_event_engine::iomgr_engine::TimerManager::TimerCancel
bool TimerCancel(Timer *timer)
Definition: event_engine/iomgr_engine/timer_manager.cc:225
cb
OPENSSL_EXPORT pem_password_cb * cb
Definition: pem.h:351
grpc_event_engine::experimental::IomgrEventEngine::Run
void Run(Closure *closure) override
Definition: iomgr_engine.cc:108
timer.h
grpc_event_engine::experimental::IomgrEventEngine::CancelConnect
bool CancelConnect(ConnectionHandle handle) override
Definition: iomgr_engine.cc:138
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:08