16 from absl
import flags
17 from absl.testing
import absltest
19 from framework
import xds_k8s_testcase
23 logger = logging.getLogger(__name__)
24 flags.adopt_module_key_flags(xds_k8s_testcase)
27 _XdsTestServer = xds_k8s_testcase.XdsTestServer
28 _XdsTestClient = xds_k8s_testcase.XdsTestClient
29 _SecurityMode = xds_k8s_testcase.SecurityXdsKubernetesTestCase.SecurityMode
33 class SecurityTest(xds_k8s_testcase.SecurityXdsKubernetesTestCase):
37 if config.client_lang
in (_Lang.CPP | _Lang.GO | _Lang.JAVA |
41 return config.version_gte(
'v1.41.x')
47 Both client and server configured to use TLS and mTLS.
49 self.setupTrafficDirectorGrpc()
50 self.setupSecurityPolicies(server_tls=
True,
55 test_server: _XdsTestServer = self.startSecureTestServer()
56 self.setupServerBackends()
57 test_client: _XdsTestClient = self.startSecureTestClient(test_server)
59 self.assertTestAppSecurity(_SecurityMode.MTLS, test_client, test_server)
60 self.assertSuccessfulRpcs(test_client)
61 logger.info(
'[SUCCESS] mTLS security mode confirmed.')
66 Both client and server configured to use TLS and not use mTLS.
68 self.setupTrafficDirectorGrpc()
69 self.setupSecurityPolicies(server_tls=
True,
74 test_server: _XdsTestServer = self.startSecureTestServer()
75 self.setupServerBackends()
76 test_client: _XdsTestClient = self.startSecureTestClient(test_server)
78 self.assertTestAppSecurity(_SecurityMode.TLS, test_client, test_server)
79 self.assertSuccessfulRpcs(test_client)
80 logger.info(
'[SUCCESS] TLS security mode confirmed.')
83 """Plain-text fallback test.
85 Control plane provides no security config so both client and server
86 fallback to plaintext based on fallback-credentials.
88 self.setupTrafficDirectorGrpc()
89 self.setupSecurityPolicies(server_tls=
False,
94 test_server: _XdsTestServer = self.startSecureTestServer()
95 self.setupServerBackends()
96 test_client: _XdsTestClient = self.startSecureTestClient(test_server)
98 self.assertTestAppSecurity(_SecurityMode.PLAINTEXT, test_client,
100 self.assertSuccessfulRpcs(test_client)
101 logger.info(
'[SUCCESS] Plaintext security mode confirmed.')
104 """Negative test: mTLS Error.
106 Server expects client mTLS cert, but client configured only for TLS.
108 Note: because this is a negative test we need to make sure the mTLS
109 failure happens after receiving the correct configuration at the
110 client. To ensure that we will perform the following steps in that
113 - Creation of a backendService, and attaching the backend (NEG)
114 - Creation of the Server mTLS Policy, and attaching to the ECS
115 - Creation of the Client TLS Policy, and attaching to the backendService
116 - Creation of the urlMap, targetProxy, and forwardingRule
118 With this sequence we are sure that when the client receives the
119 endpoints of the backendService the security-config would also have
120 been received as confirmed by the TD team.
123 self.td.setup_backend_for_grpc(
124 health_check_port=self.server_maintenance_port)
128 test_server: _XdsTestServer = self.startSecureTestServer()
129 self.setupServerBackends(wait_for_healthy_status=
False)
132 self.setupSecurityPolicies(server_tls=
True,
138 self.td.setup_routing_rule_map_for_grpc(self.server_xds_host,
139 self.server_xds_port)
142 self.td.wait_for_backends_healthy_status()
145 test_client: _XdsTestClient = self.startSecureTestClient(
146 test_server, wait_for_active_server_channel=
False)
148 self.assertClientCannotReachServerRepeatedly(test_client)
150 "[SUCCESS] Client's connectivity state is consistent with a mTLS "
151 "error caused by not presenting mTLS certificate to the server.")
154 """Negative test: AuthZ error.
156 Client does not authorize server because of mismatched SAN name.
157 The order of operations is the same as in `test_mtls_error`.
160 self.td.setup_backend_for_grpc(
161 health_check_port=self.server_maintenance_port)
165 test_server: _XdsTestServer = self.startSecureTestServer()
166 self.setupServerBackends(wait_for_healthy_status=
False)
170 self.td.setup_server_security(server_namespace=self.server_namespace,
171 server_name=self.server_name,
172 server_port=self.server_port,
175 incorrect_namespace = f
'incorrect-namespace-{rand.rand_string()}'
176 self.td.setup_client_security(server_namespace=incorrect_namespace,
177 server_name=self.server_name,
182 self.td.setup_routing_rule_map_for_grpc(self.server_xds_host,
183 self.server_xds_port)
186 self.td.wait_for_backends_healthy_status()
189 test_client: _XdsTestClient = self.startSecureTestClient(
190 test_server, wait_for_active_server_channel=
False)
192 self.assertClientCannotReachServerRepeatedly(test_client)
193 logger.info(
"[SUCCESS] Client's connectivity state is consistent with "
194 "AuthZ error caused by server presenting incorrect SAN.")
197 if __name__ ==
'__main__':