# Method paths used by the tests in this module. Client code passes them to
# the channel's multi-callable factories, e.g.
# channel.unary_unary(_BLOCK_BRIEFLY).
_SIMPLE_UNARY_UNARY = '/test/SimpleUnaryUnary'
_BLOCK_FOREVER = '/test/BlockForever'
_BLOCK_BRIEFLY = '/test/BlockBriefly'
_UNARY_STREAM_ASYNC_GEN = '/test/UnaryStreamAsyncGen'
_UNARY_STREAM_READER_WRITER = '/test/UnaryStreamReaderWriter'
_UNARY_STREAM_EVILLY_MIXED = '/test/UnaryStreamEvillyMixed'
_STREAM_UNARY_ASYNC_GEN = '/test/StreamUnaryAsyncGen'
_STREAM_UNARY_READER_WRITER = '/test/StreamUnaryReaderWriter'
_STREAM_UNARY_EVILLY_MIXED = '/test/StreamUnaryEvillyMixed'
_STREAM_STREAM_ASYNC_GEN = '/test/StreamStreamAsyncGen'
_STREAM_STREAM_READER_WRITER = '/test/StreamStreamReaderWriter'
_STREAM_STREAM_EVILLY_MIXED = '/test/StreamStreamEvillyMixed'
_UNIMPLEMENTED_METHOD = '/test/UnimplementedMethod'
_ERROR_IN_STREAM_STREAM = '/test/ErrorInStreamStream'
_ERROR_IN_STREAM_UNARY = '/test/ErrorInStreamUnary'
_ERROR_WITHOUT_RAISE_IN_UNARY_UNARY = '/test/ErrorWithoutRaiseInUnaryUnary'
_ERROR_WITHOUT_RAISE_IN_STREAM_STREAM = '/test/ErrorWithoutRaiseInStreamStream'
_INVALID_TRAILING_METADATA = '/test/InvalidTrailingMetadata'

