_time.py
Go to the documentation of this file.
1 # Copyright 2017 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Test times."""
15 
16 import collections
17 import logging
18 import threading
19 import time as _time
20 
21 import grpc
22 import grpc_testing
23 
# basicConfig() gives the root logger a default handler when it has none;
# _LOGGER is what _call uses to report behaviors that raise.
logging.basicConfig()
_LOGGER = logging.getLogger(__name__)
26 
27 
def _call(behaviors):
    """Invoke every behavior in order, logging instead of raising on failure.

    Args:
      behaviors: An iterable of zero-argument callables.
    """
    for behavior in behaviors:
        try:
            behavior()
        except Exception:  # pylint: disable=broad-except
            # A behavior that raises must not stop the remaining behaviors
            # from being called, so the exception is logged and dropped.
            _LOGGER.exception('Exception calling behavior "%r"!', behavior)
34 
35 
def _call_in_thread(behaviors):
    """Run the behaviors on a fresh thread and wait for that thread to end.

    Args:
      behaviors: An iterable of zero-argument callables, passed to _call.
    """
    calling = threading.Thread(target=_call, args=(behaviors,))
    calling.start()
    # NOTE(nathaniel): Because this function is called from "strict" Time
    # implementations, it blocks until after all behaviors have terminated.
    calling.join()
42 
43 
class _State(object):
    """Scheduling state shared between a Time object and its futures."""

    def __init__(self):
        # Held whenever times_to_behaviors is read or modified; also used
        # to wake threads waiting for the schedule to change.
        self.condition = threading.Condition()
        # Maps a scheduled time to the list of behaviors pending at that
        # time. Indexing a missing time creates an empty list.
        self.times_to_behaviors = collections.defaultdict(list)
49 
50 
class _Delta(
        collections.namedtuple('_Delta', (
            'mature_behaviors',
            'earliest_mature_time',
            'earliest_immature_time',
        ))):
    """Result of advancing a schedule to some time (see _process).

    Attributes:
      mature_behaviors: List of behaviors whose time has been reached.
      earliest_mature_time: Smallest time among the mature behaviors, or
        None if there were none.
      earliest_immature_time: Smallest time still pending, or None if
        nothing is pending.
    """
58 
59 
def _process(state, now):
    """Remove and return everything in the schedule that is due at `now`.

    The caller is expected to hold state.condition; this function does not
    acquire it.

    Args:
      state: A _State whose times_to_behaviors is consumed in place.
      now: The time up to which (inclusive) behaviors count as mature.

    Returns:
      A _Delta holding the mature behaviors in ascending time order, the
      earliest mature time (None if nothing matured) and the earliest time
      left in the schedule (None if the schedule is now empty).
    """
    mature_behaviors = []
    earliest_mature_time = None
    earliest_immature_time = None
    # sorted() snapshots the keys, so entries can be popped while walking
    # them, and the schedule is ordered once instead of being rescanned
    # with min() for every matured time.
    for scheduled_time in sorted(state.times_to_behaviors):
        if scheduled_time <= now:
            if earliest_mature_time is None:
                earliest_mature_time = scheduled_time
            mature_behaviors.extend(
                state.times_to_behaviors.pop(scheduled_time))
        else:
            earliest_immature_time = scheduled_time
            break
    return _Delta(mature_behaviors, earliest_mature_time,
                  earliest_immature_time)
