Go to the documentation of this file.
26 _UNARY_UNARY =
'/test/UnaryUnary'
27 _UNARY_STREAM =
'/test/UnaryStream'
28 _STREAM_UNARY =
'/test/StreamUnary'
29 _STREAM_STREAM =
'/test/StreamStream'
37 for _
in range(test_constants.STREAM_LENGTH):
42 for request
in request_iterator:
48 for request
in request_iterator:
54 def __init__(self, request_streaming, response_streaming):
76 if handler_call_details.method == _UNARY_UNARY:
78 elif handler_call_details.method == _UNARY_STREAM:
80 elif handler_call_details.method == _STREAM_UNARY:
82 elif handler_call_details.method == _STREAM_STREAM:
93 port = self.
_server.add_insecure_port(
'[::]:0')
103 self.assertEqual(_RESPONSE, response)
107 self.assertSequenceEqual([_RESPONSE] * test_constants.STREAM_LENGTH,
108 list(response_iterator))
112 [_REQUEST] * test_constants.STREAM_LENGTH))
113 self.assertEqual(_RESPONSE, response)
117 [_REQUEST] * test_constants.STREAM_LENGTH))
118 self.assertSequenceEqual([_RESPONSE] * test_constants.STREAM_LENGTH,
119 list(response_iterator))
122 if __name__ ==
'__main__':
123 logging.basicConfig()
124 unittest.main(verbosity=2)
def insecure_channel(target, options=None, compression=None)
def testStreamUnary(self)
def handle_unary_unary(request, servicer_context)
Iterator[ResponseType] unary_stream(RequestType request, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
def service(self, handler_call_details)
Iterator[ResponseType] stream_stream(Iterator[RequestType] request_iterator, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
def testUnaryStream(self)
def handle_stream_unary(request_iterator, servicer_context)
ResponseType stream_unary(Iterator[RequestType] request_iterator, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
def handle_unary_stream(request, servicer_context)
ResponseType unary_unary(RequestType request, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
def handle_stream_stream(request_iterator, servicer_context)
def __init__(self, request_streaming, response_streaming)
def testStreamStream(self)
grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27