stream_util.py
Go to the documentation of this file.
1 # Copyright 2015 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Helpful utilities related to the stream module."""
15 
16 import logging
17 import threading
18 
19 from grpc.framework.foundation import stream
20 
21 _NO_VALUE = object()
22 _LOGGER = logging.getLogger(__name__)
23 
24 
25 class TransformingConsumer(stream.Consumer):
26  """A stream.Consumer that passes a transformation of its input to another."""
27 
28  def __init__(self, transformation, downstream):
29  self._transformation = transformation
30  self._downstream = downstream
31 
32  def consume(self, value):
33  self._downstream.consume(self._transformation(value))
34 
35  def terminate(self):
36  self._downstream.terminate()
37 
38  def consume_and_terminate(self, value):
40 
41 
42 class IterableConsumer(stream.Consumer):
43  """A Consumer that when iterated over emits the values it has consumed."""
44 
45  def __init__(self):
46  self._condition = threading.Condition()
47  self._values = []
48  self._active = True
49 
50  def consume(self, value):
51  with self._condition:
52  if self._active:
53  self._values.append(value)
54  self._condition.notify()
55 
56  def terminate(self):
57  with self._condition:
58  self._active = False
59  self._condition.notify()
60 
61  def consume_and_terminate(self, value):
62  with self._condition:
63  if self._active:
64  self._values.append(value)
65  self._active = False
66  self._condition.notify()
67 
68  def __iter__(self):
69  return self
70 
71  def __next__(self):
72  return self.next()
73 
74  def next(self):
75  with self._condition:
76  while self._active and not self._values:
77  self._condition.wait()
78  if self._values:
79  return self._values.pop(0)
80  else:
81  raise StopIteration()
82 
83 
84 class ThreadSwitchingConsumer(stream.Consumer):
85  """A Consumer decorator that affords serialization and asynchrony."""
86 
87  def __init__(self, sink, pool):
88  self._lock = threading.Lock()
89  self._sink = sink
90  self._pool = pool
91  # True if self._spin has been submitted to the pool to be called once and
92  # that call has not yet returned, False otherwise.
93  self._spinning = False
94  self._values = []
95  self._active = True
96 
97  def _spin(self, sink, value, terminate):
98  while True:
99  try:
100  if value is _NO_VALUE:
101  sink.terminate()
102  elif terminate:
103  sink.consume_and_terminate(value)
104  else:
105  sink.consume(value)
106  except Exception as e: # pylint:disable=broad-except
107  _LOGGER.exception(e)
108 
109  with self._lock:
110  if terminate:
111  self._spinning = False
112  return
113  elif self._values:
114  value = self._values.pop(0)
115  terminate = not self._values and not self._active
116  elif not self._active:
117  value = _NO_VALUE
118  terminate = True
119  else:
120  self._spinning = False
121  return
122 
123  def consume(self, value):
124  with self._lock:
125  if self._active:
126  if self._spinning:
127  self._values.append(value)
128  else:
129  self._pool.submit(self._spin, self._sink, value, False)
130  self._spinning = True
131 
132  def terminate(self):
133  with self._lock:
134  if self._active:
135  self._active = False
136  if not self._spinning:
137  self._pool.submit(self._spin, self._sink, _NO_VALUE, True)
138  self._spinning = True
139 
140  def consume_and_terminate(self, value):
141  with self._lock:
142  if self._active:
143  self._active = False
144  if self._spinning:
145  self._values.append(value)
146  else:
147  self._pool.submit(self._spin, self._sink, value, True)
148  self._spinning = True
grpc.framework.foundation.stream_util.ThreadSwitchingConsumer._pool
_pool
Definition: stream_util.py:90
grpc.framework.foundation
Definition: src/python/grpcio/grpc/framework/foundation/__init__.py:1
grpc.framework.foundation.stream_util.ThreadSwitchingConsumer.consume_and_terminate
def consume_and_terminate(self, value)
Definition: stream_util.py:140
grpc.framework.foundation.stream_util.ThreadSwitchingConsumer._sink
_sink
Definition: stream_util.py:89
grpc.framework.foundation.stream_util.IterableConsumer.consume
def consume(self, value)
Definition: stream_util.py:50
grpc.framework.foundation.stream_util.TransformingConsumer
Definition: stream_util.py:25
grpc.framework.foundation.stream_util.IterableConsumer
Definition: stream_util.py:42
grpc.framework.foundation.stream_util.TransformingConsumer.consume_and_terminate
def consume_and_terminate(self, value)
Definition: stream_util.py:38
grpc.framework.foundation.stream_util.ThreadSwitchingConsumer.consume
def consume(self, value)
Definition: stream_util.py:123
grpc.framework.foundation.stream_util.ThreadSwitchingConsumer._lock
_lock
Definition: stream_util.py:88
grpc.framework.foundation.stream_util.TransformingConsumer.__init__
def __init__(self, transformation, downstream)
Definition: stream_util.py:28
grpc.framework.foundation.stream_util.ThreadSwitchingConsumer
Definition: stream_util.py:84
grpc.framework.foundation.stream_util.TransformingConsumer._downstream
_downstream
Definition: stream_util.py:30
grpc.framework.foundation.stream_util.ThreadSwitchingConsumer.__init__
def __init__(self, sink, pool)
Definition: stream_util.py:87
grpc.framework.foundation.stream_util.TransformingConsumer._transformation
_transformation
Definition: stream_util.py:29
grpc.framework.foundation.stream_util.IterableConsumer.__iter__
def __iter__(self)
Definition: stream_util.py:68
grpc.framework.foundation.stream_util.ThreadSwitchingConsumer._spinning
_spinning
Definition: stream_util.py:93
grpc.framework.foundation.stream_util.IterableConsumer.terminate
def terminate(self)
Definition: stream_util.py:56
grpc.framework.foundation.stream_util.ThreadSwitchingConsumer._values
_values
Definition: stream_util.py:94
grpc.framework.foundation.stream_util.IterableConsumer.__init__
def __init__(self)
Definition: stream_util.py:45
grpc.framework.foundation.stream_util.TransformingConsumer.consume
def consume(self, value)
Definition: stream_util.py:32
grpc.framework.foundation.stream_util.IterableConsumer.next
def next(self)
Definition: stream_util.py:74
grpc.framework.foundation.stream_util.IterableConsumer._active
_active
Definition: stream_util.py:48
grpc.framework.foundation.stream_util.IterableConsumer._condition
_condition
Definition: stream_util.py:46
grpc.framework.foundation.stream_util.IterableConsumer.__next__
def __next__(self)
Definition: stream_util.py:71
grpc.framework.foundation.stream_util.ThreadSwitchingConsumer._spin
def _spin(self, sink, value, terminate)
Definition: stream_util.py:97
grpc.framework.foundation.stream_util.IterableConsumer.consume_and_terminate
def consume_and_terminate(self, value)
Definition: stream_util.py:61
grpc.framework.foundation.stream_util.IterableConsumer._values
_values
Definition: stream_util.py:47
wait
static void wait(notification *n)
Definition: alts_tsi_handshaker_test.cc:114
grpc.framework.foundation.stream_util.TransformingConsumer.terminate
def terminate(self)
Definition: stream_util.py:35
grpc.framework.foundation.stream_util.ThreadSwitchingConsumer._active
_active
Definition: stream_util.py:95
grpc.framework.foundation.stream_util.ThreadSwitchingConsumer.terminate
def terminate(self)
Definition: stream_util.py:132


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:25