_auth_context_test.py
Go to the documentation of this file.
1 # Copyright 2017 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests exposure of SSL auth context"""
15 
16 import logging
17 import pickle
18 import unittest
19 
20 import grpc
21 from grpc import _channel
22 from grpc.experimental import session_cache
23 import six
24 
25 from tests.unit import resources
26 from tests.unit import test_common
27 
28 _REQUEST = b'\x00\x00\x00'
29 _RESPONSE = b'\x00\x00\x00'
30 
31 _UNARY_UNARY = '/test/UnaryUnary'
32 
33 _SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
34 _CLIENT_IDS = (
35  b'*.test.google.fr',
36  b'waterzooi.test.google.be',
37  b'*.test.youtube.com',
38  b'192.168.1.3',
39 )
40 _ID = 'id'
41 _ID_KEY = 'id_key'
42 _AUTH_CTX = 'auth_ctx'
43 
44 _PRIVATE_KEY = resources.private_key()
45 _CERTIFICATE_CHAIN = resources.certificate_chain()
46 _TEST_ROOT_CERTIFICATES = resources.test_root_certificates()
47 _SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),)
48 _PROPERTY_OPTIONS = ((
49  'grpc.ssl_target_name_override',
50  _SERVER_HOST_OVERRIDE,
51 ),)
52 
53 
54 def handle_unary_unary(request, servicer_context):
55  return pickle.dumps({
56  _ID: servicer_context.peer_identities(),
57  _ID_KEY: servicer_context.peer_identity_key(),
58  _AUTH_CTX: servicer_context.auth_context()
59  })
60 
61 
62 class AuthContextTest(unittest.TestCase):
63 
64  def testInsecure(self):
65  handler = grpc.method_handlers_generic_handler('test', {
66  'UnaryUnary':
67  grpc.unary_unary_rpc_method_handler(handle_unary_unary)
68  })
69  server = test_common.test_server()
70  server.add_generic_rpc_handlers((handler,))
71  port = server.add_insecure_port('[::]:0')
72  server.start()
73 
74  with grpc.insecure_channel('localhost:%d' % port) as channel:
75  response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
76  server.stop(None)
77 
78  auth_data = pickle.loads(response)
79  self.assertIsNone(auth_data[_ID])
80  self.assertIsNone(auth_data[_ID_KEY])
81  self.assertDictEqual(
82  {
83  'security_level': [b'TSI_SECURITY_NONE'],
84  'transport_security_type': [b'insecure'],
85  }, auth_data[_AUTH_CTX])
86 
87  def testSecureNoCert(self):
88  handler = grpc.method_handlers_generic_handler('test', {
89  'UnaryUnary':
90  grpc.unary_unary_rpc_method_handler(handle_unary_unary)
91  })
92  server = test_common.test_server()
93  server.add_generic_rpc_handlers((handler,))
94  server_cred = grpc.ssl_server_credentials(_SERVER_CERTS)
95  port = server.add_secure_port('[::]:0', server_cred)
96  server.start()
97 
98  channel_creds = grpc.ssl_channel_credentials(
99  root_certificates=_TEST_ROOT_CERTIFICATES)
100  channel = grpc.secure_channel('localhost:{}'.format(port),
101  channel_creds,
102  options=_PROPERTY_OPTIONS)
103  response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
104  channel.close()
105  server.stop(None)
106 
107  auth_data = pickle.loads(response)
108  self.assertIsNone(auth_data[_ID])
109  self.assertIsNone(auth_data[_ID_KEY])
110  self.assertDictEqual(
111  {
112  'security_level': [b'TSI_PRIVACY_AND_INTEGRITY'],
113  'transport_security_type': [b'ssl'],
114  'ssl_session_reused': [b'false'],
115  }, auth_data[_AUTH_CTX])
116 
118  handler = grpc.method_handlers_generic_handler('test', {
119  'UnaryUnary':
120  grpc.unary_unary_rpc_method_handler(handle_unary_unary)
121  })
122  server = test_common.test_server()
123  server.add_generic_rpc_handlers((handler,))
124  server_cred = grpc.ssl_server_credentials(
125  _SERVER_CERTS,
126  root_certificates=_TEST_ROOT_CERTIFICATES,
127  require_client_auth=True)
128  port = server.add_secure_port('[::]:0', server_cred)
129  server.start()
130 
131  channel_creds = grpc.ssl_channel_credentials(
132  root_certificates=_TEST_ROOT_CERTIFICATES,
133  private_key=_PRIVATE_KEY,
134  certificate_chain=_CERTIFICATE_CHAIN)
135  channel = grpc.secure_channel('localhost:{}'.format(port),
136  channel_creds,
137  options=_PROPERTY_OPTIONS)
138 
139  response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
140  channel.close()
141  server.stop(None)
142 
143  auth_data = pickle.loads(response)
144  auth_ctx = auth_data[_AUTH_CTX]
145  six.assertCountEqual(self, _CLIENT_IDS, auth_data[_ID])
146  self.assertEqual('x509_subject_alternative_name', auth_data[_ID_KEY])
147  self.assertSequenceEqual([b'ssl'], auth_ctx['transport_security_type'])
148  self.assertSequenceEqual([b'*.test.google.com'],
149  auth_ctx['x509_common_name'])
150 
151  def _do_one_shot_client_rpc(self, channel_creds, channel_options, port,
152  expect_ssl_session_reused):
153  channel = grpc.secure_channel('localhost:{}'.format(port),
154  channel_creds,
155  options=channel_options)
156  response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
157  auth_data = pickle.loads(response)
158  self.assertEqual(expect_ssl_session_reused,
159  auth_data[_AUTH_CTX]['ssl_session_reused'])
160  channel.close()
161 
163  # Set up a secure server
164  handler = grpc.method_handlers_generic_handler('test', {
165  'UnaryUnary':
166  grpc.unary_unary_rpc_method_handler(handle_unary_unary)
167  })
168  server = test_common.test_server()
169  server.add_generic_rpc_handlers((handler,))
170  server_cred = grpc.ssl_server_credentials(_SERVER_CERTS)
171  port = server.add_secure_port('[::]:0', server_cred)
172  server.start()
173 
174  # Create a cache for TLS session tickets
175  cache = session_cache.ssl_session_cache_lru(1)
176  channel_creds = grpc.ssl_channel_credentials(
177  root_certificates=_TEST_ROOT_CERTIFICATES)
178  channel_options = _PROPERTY_OPTIONS + (
179  ('grpc.ssl_session_cache', cache),)
180 
181  # Initial connection has no session to resume
182  self._do_one_shot_client_rpc(channel_creds,
183  channel_options,
184  port,
185  expect_ssl_session_reused=[b'false'])
186 
187  # Subsequent connections resume sessions
188  self._do_one_shot_client_rpc(channel_creds,
189  channel_options,
190  port,
191  expect_ssl_session_reused=[b'true'])
192  server.stop(None)
193 
194 
195 if __name__ == '__main__':
196  logging.basicConfig()
197  unittest.main(verbosity=2)
grpc.unary_unary_rpc_method_handler
def unary_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1510
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
http2_test_server.format
format
Definition: http2_test_server.py:118
tests.unit._auth_context_test.AuthContextTest
Definition: _auth_context_test.py:62
tests.unit._auth_context_test.AuthContextTest.testSecureClientCert
def testSecureClientCert(self)
Definition: _auth_context_test.py:117
tests.unit._auth_context_test.AuthContextTest.testInsecure
def testInsecure(self)
Definition: _auth_context_test.py:64
tests.unit._auth_context_test.AuthContextTest.testSecureNoCert
def testSecureNoCert(self)
Definition: _auth_context_test.py:87
grpc::experimental
Definition: include/grpcpp/channel.h:46
tests.unit._auth_context_test.AuthContextTest.testSessionResumption
def testSessionResumption(self)
Definition: _auth_context_test.py:162
grpc.ssl_server_credentials
def ssl_server_credentials(private_key_certificate_chain_pairs, root_certificates=None, require_client_auth=False)
Definition: src/python/grpcio/grpc/__init__.py:1709
grpc.method_handlers_generic_handler
def method_handlers_generic_handler(service, method_handlers)
Definition: src/python/grpcio/grpc/__init__.py:1590
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
grpc.ssl_channel_credentials
def ssl_channel_credentials(root_certificates=None, private_key=None, certificate_chain=None)
Definition: src/python/grpcio/grpc/__init__.py:1607
tests.unit._auth_context_test.AuthContextTest._do_one_shot_client_rpc
def _do_one_shot_client_rpc(self, channel_creds, channel_options, port, expect_ssl_session_reused)
Definition: _auth_context_test.py:151
tests.unit._auth_context_test.handle_unary_unary
def handle_unary_unary(request, servicer_context)
Definition: _auth_context_test.py:54
grpc.secure_channel
def secure_channel(target, credentials, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1982


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:26