78 
79 
class _Future(grpc.Future):
    """Handle for one behavior scheduled on a strict Time implementation.

    Only cancel() and cancelled() are implemented; every other grpc.Future
    method raises NotImplementedError.
    """

    def __init__(self, state, behavior, time):
        """Remember what was scheduled so that it can be cancelled later.

        Args:
          state: The _State holding the schedule the behavior was added to.
          behavior: The scheduled callable.
          time: The time under which the behavior was scheduled.
        """
        self._state = state
        self._behavior = behavior
        self._time = time
        self._cancelled = False

    def cancel(self):
        """Remove the behavior from the schedule if it is still pending.

        Returns:
          True if the behavior was removed by this call or by an earlier
          one; False if it is no longer in the schedule.
        """
        with self._state.condition:
            if self._cancelled:
                return True
            # .get() rather than indexing: indexing the defaultdict would
            # create an empty entry for this time.
            behaviors_at_time = self._state.times_to_behaviors.get(
                self._time)
            if behaviors_at_time is None:
                return False
            try:
                behaviors_at_time.remove(self._behavior)
            except ValueError:
                # Other behaviors are pending at this time but this one is
                # not among them; report "not cancelled" instead of
                # letting list.remove's ValueError escape.
                return False
            if not behaviors_at_time:
                # That was the last behavior at this time: drop the entry
                # and wake any thread waiting on the schedule.
                self._state.times_to_behaviors.pop(self._time)
                self._state.condition.notify_all()
            self._cancelled = True
            return True

    def cancelled(self):
        """Return whether cancel() has succeeded on this future."""
        with self._state.condition:
            return self._cancelled

    def running(self):
        """Not supported by this future."""
        raise NotImplementedError()

    def done(self):
        """Not supported by this future."""
        raise NotImplementedError()

    def result(self, timeout=None):
        """Not supported by this future."""
        raise NotImplementedError()

    def exception(self, timeout=None):
        """Not supported by this future."""
        raise NotImplementedError()

    def traceback(self, timeout=None):
        """Not supported by this future."""
        raise NotImplementedError()

    def add_done_callback(self, fn):
        """Not supported by this future."""
        raise NotImplementedError()
126 
127 
class StrictRealTime(grpc_testing.Time):
    """Time implementation backed by the system clock.

    Scheduled behaviors are called from a background thread that is started
    on demand and exits once the schedule is empty.
    """

    def __init__(self):
        self._state = _State()
        # True while the background _activity thread is running.
        self._active = False
        # Earliest scheduled time among the behaviors currently being
        # called by the background thread, or None when none are.
        self._calling = None

    def _activity(self):
        """Body of the background thread: call behaviors as they mature."""
        while True:
            with self._state.condition:
                while True:
                    now = _time.time()
                    delta = _process(self._state, now)
                    # The schedule and/or _calling are about to change;
                    # let _ensure_called_through re-check its condition.
                    self._state.condition.notify_all()
                    if delta.mature_behaviors:
                        self._calling = delta.earliest_mature_time
                        break
                    self._calling = None
                    if delta.earliest_immature_time is None:
                        # Nothing left to wait for: let the thread end.
                        # _call_at starts a new one when needed.
                        self._active = False
                        return
                    else:
                        timeout = max(0, delta.earliest_immature_time - now)
                        self._state.condition.wait(timeout=timeout)
            # Behaviors are called after the condition has been released.
            _call(delta.mature_behaviors)

    def _ensure_called_through(self, time):
        """Block until nothing scheduled before `time` is pending or running.

        Args:
          time: The time before which all behaviors must have been called.
        """
        with self._state.condition:
            while ((self._state.times_to_behaviors and
                    min(self._state.times_to_behaviors) < time) or
                   (self._calling is not None and self._calling < time)):
                self._state.condition.wait()

    def _call_at(self, behavior, time):
        """Schedule `behavior` at `time`, starting the worker if needed.

        Args:
          behavior: A zero-argument callable.
          time: The system-clock time at which to call it.

        Returns:
          A _Future with which the scheduled call can be cancelled.
        """
        with self._state.condition:
            self._state.times_to_behaviors[time].append(behavior)
            if self._active:
                # Wake the worker so it recomputes how long to wait.
                self._state.condition.notify_all()
            else:
                activity = threading.Thread(target=self._activity)
                activity.start()
                self._active = True
        return _Future(self._state, behavior, time)

    def time(self):
        """Return the current system-clock time."""
        return _time.time()

    def call_in(self, behavior, delay):
        """Schedule `behavior` to be called `delay` seconds from now.

        Returns:
          A _Future with which the scheduled call can be cancelled.
        """
        return self._call_at(behavior, _time.time() + delay)

    def call_at(self, behavior, time):
        """Schedule `behavior` to be called at system-clock time `time`.

        Returns:
          A _Future with which the scheduled call can be cancelled.
        """
        return self._call_at(behavior, time)

    def sleep_for(self, duration):
        """Sleep for `duration` seconds, then wait for earlier behaviors.

        Args:
          duration: Seconds to sleep; a non-positive value does not sleep.
        """
        time = _time.time() + duration
        # Clamp as sleep_until does: time.sleep() rejects negative values.
        _time.sleep(max(0, duration))
        self._ensure_called_through(time)

    def sleep_until(self, time):
        """Sleep until system-clock `time`, then wait for earlier behaviors.

        Args:
          time: The time to sleep until; a time in the past does not sleep.
        """
        _time.sleep(max(0, time - _time.time()))
        self._ensure_called_through(time)
