security_test.py
Go to the documentation of this file.
1 # Copyright 2020 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import logging
15 
16 from absl import flags
17 from absl.testing import absltest
18 
19 from framework import xds_k8s_testcase
20 from framework.helpers import rand
21 from framework.helpers import skips
22 
23 logger = logging.getLogger(__name__)
24 flags.adopt_module_key_flags(xds_k8s_testcase)
25 
26 # Type aliases
27 _XdsTestServer = xds_k8s_testcase.XdsTestServer
28 _XdsTestClient = xds_k8s_testcase.XdsTestClient
29 _SecurityMode = xds_k8s_testcase.SecurityXdsKubernetesTestCase.SecurityMode
30 _Lang = skips.Lang
31 
32 
33 class SecurityTest(xds_k8s_testcase.SecurityXdsKubernetesTestCase):
34 
35  @staticmethod
36  def is_supported(config: skips.TestConfig) -> bool:
37  if config.client_lang in (_Lang.CPP | _Lang.GO | _Lang.JAVA |
38  _Lang.PYTHON):
39  # Versions prior to v1.41.x don't support PSM Security.
40  # https://github.com/grpc/grpc/blob/master/doc/grpc_xds_features.md
41  return config.version_gte('v1.41.x')
42  return True
43 
44  def test_mtls(self):
45  """mTLS test.
46 
47  Both client and server configured to use TLS and mTLS.
48  """
49  self.setupTrafficDirectorGrpc()
50  self.setupSecurityPolicies(server_tls=True,
51  server_mtls=True,
52  client_tls=True,
53  client_mtls=True)
54 
55  test_server: _XdsTestServer = self.startSecureTestServer()
56  self.setupServerBackends()
57  test_client: _XdsTestClient = self.startSecureTestClient(test_server)
58 
59  self.assertTestAppSecurity(_SecurityMode.MTLS, test_client, test_server)
60  self.assertSuccessfulRpcs(test_client)
61  logger.info('[SUCCESS] mTLS security mode confirmed.')
62 
63  def test_tls(self):
64  """TLS test.
65 
66  Both client and server configured to use TLS and not use mTLS.
67  """
68  self.setupTrafficDirectorGrpc()
69  self.setupSecurityPolicies(server_tls=True,
70  server_mtls=False,
71  client_tls=True,
72  client_mtls=False)
73 
74  test_server: _XdsTestServer = self.startSecureTestServer()
75  self.setupServerBackends()
76  test_client: _XdsTestClient = self.startSecureTestClient(test_server)
77 
78  self.assertTestAppSecurity(_SecurityMode.TLS, test_client, test_server)
79  self.assertSuccessfulRpcs(test_client)
80  logger.info('[SUCCESS] TLS security mode confirmed.')
81 
83  """Plain-text fallback test.
84 
85  Control plane provides no security config so both client and server
86  fallback to plaintext based on fallback-credentials.
87  """
88  self.setupTrafficDirectorGrpc()
89  self.setupSecurityPolicies(server_tls=False,
90  server_mtls=False,
91  client_tls=False,
92  client_mtls=False)
93 
94  test_server: _XdsTestServer = self.startSecureTestServer()
95  self.setupServerBackends()
96  test_client: _XdsTestClient = self.startSecureTestClient(test_server)
97 
98  self.assertTestAppSecurity(_SecurityMode.PLAINTEXT, test_client,
99  test_server)
100  self.assertSuccessfulRpcs(test_client)
101  logger.info('[SUCCESS] Plaintext security mode confirmed.')
102 
103  def test_mtls_error(self):
104  """Negative test: mTLS Error.
105 
106  Server expects client mTLS cert, but client configured only for TLS.
107 
108  Note: because this is a negative test we need to make sure the mTLS
109  failure happens after receiving the correct configuration at the
110  client. To ensure that we will perform the following steps in that
111  sequence:
112 
113  - Creation of a backendService, and attaching the backend (NEG)
114  - Creation of the Server mTLS Policy, and attaching to the ECS
115  - Creation of the Client TLS Policy, and attaching to the backendService
116  - Creation of the urlMap, targetProxy, and forwardingRule
117 
118  With this sequence we are sure that when the client receives the
119  endpoints of the backendService the security-config would also have
120  been received as confirmed by the TD team.
121  """
122  # Create backend service
123  self.td.setup_backend_for_grpc(
124  health_check_port=self.server_maintenance_port)
125 
126  # Start server and attach its NEGs to the backend service, but
127  # until they become healthy.
128  test_server: _XdsTestServer = self.startSecureTestServer()
129  self.setupServerBackends(wait_for_healthy_status=False)
130 
131  # Setup policies and attach them.
132  self.setupSecurityPolicies(server_tls=True,
133  server_mtls=True,
134  client_tls=True,
135  client_mtls=False)
136 
137  # Create the routing rule map.
138  self.td.setup_routing_rule_map_for_grpc(self.server_xds_host,
139  self.server_xds_port)
140  # Now that TD setup is complete, Backend Service can be populated
141  # with healthy backends (NEGs).
142  self.td.wait_for_backends_healthy_status()
143 
144  # Start the client, but don't wait for it to report a healthy channel.
145  test_client: _XdsTestClient = self.startSecureTestClient(
146  test_server, wait_for_active_server_channel=False)
147 
148  self.assertClientCannotReachServerRepeatedly(test_client)
149  logger.info(
150  "[SUCCESS] Client's connectivity state is consistent with a mTLS "
151  "error caused by not presenting mTLS certificate to the server.")
152 
154  """Negative test: AuthZ error.
155 
156  Client does not authorize server because of mismatched SAN name.
157  The order of operations is the same as in `test_mtls_error`.
158  """
159  # Create backend service
160  self.td.setup_backend_for_grpc(
161  health_check_port=self.server_maintenance_port)
162 
163  # Start server and attach its NEGs to the backend service, but
164  # until they become healthy.
165  test_server: _XdsTestServer = self.startSecureTestServer()
166  self.setupServerBackends(wait_for_healthy_status=False)
167 
168  # Regular TLS setup, but with client policy configured using
169  # intentionality incorrect server_namespace.
170  self.td.setup_server_security(server_namespace=self.server_namespace,
171  server_name=self.server_name,
172  server_port=self.server_port,
173  tls=True,
174  mtls=False)
175  incorrect_namespace = f'incorrect-namespace-{rand.rand_string()}'
176  self.td.setup_client_security(server_namespace=incorrect_namespace,
177  server_name=self.server_name,
178  tls=True,
179  mtls=False)
180 
181  # Create the routing rule map.
182  self.td.setup_routing_rule_map_for_grpc(self.server_xds_host,
183  self.server_xds_port)
184  # Now that TD setup is complete, Backend Service can be populated
185  # with healthy backends (NEGs).
186  self.td.wait_for_backends_healthy_status()
187 
188  # Start the client, but don't wait for it to report a healthy channel.
189  test_client: _XdsTestClient = self.startSecureTestClient(
190  test_server, wait_for_active_server_channel=False)
191 
192  self.assertClientCannotReachServerRepeatedly(test_client)
193  logger.info("[SUCCESS] Client's connectivity state is consistent with "
194  "AuthZ error caused by server presenting incorrect SAN.")
195 
196 
197 if __name__ == '__main__':
198  absltest.main()
tests.security_test.SecurityTest.test_mtls
def test_mtls(self)
Definition: security_test.py:44
tests.security_test.SecurityTest.test_server_authz_error
def test_server_authz_error(self)
Definition: security_test.py:153
tests.security_test.SecurityTest
Definition: security_test.py:33
framework.helpers
Definition: tools/run_tests/xds_k8s_test_driver/framework/helpers/__init__.py:1
tests.security_test.SecurityTest.is_supported
bool is_supported(skips.TestConfig config)
Definition: security_test.py:36
tests.security_test.SecurityTest.test_plaintext_fallback
def test_plaintext_fallback(self)
Definition: security_test.py:82
tests.security_test.SecurityTest.test_tls
def test_tls(self)
Definition: security_test.py:63
tests.security_test.SecurityTest.test_mtls_error
def test_mtls_error(self)
Definition: security_test.py:103


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:16