14 """Porting auth context tests from sync stack."""
28 _REQUEST = b
'\x00\x00\x00'
29 _RESPONSE = b
'\x00\x00\x00'
31 _UNARY_UNARY =
'/test/UnaryUnary'
33 _SERVER_HOST_OVERRIDE =
'foo.test.google.fr'
36 b
'waterzooi.test.google.be',
37 b
'*.test.youtube.com',
42 _AUTH_CTX =
'auth_ctx'
44 _PRIVATE_KEY = resources.private_key()
45 _CERTIFICATE_CHAIN = resources.certificate_chain()
46 _TEST_ROOT_CERTIFICATES = resources.test_root_certificates()
47 _SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),)
48 _PROPERTY_OPTIONS = ((
49 'grpc.ssl_target_name_override',
50 _SERVER_HOST_OVERRIDE,
55 servicer_context: aio.ServicerContext):
57 _ID: servicer_context.peer_identities(),
58 _ID_KEY: servicer_context.peer_identity_key(),
59 _AUTH_CTX: servicer_context.auth_context()
71 server.add_generic_rpc_handlers((handler,))
72 port = server.add_insecure_port(
'[::]:0')
75 async
with aio.insecure_channel(
'localhost:%d' % port)
as channel:
76 response = await channel.unary_unary(_UNARY_UNARY)(_REQUEST)
77 await server.stop(
None)
79 auth_data = pickle.loads(response)
80 self.assertIsNone(auth_data[_ID])
81 self.assertIsNone(auth_data[_ID_KEY])
84 'security_level': [b
'TSI_SECURITY_NONE'],
85 'transport_security_type': [b
'insecure'],
86 }, auth_data[_AUTH_CTX])
88 async
def test_secure_no_cert(self):
94 server.add_generic_rpc_handlers((handler,))
96 port = server.add_secure_port(
'[::]:0', server_cred)
100 root_certificates=_TEST_ROOT_CERTIFICATES)
101 channel = aio.secure_channel(
'localhost:{}'.
format(port),
103 options=_PROPERTY_OPTIONS)
104 response = await channel.unary_unary(_UNARY_UNARY)(_REQUEST)
105 await channel.close()
106 await server.stop(
None)
108 auth_data = pickle.loads(response)
109 self.assertIsNone(auth_data[_ID])
110 self.assertIsNone(auth_data[_ID_KEY])
111 self.assertDictEqual(
113 'security_level': [b
'TSI_PRIVACY_AND_INTEGRITY'],
114 'transport_security_type': [b
'ssl'],
115 'ssl_session_reused': [b
'false'],
116 }, auth_data[_AUTH_CTX])
118 async
def test_secure_client_cert(self):
123 server = aio.server()
124 server.add_generic_rpc_handlers((handler,))
127 root_certificates=_TEST_ROOT_CERTIFICATES,
128 require_client_auth=
True)
129 port = server.add_secure_port(
'[::]:0', server_cred)
133 root_certificates=_TEST_ROOT_CERTIFICATES,
134 private_key=_PRIVATE_KEY,
135 certificate_chain=_CERTIFICATE_CHAIN)
136 channel = aio.secure_channel(
'localhost:{}'.
format(port),
138 options=_PROPERTY_OPTIONS)
140 response = await channel.unary_unary(_UNARY_UNARY)(_REQUEST)
141 await channel.close()
142 await server.stop(
None)
144 auth_data = pickle.loads(response)
145 auth_ctx = auth_data[_AUTH_CTX]
146 self.assertCountEqual(_CLIENT_IDS, auth_data[_ID])
147 self.assertEqual(
'x509_subject_alternative_name', auth_data[_ID_KEY])
148 self.assertSequenceEqual([b
'ssl'], auth_ctx[
'transport_security_type'])
149 self.assertSequenceEqual([b
'*.test.google.com'],
150 auth_ctx[
'x509_common_name'])
152 async
def _do_one_shot_client_rpc(self, channel_creds, channel_options,
153 port, expect_ssl_session_reused):
154 channel = aio.secure_channel(
'localhost:{}'.
format(port),
156 options=channel_options)
157 response = await channel.unary_unary(_UNARY_UNARY)(_REQUEST)
158 auth_data = pickle.loads(response)
159 self.assertEqual(expect_ssl_session_reused,
160 auth_data[_AUTH_CTX][
'ssl_session_reused'])
161 await channel.close()
163 async
def test_session_resumption(self):
169 server = aio.server()
170 server.add_generic_rpc_handlers((handler,))
172 port = server.add_secure_port(
'[::]:0', server_cred)
176 cache = session_cache.ssl_session_cache_lru(1)
178 root_certificates=_TEST_ROOT_CERTIFICATES)
179 channel_options = _PROPERTY_OPTIONS + (
180 (
'grpc.ssl_session_cache', cache),)
183 await self._do_one_shot_client_rpc(channel_creds,
186 expect_ssl_session_reused=[b
'false'])
189 await self._do_one_shot_client_rpc(channel_creds,
192 expect_ssl_session_reused=[b
'true'])
193 await server.stop(
None)
196 if __name__ ==
'__main__':
197 logging.basicConfig(level=logging.DEBUG)