189 
190 
class StrictFakeTime(grpc_testing.Time):
    """Time implementation whose clock moves only when told to sleep.

    Behaviors that mature are run via _call_in_thread, which blocks the
    calling thread until they have all returned.
    """

    def __init__(self, time):
        """Start the fake clock at `time`."""
        self._state = _State()
        self._time = time

    def time(self):
        """Return the current fake time."""
        return self._time

    def call_in(self, behavior, delay):
        """Schedule `behavior` `delay` after the current fake time.

        Args:
          behavior: A zero-argument callable.
          delay: Offset from the current fake time. If it is not positive
            the behavior is called before this method returns.

        Returns:
          A _Future for the behavior. This is returned on every path,
          including when the behavior was called immediately.
        """
        immediate = delay <= 0
        with self._state.condition:
            # Bind the target time on both paths so that the _Future below
            # can always be constructed.
            time = self._time + delay
            if not immediate:
                self._state.times_to_behaviors[time].append(behavior)
        if immediate:
            _call_in_thread((behavior,))
        return _Future(self._state, behavior, time)

    def call_at(self, behavior, time):
        """Schedule `behavior` at fake time `time`.

        Args:
          behavior: A zero-argument callable.
          time: The fake time at which to call it. If it is not after the
            current fake time the behavior is called before this method
            returns.

        Returns:
          A _Future for the behavior.
        """
        with self._state.condition:
            if time <= self._time:
                # NOTE(review): the behavior runs on a helper thread while
                # this thread holds the condition, so a behavior that
                # calls back into this object would block. Confirm that
                # callers never do that.
                _call_in_thread((behavior,))
            else:
                self._state.times_to_behaviors[time].append(behavior)
        return _Future(self._state, behavior, time)

    def sleep_for(self, duration):
        """Advance the fake clock by `duration` and call what matures.

        Args:
          duration: Amount to advance by; non-positive values do nothing.
        """
        if 0 < duration:
            with self._state.condition:
                self._time += duration
                delta = _process(self._state, self._time)
                _call_in_thread(delta.mature_behaviors)

    def sleep_until(self, time):
        """Advance the fake clock to `time` and call what matures.

        Args:
          time: The fake time to move to; times that are not after the
            current fake time do nothing.
        """
        with self._state.condition:
            if self._time < time:
                self._time = time
                delta = _process(self._state, self._time)
                _call_in_thread(delta.mature_behaviors)
