beta/utilities.py
Go to the documentation of this file.
1 # Copyright 2015 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Utilities for the gRPC Python Beta API."""
15 
16 import threading
17 import time
18 
19 # implementations is referenced from specification in this module.
20 from grpc.beta import implementations # pylint: disable=unused-import
21 from grpc.beta import interfaces
22 from grpc.framework.foundation import callable_util
23 from grpc.framework.foundation import future
24 
25 _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
26  'Exception calling connectivity future "done" callback!')
27 
28 
class _ChannelReadyFuture(future.Future):
    """A future.Future that matures when a channel reports READY connectivity.

    The future carries no value: once it has matured, result(), exception()
    and traceback() all return None. It becomes done either by maturing (the
    subscribed channel reported interfaces.ChannelConnectivity.READY) or by
    being cancelled; both transitions drop the channel subscription and fire
    the registered "done" callbacks exactly once.
    """

    def __init__(self, channel):
        """Records the channel; nothing is subscribed until start() is called.

        Args:
          channel: The object whose connectivity is tracked. This class calls
            its subscribe(callback, try_to_connect=True) and
            unsubscribe(callback) methods.
        """
        self._condition = threading.Condition()
        self._channel = channel

        self._matured = False
        self._cancelled = False
        # Set to None once the future is done and the callbacks were taken
        # for invocation; only append to it while the future is pending.
        self._done_callbacks = []

    def _block(self, timeout):
        """Waits until the future is done or the timeout has elapsed.

        Args:
          timeout: Number of seconds to wait, or None to wait indefinitely.

        Raises:
          future.CancelledError: If the future is, or becomes, cancelled.
          future.TimeoutError: If the deadline passes while still pending.
        """
        until = None if timeout is None else time.time() + timeout
        with self._condition:
            while True:
                if self._cancelled:
                    raise future.CancelledError()
                elif self._matured:
                    return
                elif until is None:
                    self._condition.wait()
                else:
                    # Recompute on every pass: a wake-up does not by itself
                    # mean the state changed or the deadline was reached.
                    remaining = until - time.time()
                    if remaining < 0:
                        raise future.TimeoutError()
                    self._condition.wait(timeout=remaining)

    def _update(self, connectivity):
        """Connectivity callback handed to the channel by start().

        Matures the future the first time READY is reported; every other
        notification is ignored.

        Args:
          connectivity: The connectivity value reported by the channel.
        """
        with self._condition:
            # The _matured check keeps a repeated READY notification from
            # reaching tuple(self._done_callbacks) after the list was
            # replaced by None (which would raise TypeError).
            if (self._cancelled or self._matured or
                    connectivity is not interfaces.ChannelConnectivity.READY):
                return
            self._matured = True
            self._channel.unsubscribe(self._update)
            self._condition.notify_all()
            done_callbacks = tuple(self._done_callbacks)
            self._done_callbacks = None

        # Callbacks run after the lock is released.
        for done_callback in done_callbacks:
            callable_util.call_logging_exceptions(
                done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)

    def cancel(self):
        """Cancels the future unless it has already matured.

        Returns:
          False if the future had already matured; True if it is cancelled,
          whether by this call or by an earlier one.
        """
        with self._condition:
            if self._matured:
                return False
            if self._cancelled:
                # Already cancelled: the callbacks were taken and the channel
                # was unsubscribed by the first call. Repeating that work
                # would fail on tuple(None), so report success and stop.
                return True
            self._cancelled = True
            self._channel.unsubscribe(self._update)
            self._condition.notify_all()
            done_callbacks = tuple(self._done_callbacks)
            self._done_callbacks = None

        # Callbacks run after the lock is released.
        for done_callback in done_callbacks:
            callable_util.call_logging_exceptions(
                done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)

        return True

    def cancelled(self):
        """Returns True if the future has been cancelled."""
        with self._condition:
            return self._cancelled

    def running(self):
        """Returns True while the future is neither cancelled nor matured."""
        with self._condition:
            return not self._cancelled and not self._matured

    def done(self):
        """Returns True once the future is cancelled or has matured."""
        with self._condition:
            return self._cancelled or self._matured

    def result(self, timeout=None):
        """Waits for the future to mature and returns None.

        Args:
          timeout: Number of seconds to wait, or None to wait indefinitely.

        Raises:
          future.CancelledError: If the future is cancelled.
          future.TimeoutError: If the timeout elapses first.
        """
        self._block(timeout)
        return None

    def exception(self, timeout=None):
        """Waits for the future to mature and returns None.

        Args:
          timeout: Number of seconds to wait, or None to wait indefinitely.

        Raises:
          future.CancelledError: If the future is cancelled.
          future.TimeoutError: If the timeout elapses first.
        """
        self._block(timeout)
        return None

    def traceback(self, timeout=None):
        """Waits for the future to mature and returns None.

        Args:
          timeout: Number of seconds to wait, or None to wait indefinitely.

        Raises:
          future.CancelledError: If the future is cancelled.
          future.TimeoutError: If the timeout elapses first.
        """
        self._block(timeout)
        return None

    def add_done_callback(self, fn):
        """Registers fn to be called with this future once it is done.

        While the future is pending, fn is stored and later invoked through
        callable_util.call_logging_exceptions. If the future is already done,
        fn(self) is called immediately by the calling thread, after the lock
        is released, and any exception it raises propagates to the caller.

        Args:
          fn: A callable accepting this future as its only argument.
        """
        with self._condition:
            if not self._cancelled and not self._matured:
                self._done_callbacks.append(fn)
                return

        fn(self)

    def start(self):
        """Subscribes to the channel's connectivity, asking it to connect."""
        with self._condition:
            self._channel.subscribe(self._update, try_to_connect=True)

    def __del__(self):
        """Drops the channel subscription if the future is still pending."""
        with self._condition:
            if not self._cancelled and not self._matured:
                self._channel.unsubscribe(self._update)
