_server_wait_for_termination_test.py
Go to the documentation of this file.
1 # Copyright 2019 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 from __future__ import division
16 
17 from concurrent import futures
18 import datetime
19 import threading
20 import time
21 import unittest
22 
23 import grpc
24 import six
25 
26 from tests.unit.framework.common import test_constants
27 
# How long the tests sleep after starting the waiter thread, giving it time
# to start the server and enter wait_for_termination() before it is unblocked.
_WAIT_FOR_BLOCKING = datetime.timedelta(seconds=1)
29 
30 
def _block_on_waiting(server, termination_event, timeout=None):
    """Start ``server``, wait on its termination, then signal the event.

    Used as a thread target by the tests in this module: the caller observes
    ``termination_event`` to learn that ``wait_for_termination`` has returned.

    Args:
      server: Object exposing ``start()`` and
        ``wait_for_termination(timeout=...)``.
      termination_event: Object whose ``set()`` is called once the wait
        returns.
      timeout: Forwarded unchanged as the ``timeout`` keyword of
        ``wait_for_termination``; defaults to ``None``.
    """
    server.start()
    # Blocks here; the event below is only set after this call returns.
    server.wait_for_termination(timeout=timeout)
    termination_event.set()
35 
36 
37 class ServerWaitForTerminationTest(unittest.TestCase):
38 
40  termination_event = threading.Event()
41  server = grpc.server(futures.ThreadPoolExecutor())
42 
43  wait_thread = threading.Thread(target=_block_on_waiting,
44  args=(
45  server,
46  termination_event,
47  ))
48  wait_thread.daemon = True
49  wait_thread.start()
50  time.sleep(_WAIT_FOR_BLOCKING.total_seconds())
51 
52  server.stop(None)
53  termination_event.wait(timeout=test_constants.SHORT_TIMEOUT)
54  self.assertTrue(termination_event.is_set())
55 
57  termination_event = threading.Event()
58  server = grpc.server(futures.ThreadPoolExecutor())
59 
60  wait_thread = threading.Thread(target=_block_on_waiting,
61  args=(
62  server,
63  termination_event,
64  ))
65  wait_thread.daemon = True
66  wait_thread.start()
67  time.sleep(_WAIT_FOR_BLOCKING.total_seconds())
68 
69  # Invoke manually here, in Python 2 it will be invoked by GC sometime.
70  server.__del__()
71  termination_event.wait(timeout=test_constants.SHORT_TIMEOUT)
72  self.assertTrue(termination_event.is_set())
73 
75  termination_event = threading.Event()
76  server = grpc.server(futures.ThreadPoolExecutor())
77 
78  wait_thread = threading.Thread(target=_block_on_waiting,
79  args=(
80  server,
81  termination_event,
82  test_constants.SHORT_TIMEOUT / 2,
83  ))
84  wait_thread.daemon = True
85  wait_thread.start()
86 
87  termination_event.wait(timeout=test_constants.SHORT_TIMEOUT)
88  self.assertTrue(termination_event.is_set())
89 
90 
# When executed directly as a script, run this module's tests with verbose
# per-test output.
if __name__ == '__main__':
    unittest.main(verbosity=2)
tests.unit._server_wait_for_termination_test.ServerWaitForTerminationTest.test_unblock_by_invoking_stop
def test_unblock_by_invoking_stop(self)
Definition: _server_wait_for_termination_test.py:39
tests.unit._server_wait_for_termination_test.ServerWaitForTerminationTest.test_unblock_by_del
def test_unblock_by_del(self)
Definition: _server_wait_for_termination_test.py:56
tests.unit._server_wait_for_termination_test._block_on_waiting
def _block_on_waiting(server, termination_event, timeout=None)
Definition: _server_wait_for_termination_test.py:31
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
tests.unit._server_wait_for_termination_test.ServerWaitForTerminationTest.test_unblock_by_timeout
def test_unblock_by_timeout(self)
Definition: _server_wait_for_termination_test.py:74
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
tests.unit._server_wait_for_termination_test.ServerWaitForTerminationTest
Definition: _server_wait_for_termination_test.py:37


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27