grpc_testing._time.StrictFakeTime._state
_state
Definition: _time.py:194
grpc_testing._time.StrictFakeTime.call_in
def call_in(self, behavior, delay)
Definition: _time.py:200
grpc_testing._time.StrictRealTime._state
_state
Definition: _time.py:131
grpc_testing._time._Future._behavior
_behavior
Definition: _time.py:84
grpc_testing.Time
Definition: src/python/grpcio_testing/grpc_testing/__init__.py:567
grpc_testing._time.StrictRealTime._activity
def _activity(self)
Definition: _time.py:135
grpc_testing._time._Future._time
_time
Definition: _time.py:85
grpc_testing._time._Future.traceback
def traceback(self, timeout=None)
Definition: _time.py:121
grpc_testing._time._State
Definition: _time.py:44
grpc_testing._time.StrictFakeTime.call_at
def call_at(self, behavior, time)
Definition: _time.py:209
grpc_testing._time._process
def _process(state, now)
Definition: _time.py:60
grpc_testing._time._Future.exception
def exception(self, timeout=None)
Definition: _time.py:118
grpc_testing._time._Future.cancelled
def cancelled(self)
Definition: _time.py:105
grpc_testing._time.StrictRealTime.sleep_for
def sleep_for(self, duration)
Definition: _time.py:181
grpc_testing._time._Future.add_done_callback
def add_done_callback(self, fn)
Definition: _time.py:124
grpc_testing._time.StrictFakeTime.sleep_for
def sleep_for(self, duration)
Definition: _time.py:217
grpc_testing._time.StrictRealTime
Definition: _time.py:128
max
Python built-in max()
Definition: none in this project (the generated link to bloaty/third_party/zlib/examples/enough.c:170 is a cross-reference mismatch)
grpc_testing._time._call_in_thread
def _call_in_thread(behaviors)
Definition: _time.py:36
grpc_testing._time.StrictFakeTime._time
_time
Definition: _time.py:195
grpc_testing._time._Future.done
def done(self)
Definition: _time.py:112
grpc_testing._time._Future.cancel
def cancel(self)
Definition: _time.py:88
grpc_testing._time._Future._cancelled
_cancelled
Definition: _time.py:86
grpc_testing._time.StrictRealTime.sleep_until
def sleep_until(self, time)
Definition: _time.py:186
grpc.Future
Definition: src/python/grpcio/grpc/__init__.py:48
grpc_testing._time.StrictFakeTime.time
def time(self)
Definition: _time.py:197
min
Python built-in min()
Definition: none in this project (the generated link to qsort.h:83 is a cross-reference mismatch)
grpc_testing._time.StrictRealTime._ensure_called_through
def _ensure_called_through(self, time)
Definition: _time.py:154
grpc_testing._time.StrictRealTime.time
def time(self)
Definition: _time.py:172
grpc_testing._time.StrictRealTime._active
_active
Definition: _time.py:132
grpc_testing._time.StrictRealTime.call_in
def call_in(self, behavior, delay)
Definition: _time.py:175
grpc_testing._time._State.condition
condition
Definition: _time.py:47
grpc_testing._time._Future._state
_state
Definition: _time.py:83
grpc_testing._time._Future.__init__
def __init__(self, state, behavior, time)
Definition: _time.py:82
grpc_testing._time._Future.result
def result(self, timeout=None)
Definition: _time.py:115
grpc_testing._time._call
def _call(behaviors)
Definition: _time.py:28
grpc_testing._time._Future.running
def running(self)
Definition: _time.py:109
grpc_testing._time.StrictFakeTime.sleep_until
def sleep_until(self, time)
Definition: _time.py:224
grpc_testing._time.StrictRealTime.call_at
def call_at(self, behavior, time)
Definition: _time.py:178
grpc_testing._time._State.times_to_behaviors
times_to_behaviors
Definition: _time.py:48
grpc_testing._time.StrictRealTime._call_at
def _call_at(self, behavior, time)
Definition: _time.py:161
grpc_testing._time.StrictFakeTime.__init__
def __init__(self, time)
Definition: _time.py:193
grpc_testing._time._State.__init__
def __init__(self)
Definition: _time.py:46
grpc_testing._time._Delta
Definition: _time.py:56
grpc_testing._time.StrictRealTime._calling
_calling
Definition: _time.py:133
grpc_testing._time.StrictFakeTime
Definition: _time.py:191
grpc_testing._time.StrictRealTime.__init__
def __init__(self)
Definition: _time.py:130
grpc_testing._time._Future
Definition: _time.py:80


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27