_fork_test.py
Go to the documentation of this file.
1 # Copyright 2018 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import os
16 import threading
17 import unittest
18 
19 from grpc._cython import cygrpc
20 
21 
23  return cygrpc._fork_state.active_thread_count._num_active_threads
24 
25 
26 @unittest.skipIf(os.name == 'nt', 'Posix-specific tests')
27 class ForkPosixTester(unittest.TestCase):
28 
29  def setUp(self):
30  self._saved_fork_support_flag = cygrpc._GRPC_ENABLE_FORK_SUPPORT
31  cygrpc._GRPC_ENABLE_FORK_SUPPORT = True
32 
34 
35  def cb():
36  self.assertEqual(1, _get_number_active_threads())
37 
38  thread = cygrpc.ForkManagedThread(cb)
39  thread.start()
40  thread.join()
41  self.assertEqual(0, _get_number_active_threads())
42 
44 
45  def cb():
46  self.assertEqual(1, _get_number_active_threads())
47  raise Exception("expected exception")
48 
49  thread = cygrpc.ForkManagedThread(cb)
50  thread.start()
51  thread.join()
52  self.assertEqual(0, _get_number_active_threads())
53 
54  def tearDown(self):
55  cygrpc._GRPC_ENABLE_FORK_SUPPORT = self._saved_fork_support_flag
56 
57 
58 @unittest.skipUnless(os.name == 'nt', 'Windows-specific tests')
59 class ForkWindowsTester(unittest.TestCase):
60 
62 
63  def cb():
64  pass
65 
66  thread = cygrpc.ForkManagedThread(cb)
67  thread.start()
68  thread.join()
69 
70 
71 if __name__ == '__main__':
72  unittest.main(verbosity=2)
tests.unit._cython._fork_test.ForkWindowsTester
Definition: _fork_test.py:59
tests.unit._cython._fork_test.ForkPosixTester.tearDown
def tearDown(self)
Definition: _fork_test.py:54
tests.unit._cython._fork_test.ForkPosixTester.setUp
def setUp(self)
Definition: _fork_test.py:29
tests.unit._cython._fork_test.ForkPosixTester
Definition: _fork_test.py:27
tests.unit._cython._fork_test.ForkPosixTester._saved_fork_support_flag
_saved_fork_support_flag
Definition: _fork_test.py:30
tests.unit._cython._fork_test.ForkPosixTester.testForkManagedThread
def testForkManagedThread(self)
Definition: _fork_test.py:33
tests.unit._cython._fork_test._get_number_active_threads
def _get_number_active_threads()
Definition: _fork_test.py:22
tests.unit._cython._fork_test.ForkWindowsTester.testForkManagedThreadIsNoOp
def testForkManagedThreadIsNoOp(self)
Definition: _fork_test.py:61
grpc._cython
Definition: src/python/grpcio/grpc/_cython/__init__.py:1
tests.unit._cython._fork_test.ForkPosixTester.testForkManagedThreadThrowsException
def testForkManagedThreadThrowsException(self)
Definition: _fork_test.py:43
cb
OPENSSL_EXPORT pem_password_cb * cb
Definition: pem.h:351


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:38