14 """Testing the server context ability to access peer info."""
19 from typing
import Callable, Iterable, Sequence, Tuple
25 from src.proto.grpc.testing
import messages_pb2
26 from src.proto.grpc.testing
import test_pb2_grpc
33 _REQUEST = b
'\x03\x07'
34 _TEST_METHOD =
'/test/UnaryUnary'
41 @grpc.unary_unary_rpc_method_handler
42 async
def check_peer_unary_unary(request: bytes,
43 context: aio.ServicerContext):
44 self.assertEqual(_REQUEST, request)
46 self.assertIn(
'ip', context.peer())
52 'test', {
'UnaryUnary': check_peer_unary_unary})
53 server.add_generic_rpc_handlers((handlers,))
54 port = server.add_insecure_port(
'[::]:0')
58 async
with aio.insecure_channel(
'localhost:%d' % port)
as channel:
59 response = await channel.unary_unary(_TEST_METHOD)(_REQUEST)
60 self.assertEqual(_REQUEST, response)
62 await server.stop(
None)
65 if __name__ ==
'__main__':
66 logging.basicConfig(level=logging.DEBUG)
67 unittest.main(verbosity=2)