# Raw payloads: handlers assert that incoming messages equal _REQUEST and
# write _RESPONSE back; the tests assert on _RESPONSE.
_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x01\x01\x01'
# Number of messages sent / expected on the streaming side of an RPC.
_NUM_STREAM_REQUESTS = 3
_NUM_STREAM_RESPONSES = 5
# Passed as maximum_concurrent_rpcs when building the server for the
# concurrency-limit test.
_MAXIMUM_CONCURRENT_RPCS = 5
56 self.
_called = asyncio.get_event_loop().create_future()
64 _UNARY_STREAM_ASYNC_GEN:
67 _UNARY_STREAM_READER_WRITER:
70 _UNARY_STREAM_EVILLY_MIXED:
73 _STREAM_UNARY_ASYNC_GEN:
76 _STREAM_UNARY_READER_WRITER:
79 _STREAM_UNARY_EVILLY_MIXED:
82 _STREAM_STREAM_ASYNC_GEN:
85 _STREAM_STREAM_READER_WRITER:
88 _STREAM_STREAM_EVILLY_MIXED:
91 _ERROR_IN_STREAM_STREAM:
94 _ERROR_IN_STREAM_UNARY:
97 _ERROR_WITHOUT_RAISE_IN_UNARY_UNARY:
100 _ERROR_WITHOUT_RAISE_IN_STREAM_STREAM:
103 _INVALID_TRAILING_METADATA:
113 await asyncio.get_event_loop().create_future()
116 await asyncio.sleep(test_constants.SHORT_TIMEOUT / 2)
120 for _
in range(_NUM_STREAM_RESPONSES):
124 for _
in range(_NUM_STREAM_RESPONSES):
125 await context.write(_RESPONSE)
129 for _
in range(_NUM_STREAM_RESPONSES - 1):
130 await context.write(_RESPONSE)
134 async
for request
in request_iterator:
135 assert _REQUEST == request
137 assert _NUM_STREAM_REQUESTS == request_count
141 for _
in range(_NUM_STREAM_REQUESTS):
142 assert _REQUEST == await context.read()
146 assert _REQUEST == await context.read()
148 async
for request
in request_iterator:
149 assert _REQUEST == request
151 assert _NUM_STREAM_REQUESTS - 1 == request_count
156 async
for request
in request_iterator:
157 assert _REQUEST == request
159 assert _NUM_STREAM_REQUESTS == request_count
161 for _
in range(_NUM_STREAM_RESPONSES):
165 for _
in range(_NUM_STREAM_REQUESTS):
166 assert _REQUEST == await context.read()
167 for _
in range(_NUM_STREAM_RESPONSES):
168 await context.write(_RESPONSE)
171 assert _REQUEST == await context.read()
173 async
for request
in request_iterator:
174 assert _REQUEST == request
176 assert _NUM_STREAM_REQUESTS - 1 == request_count
179 for _
in range(_NUM_STREAM_RESPONSES - 1):
180 await context.write(_RESPONSE)
183 async
for request
in request_iterator:
184 assert _REQUEST == request
185 raise RuntimeError(
'A testing RuntimeError!')
190 async
for request
in request_iterator:
191 assert _REQUEST == request
193 if request_count >= 1:
194 raise ValueError(
'A testing RuntimeError!')
197 assert _REQUEST == request
198 context.set_code(grpc.StatusCode.INTERNAL)
202 async
for request
in request_iterator:
203 assert _REQUEST == request
204 context.set_code(grpc.StatusCode.INTERNAL)
207 assert _REQUEST == request
208 for invalid_metadata
in [
216 context.set_trailing_metadata(invalid_metadata)
221 f
'No TypeError raised for invalid metadata: {invalid_metadata}'
224 await context.abort(grpc.StatusCode.DATA_LOSS,
225 details=
"invalid abort",
227 'error': (
'error1',
'error2')
240 server = aio.server()
241 port = server.add_insecure_port(
'[::]:0')
243 server.add_generic_rpc_handlers((generic_handler,))
245 return 'localhost:%d' % port, server, generic_handler
256 await self._server.
stop(
None)
260 response = await unary_unary_call(_REQUEST)
261 self.assertEqual(response, _RESPONSE)
265 call = unary_stream_call(_REQUEST)
268 async
for response
in call:
270 self.assertEqual(_RESPONSE, response)
272 self.assertEqual(_NUM_STREAM_RESPONSES, response_cnt)
273 self.assertEqual(await call.code(), grpc.StatusCode.OK)
277 _UNARY_STREAM_READER_WRITER)
278 call = unary_stream_call(_REQUEST)
280 for _
in range(_NUM_STREAM_RESPONSES):
281 response = await call.read()
282 self.assertEqual(_RESPONSE, response)
284 self.assertEqual(await call.code(), grpc.StatusCode.OK)
288 _UNARY_STREAM_EVILLY_MIXED)
289 call = unary_stream_call(_REQUEST)
292 self.assertEqual(_RESPONSE, await call.read())
295 with self.assertRaises(aio.UsageError):
296 async
for response
in call:
297 self.assertEqual(_RESPONSE, response)
301 call = stream_unary_call()
303 for _
in range(_NUM_STREAM_REQUESTS):
304 await call.write(_REQUEST)
305 await call.done_writing()
307 response = await call
308 self.assertEqual(_RESPONSE, response)
309 self.assertEqual(await call.code(), grpc.StatusCode.OK)
317 for _
in range(_NUM_STREAM_REQUESTS):
322 call = stream_unary_call(request_gen())
324 response = await call
325 self.assertEqual(_RESPONSE, response)
326 self.assertEqual(await call.code(), grpc.StatusCode.OK)
327 self.assertEqual(finished,
True)
331 _STREAM_UNARY_READER_WRITER)
332 call = stream_unary_call()
334 for _
in range(_NUM_STREAM_REQUESTS):
335 await call.write(_REQUEST)
336 await call.done_writing()
338 response = await call
339 self.assertEqual(_RESPONSE, response)
340 self.assertEqual(await call.code(), grpc.StatusCode.OK)
344 _STREAM_UNARY_EVILLY_MIXED)
345 call = stream_unary_call()
347 for _
in range(_NUM_STREAM_REQUESTS):
348 await call.write(_REQUEST)
349 await call.done_writing()
351 response = await call
352 self.assertEqual(_RESPONSE, response)
353 self.assertEqual(await call.code(), grpc.StatusCode.OK)
357 _STREAM_STREAM_ASYNC_GEN)
358 call = stream_stream_call()
360 for _
in range(_NUM_STREAM_REQUESTS):
361 await call.write(_REQUEST)
362 await call.done_writing()
364 for _
in range(_NUM_STREAM_RESPONSES):
365 response = await call.read()
366 self.assertEqual(_RESPONSE, response)
368 self.assertEqual(await call.code(), grpc.StatusCode.OK)
372 _STREAM_STREAM_READER_WRITER)
373 call = stream_stream_call()
375 for _
in range(_NUM_STREAM_REQUESTS):
376 await call.write(_REQUEST)
377 await call.done_writing()
379 for _
in range(_NUM_STREAM_RESPONSES):
380 response = await call.read()
381 self.assertEqual(_RESPONSE, response)
383 self.assertEqual(await call.code(), grpc.StatusCode.OK)
387 _STREAM_STREAM_EVILLY_MIXED)
388 call = stream_stream_call()
390 for _
in range(_NUM_STREAM_REQUESTS):
391 await call.write(_REQUEST)
392 await call.done_writing()
394 for _
in range(_NUM_STREAM_RESPONSES):
395 response = await call.read()
396 self.assertEqual(_RESPONSE, response)
398 self.assertEqual(await call.code(), grpc.StatusCode.OK)
401 await self._server.
stop(
None)
407 await self._server.
stop(
None)
413 shutdown_start_time = time.time()
414 await self._server.
stop(test_constants.SHORT_TIMEOUT)
415 grace_period_length = time.time() - shutdown_start_time
416 self.assertGreater(grace_period_length,
417 test_constants.SHORT_TIMEOUT / 3)
420 self.assertEqual(_RESPONSE, await call)
421 self.assertTrue(call.done())
427 await self._server.
stop(test_constants.SHORT_TIMEOUT)
429 with self.assertRaises(aio.AioRpcError)
as exception_context:
431 self.assertEqual(grpc.StatusCode.UNAVAILABLE,
432 exception_context.exception.code())
439 shutdown_start_time = time.time()
440 await asyncio.gather(
441 self._server.
stop(test_constants.LONG_TIMEOUT),
442 self._server.
stop(test_constants.SHORT_TIMEOUT),
443 self._server.
stop(test_constants.LONG_TIMEOUT),
445 grace_period_length = time.time() - shutdown_start_time
446 self.assertGreater(grace_period_length,
447 test_constants.SHORT_TIMEOUT / 3)
449 self.assertEqual(_RESPONSE, await call)
450 self.assertTrue(call.done())
457 await asyncio.gather(
458 self._server.
stop(test_constants.LONG_TIMEOUT),
459 self._server.
stop(
None),
460 self._server.
stop(test_constants.SHORT_TIMEOUT),
461 self._server.
stop(test_constants.LONG_TIMEOUT),
464 with self.assertRaises(aio.AioRpcError)
as exception_context:
466 self.assertEqual(grpc.StatusCode.UNAVAILABLE,
467 exception_context.exception.code())
470 await self._server.
stop(
None)
474 with self.assertRaises(aio.AioRpcError):
479 with self.assertRaises(aio.AioRpcError)
as exception_context:
481 rpc_error = exception_context.exception
482 self.assertEqual(grpc.StatusCode.UNIMPLEMENTED, rpc_error.code())
486 _STREAM_STREAM_ASYNC_GEN)
487 call = stream_stream_call()
490 await call.write(_REQUEST)
491 await self._server.
stop(
None)
493 self.assertEqual(grpc.StatusCode.UNAVAILABLE, await call.code())
498 _ERROR_IN_STREAM_STREAM)
499 call = stream_stream_call()
502 await call.write(_REQUEST)
505 self.assertEqual(grpc.StatusCode.UNKNOWN, await call.code())
511 with self.assertRaises(aio.AioRpcError)
as exception_context:
514 rpc_error = exception_context.exception
515 self.assertEqual(grpc.StatusCode.INTERNAL, rpc_error.code())
519 _ERROR_WITHOUT_RAISE_IN_STREAM_STREAM)()
521 for _
in range(_NUM_STREAM_REQUESTS):
522 await call.write(_REQUEST)
523 await call.done_writing()
525 self.assertEqual(grpc.StatusCode.INTERNAL, await call.code())
530 async
def request_gen():
531 for _
in range(_NUM_STREAM_REQUESTS):
534 call = stream_unary_call(request_gen())
536 with self.assertRaises(aio.AioRpcError)
as exception_context:
538 rpc_error = exception_context.exception
539 self.assertEqual(grpc.StatusCode.UNKNOWN, rpc_error.code())
542 server = aio.server(options=((
'grpc.so_reuseport', 0),))
543 port = server.add_insecure_port(
'localhost:0')
544 bind_address =
"localhost:%d" % port
546 with self.assertRaises(RuntimeError):
547 server.add_insecure_port(bind_address)
550 (resources.private_key(), resources.certificate_chain())
552 with self.assertRaises(RuntimeError):
553 server.add_secure_port(bind_address, server_credentials)
557 server = aio.server(maximum_concurrent_rpcs=_MAXIMUM_CONCURRENT_RPCS)
558 port = server.add_insecure_port(
'localhost:0')
559 bind_address =
"localhost:%d" % port
563 channel = aio.insecure_channel(bind_address)
566 for _
in range(3 * _MAXIMUM_CONCURRENT_RPCS):
567 rpcs.append(channel.unary_unary(_BLOCK_BRIEFLY)(_REQUEST))
568 task = self.
loop.create_task(
569 asyncio.wait(rpcs, return_when=asyncio.FIRST_EXCEPTION))
571 start_time = time.time()
573 elapsed_time = time.time() - start_time
574 self.assertGreater(elapsed_time, test_constants.SHORT_TIMEOUT * 3 / 2)
576 await channel.close()
582 with self.assertRaises(aio.AioRpcError)
as exception_context:
585 rpc_error = exception_context.exception
586 self.assertEqual(grpc.StatusCode.UNKNOWN, rpc_error.code())
587 self.assertIn(
'trailing', rpc_error.details())
# Script entry point: turn on debug-level logging, then run this module's
# tests with verbose output.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)