_utilities.py
Go to the documentation of this file.
1 # Copyright 2015 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Internal utilities for gRPC Python."""
15 
16 import collections
17 import logging
18 import threading
19 import time
20 
21 import grpc
22 from grpc import _common
23 import six
24 
25 _LOGGER = logging.getLogger(__name__)
26 
27 _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
28  'Exception calling connectivity future "done" callback!')
29 
30 
31 class RpcMethodHandler(
32  collections.namedtuple('_RpcMethodHandler', (
33  'request_streaming',
34  'response_streaming',
35  'request_deserializer',
36  'response_serializer',
37  'unary_unary',
38  'unary_stream',
39  'stream_unary',
40  'stream_stream',
42  pass
43 
44 
46 
47  def __init__(self, service, method_handlers):
48  self._name = service
50  _common.fully_qualified_method(service, method): method_handler
51  for method, method_handler in six.iteritems(method_handlers)
52  }
53 
54  def service_name(self):
55  return self._name
56 
57  def service(self, handler_call_details):
58  return self._method_handlers.get(handler_call_details.method)
59 
60 
62 
63  def __init__(self, channel):
64  self._condition = threading.Condition()
65  self._channel = channel
66 
67  self._matured = False
68  self._cancelled = False
69  self._done_callbacks = []
70 
71  def _block(self, timeout):
72  until = None if timeout is None else time.time() + timeout
73  with self._condition:
74  while True:
75  if self._cancelled:
77  elif self._matured:
78  return
79  else:
80  if until is None:
81  self._condition.wait()
82  else:
83  remaining = until - time.time()
84  if remaining < 0:
86  else:
87  self._condition.wait(timeout=remaining)
88 
89  def _update(self, connectivity):
90  with self._condition:
91  if (not self._cancelled and
92  connectivity is grpc.ChannelConnectivity.READY):
93  self._matured = True
94  self._channel.unsubscribe(self._update)
95  self._condition.notify_all()
96  done_callbacks = tuple(self._done_callbacks)
97  self._done_callbacks = None
98  else:
99  return
100 
101  for done_callback in done_callbacks:
102  try:
103  done_callback(self)
104  except Exception: # pylint: disable=broad-except
105  _LOGGER.exception(_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE)
106 
107  def cancel(self):
108  with self._condition:
109  if not self._matured:
110  self._cancelled = True
111  self._channel.unsubscribe(self._update)
112  self._condition.notify_all()
113  done_callbacks = tuple(self._done_callbacks)
114  self._done_callbacks = None
115  else:
116  return False
117 
118  for done_callback in done_callbacks:
119  try:
120  done_callback(self)
121  except Exception: # pylint: disable=broad-except
122  _LOGGER.exception(_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE)
123 
124  return True
125 
126  def cancelled(self):
127  with self._condition:
128  return self._cancelled
129 
130  def running(self):
131  with self._condition:
132  return not self._cancelled and not self._matured
133 
134  def done(self):
135  with self._condition:
136  return self._cancelled or self._matured
137 
138  def result(self, timeout=None):
139  self._block(timeout)
140 
141  def exception(self, timeout=None):
142  self._block(timeout)
143 
144  def traceback(self, timeout=None):
145  self._block(timeout)
146 
147  def add_done_callback(self, fn):
148  with self._condition:
149  if not self._cancelled and not self._matured:
150  self._done_callbacks.append(fn)
151  return
152 
153  fn(self)
154 
155  def start(self):
156  with self._condition:
157  self._channel.subscribe(self._update, try_to_connect=True)
158 
159  def __del__(self):
160  with self._condition:
161  if not self._cancelled and not self._matured:
162  self._channel.unsubscribe(self._update)
163 
164 
165 def channel_ready_future(channel):
166  ready_future = _ChannelReadyFuture(channel)
167  ready_future.start()
168  return ready_future
grpc._utilities.DictionaryGenericHandler.service
def service(self, handler_call_details)
Definition: _utilities.py:57
grpc._utilities._ChannelReadyFuture._cancelled
_cancelled
Definition: _utilities.py:68
get
absl::string_view get(const Cont &c)
Definition: abseil-cpp/absl/strings/str_replace_test.cc:185
grpc._utilities._ChannelReadyFuture.traceback
def traceback(self, timeout=None)
Definition: _utilities.py:144
grpc.ServiceRpcHandler
Definition: src/python/grpcio/grpc/__init__.py:1350
grpc.FutureCancelledError
Definition: src/python/grpcio/grpc/__init__.py:44
grpc._utilities.DictionaryGenericHandler._method_handlers
_method_handlers
Definition: _utilities.py:49
grpc._utilities.channel_ready_future
def channel_ready_future(channel)
Definition: _utilities.py:165
grpc._utilities._ChannelReadyFuture._done_callbacks
_done_callbacks
Definition: _utilities.py:69
grpc._utilities._ChannelReadyFuture.running
def running(self)
Definition: _utilities.py:130
grpc._utilities._ChannelReadyFuture.result
def result(self, timeout=None)
Definition: _utilities.py:138
grpc._utilities.DictionaryGenericHandler
Definition: _utilities.py:45
grpc._utilities._ChannelReadyFuture._update
def _update(self, connectivity)
Definition: _utilities.py:89
grpc._utilities.DictionaryGenericHandler.__init__
def __init__(self, service, method_handlers)
Definition: _utilities.py:47
grpc._utilities.RpcMethodHandler
Definition: _utilities.py:41
grpc._utilities._ChannelReadyFuture.__del__
def __del__(self)
Definition: _utilities.py:159
generate-asm-lcov.fn
fn
Definition: generate-asm-lcov.py:146
grpc._utilities._ChannelReadyFuture.start
def start(self)
Definition: _utilities.py:155
grpc._utilities._ChannelReadyFuture.add_done_callback
def add_done_callback(self, fn)
Definition: _utilities.py:147
grpc._utilities._ChannelReadyFuture._channel
_channel
Definition: _utilities.py:65
grpc._utilities._ChannelReadyFuture._block
def _block(self, timeout)
Definition: _utilities.py:71
grpc._utilities._ChannelReadyFuture._matured
_matured
Definition: _utilities.py:67
grpc.Future
Definition: src/python/grpcio/grpc/__init__.py:48
grpc._utilities.DictionaryGenericHandler._name
_name
Definition: _utilities.py:48
grpc._utilities._ChannelReadyFuture._condition
_condition
Definition: _utilities.py:64
grpc._utilities._ChannelReadyFuture.cancelled
def cancelled(self)
Definition: _utilities.py:126
grpc._utilities._ChannelReadyFuture.done
def done(self)
Definition: _utilities.py:134
grpc.FutureTimeoutError
Future Interface ###############################.
Definition: src/python/grpcio/grpc/__init__.py:40
grpc._utilities._ChannelReadyFuture.cancel
def cancel(self)
Definition: _utilities.py:107
grpc._utilities._ChannelReadyFuture
Definition: _utilities.py:61
grpc._utilities._ChannelReadyFuture.exception
def exception(self, timeout=None)
Definition: _utilities.py:141
wait
static void wait(notification *n)
Definition: alts_tsi_handshaker_test.cc:114
grpc._utilities._ChannelReadyFuture.__init__
def __init__(self, channel)
Definition: _utilities.py:63
grpc._utilities.DictionaryGenericHandler.service_name
def service_name(self)
Definition: _utilities.py:54
grpc.RpcMethodHandler
Definition: src/python/grpcio/grpc/__init__.py:1288


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27