fork.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
24 #include <grpc/support/atm.h>
25 #include <grpc/support/sync.h>
26 #include <grpc/support/time.h>
27 
29 
30 /*
31  * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
32  * AROUND VERY SPECIFIC USE CASES.
33  */
34 
35 #ifdef GRPC_ENABLE_FORK_SUPPORT
36 #define GRPC_ENABLE_FORK_SUPPORT_DEFAULT true
37 #else
38 #define GRPC_ENABLE_FORK_SUPPORT_DEFAULT false
39 #endif // GRPC_ENABLE_FORK_SUPPORT
40 
41 GPR_GLOBAL_CONFIG_DEFINE_BOOL(grpc_enable_fork_support,
43  "Enable fork support");
44 
45 namespace grpc_core {
46 namespace internal {
47 // The exec_ctx_count has 2 modes, blocked and unblocked.
48 // When unblocked, the count is 2-indexed; exec_ctx_count=2 indicates
49 // 0 active ExecCtxs, exec_ctx_count=3 indicates 1 active ExecCtx...
50 
51 // When blocked, the exec_ctx_count is 0-indexed. Note that ExecCtx
52 // creation can only be blocked if there is exactly 1 outstanding ExecCtx,
53 // meaning that BLOCKED and UNBLOCKED counts partition the integers
54 #define UNBLOCKED(n) ((n) + 2)
55 #define BLOCKED(n) (n)
56 
57 class ExecCtxState {
58  public:
60  gpr_mu_init(&mu_);
61  gpr_cv_init(&cv_);
63  }
64 
65  void IncExecCtxCount() {
67  while (true) {
68  if (count <= BLOCKED(1)) {
69  // This only occurs if we are trying to fork. Wait until the fork()
70  // operation completes before allowing new ExecCtxs.
71  gpr_mu_lock(&mu_);
73  while (!fork_complete_) {
75  }
76  }
78  } else if (gpr_atm_no_barrier_cas(&count_, count, count + 1)) {
79  break;
80  }
82  }
83  }
84 
86 
87  bool BlockExecCtx() {
88  // Assumes there is an active ExecCtx when this function is called
90  gpr_mu_lock(&mu_);
91  fork_complete_ = false;
93  return true;
94  }
95  return false;
96  }
97 
98  void AllowExecCtx() {
99  gpr_mu_lock(&mu_);
101  fork_complete_ = true;
103  gpr_mu_unlock(&mu_);
104  }
105 
109  }
110 
111  private:
116 };
117 
118 class ThreadState {
119  public:
121  gpr_mu_init(&mu_);
122  gpr_cv_init(&cv_);
123  }
124 
125  void IncThreadCount() {  // Adds one to count_ while holding mu_.
126  gpr_mu_lock(&mu_);
127  count_++;
128  gpr_mu_unlock(&mu_);
129  }
130 
131  void DecThreadCount() {  // Subtracts one from count_ while holding mu_; may wake a waiter on cv_.
132  gpr_mu_lock(&mu_);
133  count_--;
134  if (awaiting_threads_ && count_ == 0) {  // only when AwaitThreads() has flagged a waiter and the count reached zero
135  threads_done_ = true;  // this is the flag AwaitThreads() loops on
136  gpr_cv_signal(&cv_);
137  }
138  gpr_mu_unlock(&mu_);
139  }
140  void AwaitThreads() {  // Holds mu_ and loops until threads_done_ becomes true.
141  gpr_mu_lock(&mu_);
142  awaiting_threads_ = true;  // lets DecThreadCount() know it should set threads_done_ and signal cv_
143  threads_done_ = (count_ == 0);  // already done if count_ is zero on entry
144  while (!threads_done_) {  // NOTE(review): the loop body (listing line 145) is absent from this view; presumably a wait on cv_ with mu_ - confirm
146  }
147  awaiting_threads_ = true;  // NOTE(review): re-sets the flag to true instead of clearing it after the wait - confirm whether false was intended
148  gpr_mu_unlock(&mu_);
149  }
150 
154  }
155 
156  private:
161  int count_;
162 };
163 
164 } // namespace internal
165 
167  if (!override_enabled_) {
168  support_enabled_.store(GPR_GLOBAL_CONFIG_GET(grpc_enable_fork_support),
169  std::memory_order_relaxed);
170  }
171  if (support_enabled_.load(std::memory_order_relaxed)) {
174  }
175 }
176 
178  if (support_enabled_.load(std::memory_order_relaxed)) {
179  delete exec_ctx_state_;
180  delete thread_state_;
181  }
182 }
183 
185  return support_enabled_.load(std::memory_order_relaxed);
186 }
187 
188 // Testing Only
189 void Fork::Enable(bool enable) {  // Forces the fork-support flag to `enable`.
190  override_enabled_ = true;  // the `!override_enabled_` check at listing line 167 then skips reloading the flag from GPR_GLOBAL_CONFIG_GET
191  support_enabled_.store(enable, std::memory_order_relaxed);  // relaxed atomic store, the same ordering as the loads of this flag in this file
192 }
193 
195 
197 
199  Fork::child_postfork_func reset_child_polling_engine) {
200  reset_child_polling_engine_ = reset_child_polling_engine;
201 }
204 }
205 
207  if (support_enabled_.load(std::memory_order_relaxed)) {
208  return exec_ctx_state_->BlockExecCtx();
209  }
210  return false;
211 }
212 
214  if (support_enabled_.load(std::memory_order_relaxed)) {
216  }
217 }
218 
220  if (support_enabled_.load(std::memory_order_relaxed)) {
222  }
223 }
224 
226  if (support_enabled_.load(std::memory_order_relaxed)) {
228  }
229 }
231  if (support_enabled_.load(std::memory_order_relaxed)) {
233  }
234 }
235 
238 std::atomic<bool> Fork::support_enabled_(false);
239 bool Fork::override_enabled_ = false;
241 } // namespace grpc_core
gpr_cv_signal
GPRAPI void gpr_cv_signal(gpr_cv *cv)
global_config_env.h
grpc_core::Fork::GetResetChildPollingEngineFunc
static child_postfork_func GetResetChildPollingEngineFunc()
Definition: fork.cc:202
fork.h
grpc_core::Fork::IncThreadCount
static void IncThreadCount()
Definition: fork.cc:219
gpr_mu_unlock
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
grpc_core::Fork::reset_child_polling_engine_
static child_postfork_func reset_child_polling_engine_
Definition: src/core/lib/gprpp/fork.h:98
gpr_atm_no_barrier_load
#define gpr_atm_no_barrier_load(p)
Definition: impl/codegen/atm_gcc_atomic.h:53
gpr_atm_no_barrier_store
#define gpr_atm_no_barrier_store(p, value)
Definition: impl/codegen/atm_gcc_atomic.h:56
grpc_core::internal::ExecCtxState::~ExecCtxState
~ExecCtxState()
Definition: fork.cc:106
grpc_core::internal::ThreadState::IncThreadCount
void IncThreadCount()
Definition: fork.cc:125
grpc_core::internal::ThreadState::mu_
gpr_mu mu_
Definition: fork.cc:159
grpc_core::internal::ThreadState::awaiting_threads_
bool awaiting_threads_
Definition: fork.cc:157
GPR_GLOBAL_CONFIG_GET
#define GPR_GLOBAL_CONFIG_GET(name)
Definition: global_config_generic.h:24
false
#define false
Definition: setup_once.h:323
grpc_core::Fork::DoDecExecCtxCount
static void DoDecExecCtxCount()
Definition: fork.cc:196
grpc_core
Definition: call_metric_recorder.h:31
grpc_core::internal::ThreadState::ThreadState
ThreadState()
Definition: fork.cc:120
gpr_cv
pthread_cond_t gpr_cv
Definition: impl/codegen/sync_posix.h:48
grpc_core::Fork::exec_ctx_state_
static internal::ExecCtxState * exec_ctx_state_
Definition: src/core/lib/gprpp/fork.h:94
grpc_core::internal::ExecCtxState::cv_
gpr_cv cv_
Definition: fork.cc:114
UNBLOCKED
#define UNBLOCKED(n)
Definition: fork.cc:54
grpc_core::Fork::AllowExecCtx
static void AllowExecCtx()
Definition: fork.cc:213
grpc_core::Fork::DoIncExecCtxCount
static void DoIncExecCtxCount()
Definition: fork.cc:194
gpr_inf_future
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:55
grpc_core::internal::ExecCtxState::IncExecCtxCount
void IncExecCtxCount()
Definition: fork.cc:65
time.h
grpc_core::internal::ExecCtxState
Definition: fork.cc:57
grpc_core::internal::ThreadState::count_
int count_
Definition: fork.cc:161
grpc_core::Fork::Enable
static void Enable(bool enable)
Definition: fork.cc:189
grpc_core::internal::ExecCtxState::count_
gpr_atm count_
Definition: fork.cc:115
true
#define true
Definition: setup_once.h:324
grpc_core::Fork::support_enabled_
static std::atomic< bool > support_enabled_
Definition: src/core/lib/gprpp/fork.h:96
gpr_mu_destroy
GPRAPI void gpr_mu_destroy(gpr_mu *mu)
grpc_core::internal::ThreadState
Definition: fork.cc:118
BLOCKED
#define BLOCKED(n)
Definition: fork.cc:55
grpc_core::internal::ExecCtxState::AllowExecCtx
void AllowExecCtx()
Definition: fork.cc:98
grpc_core::internal::ThreadState::DecThreadCount
void DecThreadCount()
Definition: fork.cc:131
gpr_cv_destroy
GPRAPI void gpr_cv_destroy(gpr_cv *cv)
grpc_core::Fork::AwaitThreads
static void AwaitThreads()
Definition: fork.cc:230
gpr_mu_init
GPRAPI void gpr_mu_init(gpr_mu *mu)
gpr_cv_wait
GPRAPI int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline)
gpr_atm_no_barrier_fetch_add
#define gpr_atm_no_barrier_fetch_add(p, delta)
Definition: impl/codegen/atm_gcc_atomic.h:59
GPR_GLOBAL_CONFIG_DEFINE_BOOL
GPR_GLOBAL_CONFIG_DEFINE_BOOL(grpc_enable_fork_support, GRPC_ENABLE_FORK_SUPPORT_DEFAULT, "Enable fork support")
gpr_mu_lock
GPRAPI void gpr_mu_lock(gpr_mu *mu)
grpc_core::Fork::DecThreadCount
static void DecThreadCount()
Definition: fork.cc:225
gpr_atm_no_barrier_cas
#define gpr_atm_no_barrier_cas(p, o, n)
Definition: impl/codegen/atm_gcc_sync.h:74
grpc_core::Fork::child_postfork_func
void(* child_postfork_func)(void)
Definition: src/core/lib/gprpp/fork.h:40
grpc_core::internal::ThreadState::cv_
gpr_cv cv_
Definition: fork.cc:160
grpc_core::internal::ExecCtxState::ExecCtxState
ExecCtxState()
Definition: fork.cc:59
gpr_types.h
grpc_core::internal::ExecCtxState::DecExecCtxCount
void DecExecCtxCount()
Definition: fork.cc:85
grpc_core::internal::ThreadState::threads_done_
bool threads_done_
Definition: fork.cc:158
gpr_atm
intptr_t gpr_atm
Definition: impl/codegen/atm_gcc_atomic.h:32
grpc_core::Fork::override_enabled_
static bool override_enabled_
Definition: src/core/lib/gprpp/fork.h:97
gpr_mu
pthread_mutex_t gpr_mu
Definition: impl/codegen/sync_posix.h:47
count
int * count
Definition: bloaty/third_party/googletest/googlemock/test/gmock_stress_test.cc:96
grpc_core::Fork::GlobalShutdown
static void GlobalShutdown()
Definition: fork.cc:177
grpc_core::Fork::thread_state_
static internal::ThreadState * thread_state_
Definition: src/core/lib/gprpp/fork.h:95
grpc_core::internal::ThreadState::AwaitThreads
void AwaitThreads()
Definition: fork.cc:140
grpc_core::Fork::SetResetChildPollingEngineFunc
static void SetResetChildPollingEngineFunc(child_postfork_func reset_child_polling_engine)
Definition: fork.cc:198
gpr_cv_broadcast
GPRAPI void gpr_cv_broadcast(gpr_cv *cv)
grpc_core::internal::ExecCtxState::fork_complete_
bool fork_complete_
Definition: fork.cc:112
internal
Definition: benchmark/test/output_test_helper.cc:20
grpc_core::Fork::BlockExecCtx
static bool BlockExecCtx()
Definition: fork.cc:206
atm.h
GRPC_ENABLE_FORK_SUPPORT_DEFAULT
#define GRPC_ENABLE_FORK_SUPPORT_DEFAULT
Definition: fork.cc:38
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
grpc_core::internal::ExecCtxState::BlockExecCtx
bool BlockExecCtx()
Definition: fork.cc:87
sync.h
grpc_core::internal::ThreadState::~ThreadState
~ThreadState()
Definition: fork.cc:151
grpc_core::Fork::GlobalInit
static void GlobalInit()
Definition: fork.cc:166
gpr_cv_init
GPRAPI void gpr_cv_init(gpr_cv *cv)
grpc_core::Fork::Enabled
static bool Enabled()
Definition: fork.cc:184
grpc_core::internal::ExecCtxState::mu_
gpr_mu mu_
Definition: fork.cc:113
port_platform.h


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:59:22