testing/grpc_testing/_channel/_channel.py
Go to the documentation of this file.
1 # Copyright 2017 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import grpc_testing
16 from grpc_testing._channel import _channel_rpc
17 from grpc_testing._channel import _multi_callable
18 
19 
20 # All serializer and deserializer parameters are not (yet) used by this
21 # test infrastructure.
22 # pylint: disable=unused-argument
24 
25  def __init__(self, time, state):
26  self._time = time
27  self._state = state
28 
29  def subscribe(self, callback, try_to_connect=False):
30  raise NotImplementedError()
31 
32  def unsubscribe(self, callback):
33  raise NotImplementedError()
34 
35  def unary_unary(self,
36  method,
37  request_serializer=None,
38  response_deserializer=None):
39  return _multi_callable.UnaryUnary(method, self._state)
40 
41  def unary_stream(self,
42  method,
43  request_serializer=None,
44  response_deserializer=None):
45  return _multi_callable.UnaryStream(method, self._state)
46 
47  def stream_unary(self,
48  method,
49  request_serializer=None,
50  response_deserializer=None):
51  return _multi_callable.StreamUnary(method, self._state)
52 
53  def stream_stream(self,
54  method,
55  request_serializer=None,
56  response_deserializer=None):
57  return _multi_callable.StreamStream(method, self._state)
58 
59  def _close(self):
60  # TODO(https://github.com/grpc/grpc/issues/12531): Decide what
61  # action to take here, if any?
62  pass
63 
64  def __enter__(self):
65  return self
66 
67  def __exit__(self, exc_type, exc_val, exc_tb):
68  self._close()
69  return False
70 
71  def close(self):
72  self._close()
73 
74  def take_unary_unary(self, method_descriptor):
75  return _channel_rpc.unary_unary(self._state, method_descriptor)
76 
77  def take_unary_stream(self, method_descriptor):
78  return _channel_rpc.unary_stream(self._state, method_descriptor)
79 
80  def take_stream_unary(self, method_descriptor):
81  return _channel_rpc.stream_unary(self._state, method_descriptor)
82 
83  def take_stream_stream(self, method_descriptor):
84  return _channel_rpc.stream_stream(self._state, method_descriptor)
85 
86 
87 # pylint: enable=unused-argument
grpc_testing._channel._channel.TestingChannel._state
_state
Definition: testing/grpc_testing/_channel/_channel.py:27
grpc_testing._channel._multi_callable.UnaryStream
Definition: _multi_callable.py:47
grpc_testing._channel._channel.TestingChannel.take_unary_unary
def take_unary_unary(self, method_descriptor)
Definition: testing/grpc_testing/_channel/_channel.py:74
grpc_testing._channel._channel.TestingChannel.unary_stream
def unary_stream(self, method, request_serializer=None, response_deserializer=None)
Definition: testing/grpc_testing/_channel/_channel.py:41
grpc_testing._channel._channel.TestingChannel.__enter__
def __enter__(self)
Definition: testing/grpc_testing/_channel/_channel.py:64
grpc_testing._channel._channel.TestingChannel.unary_unary
def unary_unary(self, method, request_serializer=None, response_deserializer=None)
Definition: testing/grpc_testing/_channel/_channel.py:35
grpc_testing._channel._multi_callable.StreamUnary
Definition: _multi_callable.py:60
grpc_testing._channel._channel.TestingChannel.take_unary_stream
def take_unary_stream(self, method_descriptor)
Definition: testing/grpc_testing/_channel/_channel.py:77
grpc_testing._channel._channel.TestingChannel.unsubscribe
def unsubscribe(self, callback)
Definition: testing/grpc_testing/_channel/_channel.py:32
grpc_testing._channel._channel.TestingChannel.stream_stream
def stream_stream(self, method, request_serializer=None, response_deserializer=None)
Definition: testing/grpc_testing/_channel/_channel.py:53
grpc_testing.Channel
Definition: src/python/grpcio_testing/grpc_testing/__init__.py:215
grpc_testing._channel._channel.TestingChannel._close
def _close(self)
Definition: testing/grpc_testing/_channel/_channel.py:59
grpc_testing._channel._channel.TestingChannel.__init__
def __init__(self, time, state)
Definition: testing/grpc_testing/_channel/_channel.py:25
grpc_testing._channel._channel.TestingChannel.__exit__
def __exit__(self, exc_type, exc_val, exc_tb)
Definition: testing/grpc_testing/_channel/_channel.py:67
grpc_testing._channel._channel.TestingChannel.close
def close(self)
Definition: testing/grpc_testing/_channel/_channel.py:71
grpc_testing._channel._channel.TestingChannel.subscribe
def subscribe(self, callback, try_to_connect=False)
Definition: testing/grpc_testing/_channel/_channel.py:29
grpc_testing._channel._channel.TestingChannel.take_stream_stream
def take_stream_stream(self, method_descriptor)
Definition: testing/grpc_testing/_channel/_channel.py:83
grpc_testing._channel._channel.TestingChannel
Definition: testing/grpc_testing/_channel/_channel.py:23
grpc_testing._channel._channel.TestingChannel.stream_unary
def stream_unary(self, method, request_serializer=None, response_deserializer=None)
Definition: testing/grpc_testing/_channel/_channel.py:47
grpc_testing._channel._multi_callable.StreamStream
Definition: _multi_callable.py:100
grpc_testing._channel._channel.TestingChannel._time
_time
Definition: testing/grpc_testing/_channel/_channel.py:26
grpc_testing._channel._channel.TestingChannel.take_stream_unary
def take_stream_unary(self, method_descriptor)
Definition: testing/grpc_testing/_channel/_channel.py:80
grpc_testing._channel._multi_callable.UnaryUnary
Definition: _multi_callable.py:22
grpc_testing._channel
Definition: src/python/grpcio_testing/grpc_testing/_channel/__init__.py:1


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:38