stream_testing.py
Go to the documentation of this file.
1 # Copyright 2015 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Utilities for testing stream-related code."""
15 
16 from grpc.framework.foundation import stream
17 
18 
class TestConsumer(stream.Consumer):
    """A stream.Consumer instrumented for testing.

    Every call made on this object is appended to ``calls`` so that a test
    can inspect the history afterwards.

    Attributes:
      calls: A sequence of value-termination pairs describing the history of
        calls made on this object. ``consume(value)`` records
        ``(value, False)``, ``terminate()`` records ``(None, True)`` and
        ``consume_and_terminate(value)`` records ``(value, True)``.
    """

    def __init__(self):
        self.calls = []

    def consume(self, value):
        """See stream.Consumer.consume for specification."""
        self.calls.append((value, False))

    def terminate(self):
        """See stream.Consumer.terminate for specification."""
        self.calls.append((None, True))

    def consume_and_terminate(self, value):
        """See stream.Consumer.consume_and_terminate for specification."""
        self.calls.append((value, True))

    def is_legal(self):
        """Reports whether or not a legal sequence of calls has been made.

        The recorded history is illegal if any call was made after a
        terminating call, or if a non-terminating call recorded None as its
        value. An empty history is legal.

        Returns:
          True if the recorded sequence of calls is legal, False otherwise.
        """
        terminated = False
        for value, terminal in self.calls:
            if terminated:
                # Something was called after termination.
                return False
            if terminal:
                terminated = True
            elif value is None:
                # A non-terminating call must carry a value.
                return False
        return True

    def values(self):
        """Returns the sequence of values that have been passed to this Consumer.

        Only None is treated as "no value" (it is what terminate() records),
        so falsy values such as 0, "" or b"" are preserved in the result.

        Returns:
          A list of the recorded non-None values, in call order.
        """
        return [value for value, _ in self.calls if value is not None]
tests.unit.framework.foundation.stream_testing.TestConsumer.values
def values(self)
Definition: stream_testing.py:55
grpc.framework.foundation
Definition: src/python/grpcio/grpc/framework/foundation/__init__.py:1
tests.unit.framework.foundation.stream_testing.TestConsumer.calls
calls
Definition: stream_testing.py:28
tests.unit.framework.foundation.stream_testing.TestConsumer
Definition: stream_testing.py:19
tests.unit.framework.foundation.stream_testing.TestConsumer.consume
def consume(self, value)
Definition: stream_testing.py:30
tests.unit.framework.foundation.stream_testing.TestConsumer.__init__
def __init__(self)
Definition: stream_testing.py:27
tests.unit.framework.foundation.stream_testing.TestConsumer.consume_and_terminate
def consume_and_terminate(self, value)
Definition: stream_testing.py:38
tests.unit.framework.foundation.stream_testing.TestConsumer.terminate
def terminate(self)
Definition: stream_testing.py:34
tests.unit.framework.foundation.stream_testing.TestConsumer.is_legal
def is_legal(self)
Definition: stream_testing.py:42


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:25