grpc
src
python
grpcio_tests
tests
unit
thread_pool.py
Go to the documentation of this file.
1
# Copyright 2016 gRPC authors.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
# http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
from
concurrent
import
futures
16
import
threading
17
18
19
class
RecordingThreadPool
(futures.ThreadPoolExecutor):
20
"""A thread pool that records if used."""
21
22
def
__init__
(self, max_workers):
23
self.
_tp_executor
= futures.ThreadPoolExecutor(max_workers=max_workers)
24
self.
_lock
= threading.Lock()
25
self.
_was_used
=
False
26
27
def
submit
(self, fn, *args, **kwargs):
# pylint: disable=arguments-differ
28
with
self.
_lock
:
29
self.
_was_used
=
True
30
self.
_tp_executor
.
submit
(fn, *args, **kwargs)
31
32
def
was_used
(self):
33
with
self.
_lock
:
34
return
self.
_was_used
tests.unit.thread_pool.RecordingThreadPool.__init__
def __init__(self, max_workers)
Definition:
thread_pool.py:22
tests.unit.thread_pool.RecordingThreadPool._lock
_lock
Definition:
thread_pool.py:24
tests.unit.thread_pool.RecordingThreadPool.was_used
def was_used(self)
Definition:
thread_pool.py:32
tests.unit.thread_pool.RecordingThreadPool._tp_executor
_tp_executor
Definition:
thread_pool.py:23
tests.unit.thread_pool.RecordingThreadPool._was_used
_was_used
Definition:
thread_pool.py:25
tests.unit.thread_pool.RecordingThreadPool
Definition:
thread_pool.py:19
tests.unit.thread_pool.RecordingThreadPool.submit
def submit(self, fn, *args, **kwargs)
Definition:
thread_pool.py:27
grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:36