14 """Tests exposure of SSL auth context"""
21 from grpc
import _channel
# Payload sent on every test call: channel.unary_unary(_UNARY_UNARY)(_REQUEST).
_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x00\x00\x00'

# Fully-qualified method path handed to channel.unary_unary().
_UNARY_UNARY = '/test/UnaryUnary'

# Value supplied for the 'grpc.ssl_target_name_override' channel option.
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
36 b
'waterzooi.test.google.be',
37 b
'*.test.youtube.com',
# Key under which the handler stores servicer_context.auth_context() in the
# pickled response; the tests read it back as auth_data[_AUTH_CTX].
_AUTH_CTX = 'auth_ctx'

# Test credentials loaded from the shared test resources module.
_PRIVATE_KEY = resources.private_key()
_CERTIFICATE_CHAIN = resources.certificate_chain()
# Passed as root_certificates= when building credentials in the tests.
_TEST_ROOT_CERTIFICATES = resources.test_root_certificates()
# One-element tuple holding a single (private key, certificate chain) pair.
_SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),)
48 _PROPERTY_OPTIONS = ((
49 'grpc.ssl_target_name_override',
50 _SERVER_HOST_OVERRIDE,
56 _ID: servicer_context.peer_identities(),
57 _ID_KEY: servicer_context.peer_identity_key(),
58 _AUTH_CTX: servicer_context.auth_context()
69 server = test_common.test_server()
70 server.add_generic_rpc_handlers((handler,))
71 port = server.add_insecure_port(
'[::]:0')
75 response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
78 auth_data = pickle.loads(response)
79 self.assertIsNone(auth_data[_ID])
80 self.assertIsNone(auth_data[_ID_KEY])
83 'security_level': [b
'TSI_SECURITY_NONE'],
84 'transport_security_type': [b
'insecure'],
85 }, auth_data[_AUTH_CTX])
92 server = test_common.test_server()
93 server.add_generic_rpc_handlers((handler,))
95 port = server.add_secure_port(
'[::]:0', server_cred)
99 root_certificates=_TEST_ROOT_CERTIFICATES)
102 options=_PROPERTY_OPTIONS)
103 response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
107 auth_data = pickle.loads(response)
108 self.assertIsNone(auth_data[_ID])
109 self.assertIsNone(auth_data[_ID_KEY])
110 self.assertDictEqual(
112 'security_level': [b
'TSI_PRIVACY_AND_INTEGRITY'],
113 'transport_security_type': [b
'ssl'],
114 'ssl_session_reused': [b
'false'],
115 }, auth_data[_AUTH_CTX])
122 server = test_common.test_server()
123 server.add_generic_rpc_handlers((handler,))
126 root_certificates=_TEST_ROOT_CERTIFICATES,
127 require_client_auth=
True)
128 port = server.add_secure_port(
'[::]:0', server_cred)
132 root_certificates=_TEST_ROOT_CERTIFICATES,
133 private_key=_PRIVATE_KEY,
134 certificate_chain=_CERTIFICATE_CHAIN)
137 options=_PROPERTY_OPTIONS)
139 response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
143 auth_data = pickle.loads(response)
144 auth_ctx = auth_data[_AUTH_CTX]
145 six.assertCountEqual(self, _CLIENT_IDS, auth_data[_ID])
146 self.assertEqual(
'x509_subject_alternative_name', auth_data[_ID_KEY])
147 self.assertSequenceEqual([b
'ssl'], auth_ctx[
'transport_security_type'])
148 self.assertSequenceEqual([b
'*.test.google.com'],
149 auth_ctx[
'x509_common_name'])
152 expect_ssl_session_reused):
155 options=channel_options)
156 response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
157 auth_data = pickle.loads(response)
158 self.assertEqual(expect_ssl_session_reused,
159 auth_data[_AUTH_CTX][
'ssl_session_reused'])
168 server = test_common.test_server()
169 server.add_generic_rpc_handlers((handler,))
171 port = server.add_secure_port(
'[::]:0', server_cred)
175 cache = session_cache.ssl_session_cache_lru(1)
177 root_certificates=_TEST_ROOT_CERTIFICATES)
178 channel_options = _PROPERTY_OPTIONS + (
179 (
'grpc.ssl_session_cache', cache),)
185 expect_ssl_session_reused=[b
'false'])
191 expect_ssl_session_reused=[b
'true'])
# Script entry point: configure default logging, then run this module's tests
# with per-test (verbosity=2) output.
if __name__ == '__main__':
    logging.basicConfig()
    unittest.main(verbosity=2)