14 """Tests behavior around the metadata mechanism."""
23 from src.proto.grpc.testing
import messages_pb2
24 from src.proto.grpc.testing
import test_pb2_grpc
# Create an insecure channel and a secure channel, each with an empty target
# string; the returned channel objects are discarded.
# NOTE(review): `channel_creds` is not bound anywhere in the visible lines --
# confirm where it is defined before relying on this snippet.
aio.insecure_channel('')
aio.secure_channel('', channel_creds)
# Invoked after both channels above have been created.
aio.shutdown_grpc_aio()
async def ping_pong():
    """Create an insecure channel and a TestService stub, then stop the server.

    NOTE(review): `address` and `server` are read here but are not bound in
    the visible lines; presumably they are set by statements that are not
    shown -- confirm against the complete file.
    """
    channel = aio.insecure_channel(address)
    # The stub is built on the channel; no call through it is visible here.
    stub = test_pb2_grpc.TestServiceStub(channel)
    await server.stop(None)
# Run ping_pong() to completion on a newly created event loop, repeated
# _NUM_OF_LOOPS times.
# NOTE(review): indentation is not recoverable from the visible text; keeping
# the final aio.shutdown_grpc_aio() call inside the loop body is an
# assumption -- confirm against the complete file.
for i in range(_NUM_OF_LOOPS):
    # The currently installed loop is fetched; no further use of it is
    # visible in these lines.
    old_loop = asyncio.get_event_loop()

    # Install a fresh loop as the current one before driving the coroutine.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    loop.run_until_complete(ping_pong())

    aio.shutdown_grpc_aio()
# Script entry point: enable DEBUG-level logging, then hand control to the
# unittest runner with verbose (level 2) output.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)