14 """Test helpers for RPC invocation tests."""
# Toy (de)serializers used by the test RPCs. They are written as ``def``s
# rather than lambdas bound to names (PEP 8) so tracebacks show a real function
# name; the constant-style names are kept because the rest of the module
# refers to them as-is.


def _SERIALIZE_REQUEST(bytestring):
    """Serialize a request by repeating its bytes twice."""
    return bytestring * 2


def _DESERIALIZE_REQUEST(bytestring):
    """Deserialize a request by keeping the second half of the bytes.

    Undoes ``_SERIALIZE_REQUEST``: the second half of a doubled byte string
    is the original byte string.
    """
    return bytestring[len(bytestring) // 2:]


def _SERIALIZE_RESPONSE(bytestring):
    """Serialize a response by repeating its bytes three times."""
    return bytestring * 3


def _DESERIALIZE_RESPONSE(bytestring):
    """Deserialize a response by keeping the first third of the bytes.

    Undoes ``_SERIALIZE_RESPONSE``: the first third of a tripled byte string
    is the original byte string.
    """
    return bytestring[:len(bytestring) // 3]


# Fully-qualified method paths, one per RPC arity (plus the non-blocking
# streaming-response variants).
_UNARY_UNARY = '/test/UnaryUnary'
_UNARY_STREAM = '/test/UnaryStream'
_UNARY_STREAM_NON_BLOCKING = '/test/UnaryStreamNonBlocking'
_STREAM_UNARY = '/test/StreamUnary'
_STREAM_STREAM = '/test/StreamStream'
_STREAM_STREAM_NON_BLOCKING = '/test/StreamStreamNonBlocking'

# Short timeout expressed as a float number of seconds (4.0).
TIMEOUT_SHORT = datetime.timedelta(seconds=4).total_seconds()
69 for non_blocking_function
in non_blocking_functions:
70 non_blocking_function.__func__.experimental_non_blocking =
True
71 non_blocking_function.__func__.experimental_thread_pool = self.
_thread_pool
75 if servicer_context
is not None:
76 servicer_context.set_trailing_metadata(((
83 servicer_context.is_active()
84 servicer_context.time_remaining()
88 for _
in range(test_constants.STREAM_LENGTH):
92 if servicer_context
is not None:
93 servicer_context.set_trailing_metadata(((
100 for _
in range(test_constants.STREAM_LENGTH):
104 if servicer_context
is not None:
105 servicer_context.set_trailing_metadata(((
112 if servicer_context
is not None:
113 servicer_context.invocation_metadata()
115 response_elements = []
116 for request
in request_iterator:
118 response_elements.append(request)
120 if servicer_context
is not None:
121 servicer_context.set_trailing_metadata(((
125 return b
''.join(response_elements)
129 if servicer_context
is not None:
130 servicer_context.set_trailing_metadata(((
134 for request
in request_iterator:
140 servicer_context, on_next):
142 if servicer_context
is not None:
143 servicer_context.set_trailing_metadata(((
147 for request
in request_iterator:
156 def __init__(self, request_streaming, response_streaming,
157 request_deserializer, response_serializer, unary_unary,
158 unary_stream, stream_unary, stream_stream):
175 if handler_call_details.method == _UNARY_UNARY:
177 self.
_handler.handle_unary_unary,
None,
None,
179 elif handler_call_details.method == _UNARY_STREAM:
181 _SERIALIZE_RESPONSE,
None,
182 self.
_handler.handle_unary_stream,
None,
None)
183 elif handler_call_details.method == _UNARY_STREAM_NON_BLOCKING:
185 False,
True, _DESERIALIZE_REQUEST, _SERIALIZE_RESPONSE,
None,
186 self.
_handler.handle_unary_stream_non_blocking,
None,
None)
187 elif handler_call_details.method == _STREAM_UNARY:
189 _SERIALIZE_RESPONSE,
None,
None,
190 self.
_handler.handle_stream_unary,
None)
191 elif handler_call_details.method == _STREAM_STREAM:
194 elif handler_call_details.method == _STREAM_STREAM_NON_BLOCKING:
196 True,
True,
None,
None,
None,
None,
None,
197 self.
_handler.handle_stream_stream_non_blocking)
203 return channel.unary_unary(_UNARY_UNARY)
207 return channel.unary_stream(_UNARY_STREAM,
208 request_serializer=_SERIALIZE_REQUEST,
209 response_deserializer=_DESERIALIZE_RESPONSE)
213 return channel.unary_stream(_UNARY_STREAM_NON_BLOCKING,
214 request_serializer=_SERIALIZE_REQUEST,
215 response_deserializer=_DESERIALIZE_RESPONSE)
219 return channel.stream_unary(_STREAM_UNARY,
220 request_serializer=_SERIALIZE_REQUEST,
221 response_deserializer=_DESERIALIZE_RESPONSE)
225 return channel.stream_stream(_STREAM_STREAM)
229 return channel.stream_stream(_STREAM_STREAM_NON_BLOCKING)
240 port = self.
_server.add_insecure_port(
'[::]:0')
251 request = b
'\x57\x38'
255 metadata=((
'test',
'ConsumingOneStreamResponseUnaryRequest'),))
256 next(response_iterator)
259 self, multi_callable):
260 request = b
'\x57\x38'
265 'ConsumingSomeButNotAllStreamResponsesUnaryRequest'),))
266 for _
in range(test_constants.STREAM_LENGTH // 2):
267 next(response_iterator)
270 self, multi_callable):
272 b
'\x67\x88' for _
in range(test_constants.STREAM_LENGTH))
273 request_iterator =
iter(requests)
278 'ConsumingSomeButNotAllStreamResponsesStreamRequest'),))
279 for _
in range(test_constants.STREAM_LENGTH // 2):
280 next(response_iterator)
284 b
'\x67\x88' for _
in range(test_constants.STREAM_LENGTH))
285 request_iterator =
iter(requests)
290 'ConsumingTooManyStreamResponsesStreamRequest'),))
291 for _
in range(test_constants.STREAM_LENGTH):
292 next(response_iterator)
293 for _
in range(test_constants.STREAM_LENGTH):
294 with self.assertRaises(StopIteration):
295 next(response_iterator)
297 self.assertIsNotNone(response_iterator.initial_metadata())
298 self.assertIs(grpc.StatusCode.OK, response_iterator.code())
299 self.assertIsNotNone(response_iterator.details())
300 self.assertIsNotNone(response_iterator.trailing_metadata())
303 request = b
'\x07\x19'
308 metadata=((
'test',
'CancelledUnaryRequestStreamResponse'),))
310 response_iterator.cancel()
313 next(response_iterator)
314 self.assertIs(grpc.StatusCode.CANCELLED,
315 exception_context.exception.code())
316 self.assertIsNotNone(response_iterator.initial_metadata())
317 self.assertIs(grpc.StatusCode.CANCELLED, response_iterator.code())
318 self.assertIsNotNone(response_iterator.details())
319 self.assertIsNotNone(response_iterator.trailing_metadata())
323 b
'\x07\x08' for _
in range(test_constants.STREAM_LENGTH))
324 request_iterator =
iter(requests)
329 metadata=((
'test',
'CancelledStreamRequestStreamResponse'),))
330 response_iterator.cancel()
333 next(response_iterator)
334 self.assertIsNotNone(response_iterator.initial_metadata())
335 self.assertIs(grpc.StatusCode.CANCELLED, response_iterator.code())
336 self.assertIsNotNone(response_iterator.details())
337 self.assertIsNotNone(response_iterator.trailing_metadata())
340 request = b
'\x07\x19'
346 timeout=test_constants.SHORT_TIMEOUT,
347 metadata=((
'test',
'ExpiredUnaryRequestStreamResponse'),))
348 next(response_iterator)
350 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
351 exception_context.exception.code())
352 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
353 response_iterator.code())
357 b
'\x67\x18' for _
in range(test_constants.STREAM_LENGTH))
358 request_iterator =
iter(requests)
364 timeout=test_constants.SHORT_TIMEOUT,
365 metadata=((
'test',
'ExpiredStreamRequestStreamResponse'),))
366 next(response_iterator)
368 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
369 exception_context.exception.code())
370 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
371 response_iterator.code())
374 request = b
'\x37\x17'
380 metadata=((
'test',
'FailedUnaryRequestStreamResponse'),))
381 next(response_iterator)
383 self.assertIs(grpc.StatusCode.UNKNOWN,
384 exception_context.exception.code())
388 b
'\x67\x88' for _
in range(test_constants.STREAM_LENGTH))
389 request_iterator =
iter(requests)
395 metadata=((
'test',
'FailedStreamRequestStreamResponse'),))
396 tuple(response_iterator)
398 self.assertIs(grpc.StatusCode.UNKNOWN,
399 exception_context.exception.code())
400 self.assertIs(grpc.StatusCode.UNKNOWN, response_iterator.code())
403 self, multi_callable):
404 request = b
'\x37\x17'
408 'IgnoredUnaryRequestStreamResponse'),))
412 b
'\x67\x88' for _
in range(test_constants.STREAM_LENGTH))
413 request_iterator =
iter(requests)
417 'IgnoredStreamRequestStreamResponse'),))