130 
131 
def channel_ready_future(channel):
    """Builds and starts a future.Future tracking a Channel's readiness.

    The returned future subscribes to the connectivity of the given
    implementations.Channel. Cancelling it only removes that subscription;
    it does not tell the Channel to abandon any connection attempts it may
    have been making.

    Args:
      channel: An implementations.Channel.

    Returns:
      A future.Future that matures when the given Channel has connectivity
        interfaces.ChannelConnectivity.READY.
    """
    tracker = _ChannelReadyFuture(channel)
    tracker.start()
    return tracker
grpc.beta.utilities._ChannelReadyFuture.result
def result(self, timeout=None)
Definition: beta/utilities.py:102
grpc.beta.utilities._ChannelReadyFuture.cancelled
def cancelled(self)
Definition: beta/utilities.py:90
grpc.framework.foundation
Definition: src/python/grpcio/grpc/framework/foundation/__init__.py:1
grpc.beta
Definition: src/python/grpcio/grpc/beta/__init__.py:1
grpc.beta.utilities._ChannelReadyFuture.done
def done(self)
Definition: beta/utilities.py:98
grpc.beta.utilities._ChannelReadyFuture.start
def start(self)
Definition: beta/utilities.py:122
grpc.beta.utilities._ChannelReadyFuture.add_done_callback
def add_done_callback(self, fn)
Definition: beta/utilities.py:114
grpc.beta.utilities._ChannelReadyFuture._done_callbacks
_done_callbacks
Definition: beta/utilities.py:37
grpc.beta.utilities._ChannelReadyFuture.__del__
def __del__(self)
Definition: beta/utilities.py:126
grpc.beta.utilities._ChannelReadyFuture._matured
_matured
Definition: beta/utilities.py:35
generate-asm-lcov.fn
fn
Definition: generate-asm-lcov.py:146
grpc.beta.utilities._ChannelReadyFuture
Definition: beta/utilities.py:29
grpc.beta.utilities._ChannelReadyFuture._block
def _block(self, timeout)
Definition: beta/utilities.py:39
grpc.beta.utilities._ChannelReadyFuture._channel
_channel
Definition: beta/utilities.py:33
grpc.beta.utilities._ChannelReadyFuture._update
def _update(self, connectivity)
Definition: beta/utilities.py:57
grpc.beta.utilities._ChannelReadyFuture._cancelled
_cancelled
Definition: beta/utilities.py:36
grpc.beta.utilities.channel_ready_future
def channel_ready_future(channel)
Definition: beta/utilities.py:132
wait
static void wait(notification *n)
Definition: alts_tsi_handshaker_test.cc:114
grpc.beta.utilities._ChannelReadyFuture.exception
def exception(self, timeout=None)
Definition: beta/utilities.py:106
grpc.beta.utilities._ChannelReadyFuture.traceback
def traceback(self, timeout=None)
Definition: beta/utilities.py:110
grpc.beta.utilities._ChannelReadyFuture._condition
_condition
Definition: beta/utilities.py:32
grpc.beta.utilities._ChannelReadyFuture.cancel
def cancel(self)
Definition: beta/utilities.py:73
grpc.beta.utilities._ChannelReadyFuture.running
def running(self)
Definition: beta/utilities.py:94
grpc.beta.utilities._ChannelReadyFuture.__init__
def __init__(self, channel)
Definition: beta/utilities.py:31


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:49