14 """Tests server certificate rotation.
16 Here we test various aspects of gRPC Python, and in some cases gRPC
17 Core by extension, support for server certificate rotation.
19 * ServerSSLCertReloadTestWithClientAuth: test ability to rotate
20 server's SSL cert for use in future channels with clients while not
21 affecting any existing channel. The server requires client
24 * ServerSSLCertReloadTestWithoutClientAuth: like
25 ServerSSLCertReloadTestWithClientAuth except that the server does
26 not authenticate the client.
28 * ServerSSLCertReloadTestCertConfigReuse: tests gRPC Python's ability
29 to deal with user's reuse of ServerCertificateConfiguration instances.
34 from concurrent
import futures
49 CA_1_PEM = resources.cert_hier_1_root_ca_cert()
50 CA_2_PEM = resources.cert_hier_2_root_ca_cert()
52 CLIENT_KEY_1_PEM = resources.cert_hier_1_client_1_key()
53 CLIENT_CERT_CHAIN_1_PEM = (resources.cert_hier_1_client_1_cert() +
54 resources.cert_hier_1_intermediate_ca_cert())
56 CLIENT_KEY_2_PEM = resources.cert_hier_2_client_1_key()
57 CLIENT_CERT_CHAIN_2_PEM = (resources.cert_hier_2_client_1_cert() +
58 resources.cert_hier_2_intermediate_ca_cert())
60 SERVER_KEY_1_PEM = resources.cert_hier_1_server_1_key()
61 SERVER_CERT_CHAIN_1_PEM = (resources.cert_hier_1_server_1_cert() +
62 resources.cert_hier_1_intermediate_ca_cert())
64 SERVER_KEY_2_PEM = resources.cert_hier_2_server_1_key()
65 SERVER_CERT_CHAIN_2_PEM = (resources.cert_hier_2_server_1_cert() +
66 resources.cert_hier_2_intermediate_ca_cert())
70 Call = collections.namedtuple(
'Call', [
'did_raise',
'returned_cert_config'])
82 return services_pb2_grpc.FirstServiceStub(channel)
100 assert not (should_raise
and cert_config), (
101 "should not specify both should_raise and a cert_config at the same time"
115 raise ValueError(
'just for fun, should not affect the test')
122 six.with_metaclass(abc.ABCMeta, unittest.TestCase)):
125 super(_ServerSSLCertReloadTest, self).
__init__(*args, **kwargs)
131 raise NotImplementedError()
134 self.
server = test_common.test_server()
135 services_pb2_grpc.add_FirstServiceServicer_to_server(
136 _server_application.FirstServiceServicer(), self.
server)
137 switch_cert_on_client_num = 10
139 [(SERVER_KEY_1_PEM, SERVER_CERT_CHAIN_1_PEM)],
140 root_certificates=CA_2_PEM)
146 self.
port = self.
server.add_secure_port(
'[::]:0', server_credentials)
157 request = _application_common.UNARY_UNARY_REQUEST
159 response = client_stub.UnUn(request)
160 self.assertEqual(response, _application_common.UNARY_UNARY_RESPONSE)
163 client_stub.UnUn(request)
170 self.assertTrue(exception_context.exception.code(
171 )
in [grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.UNKNOWN])
175 root_certificates=None,
177 certificate_chain=None):
179 root_certificates=root_certificates,
180 private_key=private_key,
181 certificate_chain=certificate_chain)
190 root_certificates=CA_1_PEM,
191 private_key=CLIENT_KEY_2_PEM,
192 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
194 self.assertEqual(
len(actual_calls), 1)
195 self.assertFalse(actual_calls[0].did_raise)
196 self.assertIsNone(actual_calls[0].returned_cert_config)
203 root_certificates=CA_2_PEM,
204 private_key=CLIENT_KEY_2_PEM,
205 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
207 self.assertGreaterEqual(
len(actual_calls), 1)
208 self.assertFalse(actual_calls[0].did_raise)
209 for i, call
in enumerate(actual_calls):
210 self.assertFalse(call.did_raise,
'i= {}'.
format(i))
211 self.assertIsNone(call.returned_cert_config,
'i= {}'.
format(i))
217 root_certificates=CA_1_PEM,
218 private_key=CLIENT_KEY_2_PEM,
219 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
221 self.assertEqual(
len(actual_calls), 1)
222 self.assertTrue(actual_calls[0].did_raise)
223 self.assertIsNone(actual_calls[0].returned_cert_config)
231 root_certificates=CA_1_PEM,
232 private_key=CLIENT_KEY_1_PEM,
233 certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
235 self.assertGreaterEqual(
len(actual_calls), 1)
236 for i, call
in enumerate(actual_calls):
237 self.assertFalse(call.did_raise,
'i= {}'.
format(i))
238 self.assertIsNone(call.returned_cert_config,
'i= {}'.
format(i))
244 root_certificates=CA_1_PEM,
245 private_key=CLIENT_KEY_2_PEM,
246 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
248 self.assertEqual(
len(actual_calls), 1)
249 self.assertFalse(actual_calls[0].did_raise)
250 self.assertIsNone(actual_calls[0].returned_cert_config)
258 root_certificates=CA_1_PEM,
259 private_key=CLIENT_KEY_2_PEM,
260 certificate_chain=CLIENT_CERT_CHAIN_2_PEM))
264 self.assertEqual(
len(actual_calls), 1)
265 self.assertFalse(actual_calls[0].did_raise)
266 self.assertIsNone(actual_calls[0].returned_cert_config)
273 root_certificates=CA_1_PEM,
274 private_key=CLIENT_KEY_2_PEM,
275 certificate_chain=CLIENT_CERT_CHAIN_2_PEM))
279 self.assertEqual(
len(actual_calls), 1)
280 self.assertFalse(actual_calls[0].did_raise)
281 self.assertIsNone(actual_calls[0].returned_cert_config)
286 [(SERVER_KEY_2_PEM, SERVER_CERT_CHAIN_2_PEM)],
287 root_certificates=CA_1_PEM)
291 root_certificates=CA_1_PEM,
292 private_key=CLIENT_KEY_2_PEM,
293 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
295 self.assertGreaterEqual(
len(actual_calls), 1)
296 self.assertFalse(actual_calls[0].did_raise)
297 for i, call
in enumerate(actual_calls):
298 self.assertFalse(call.did_raise,
'i= {}'.
format(i))
299 self.assertEqual(call.returned_cert_config, cert_config,
306 root_certificates=CA_2_PEM,
307 private_key=CLIENT_KEY_1_PEM,
308 certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
310 self.assertEqual(
len(actual_calls), 1)
311 self.assertFalse(actual_calls[0].did_raise)
312 self.assertIsNone(actual_calls[0].returned_cert_config)
318 root_certificates=CA_2_PEM,
319 private_key=CLIENT_KEY_2_PEM,
320 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
322 self.assertGreaterEqual(
len(actual_calls), 1)
323 for i, call
in enumerate(actual_calls):
324 self.assertFalse(call.did_raise,
'i= {}'.
format(i))
325 self.assertIsNone(call.returned_cert_config,
'i= {}'.
format(i))
331 root_certificates=CA_1_PEM,
332 private_key=CLIENT_KEY_2_PEM,
333 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
335 self.assertGreaterEqual(
len(actual_calls), 1)
336 for i, call
in enumerate(actual_calls):
337 self.assertFalse(call.did_raise,
'i= {}'.
format(i))
338 self.assertIsNone(call.returned_cert_config,
'i= {}'.
format(i))
345 self.assertEqual(
len(actual_calls), 0)
351 self.assertEqual(
len(actual_calls), 0)
360 with self.assertRaises(TypeError):
362 with self.assertRaises(TypeError):
367 [(SERVER_KEY_2_PEM, SERVER_CERT_CHAIN_2_PEM)],
368 root_certificates=CA_1_PEM)
369 with self.assertRaises(TypeError):
371 with self.assertRaises(TypeError):
380 test = _ServerSSLCertReloadTest._test
388 test = _ServerSSLCertReloadTest._test
392 """Ensures that `ServerCertificateConfiguration` instances can be reused.
394 Because gRPC Core takes ownership of the
395 `grpc_ssl_server_certificate_config` encapsulated by
396 `ServerCertificateConfiguration`, this test reuses the same
397 `ServerCertificateConfiguration` instances multiple times to make sure
398 gRPC Python takes care of maintaining the validity of
399 `ServerCertificateConfiguration` instances, so that such instances can be
400 re-used by user application.
408 services_pb2_grpc.add_FirstServiceServicer_to_server(
409 _server_application.FirstServiceServicer(), self.
server)
411 [(SERVER_KEY_1_PEM, SERVER_CERT_CHAIN_1_PEM)],
412 root_certificates=CA_2_PEM)
414 [(SERVER_KEY_2_PEM, SERVER_CERT_CHAIN_2_PEM)],
415 root_certificates=CA_1_PEM)
420 require_client_authentication=
True)
421 self.
port = self.
server.add_secure_port(
'[::]:0', server_credentials)
430 root_certificates=CA_1_PEM,
431 private_key=CLIENT_KEY_2_PEM,
432 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
434 self.assertEqual(
len(actual_calls), 1)
435 self.assertFalse(actual_calls[0].did_raise)
436 self.assertEqual(actual_calls[0].returned_cert_config,
443 root_certificates=CA_2_PEM,
444 private_key=CLIENT_KEY_1_PEM,
445 certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
447 self.assertGreaterEqual(
len(actual_calls), 1)
448 self.assertFalse(actual_calls[0].did_raise)
449 for i, call
in enumerate(actual_calls):
450 self.assertFalse(call.did_raise,
'i= {}'.
format(i))
451 self.assertEqual(call.returned_cert_config, self.
cert_config_A,
458 root_certificates=CA_1_PEM,
459 private_key=CLIENT_KEY_2_PEM,
460 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
462 self.assertEqual(
len(actual_calls), 1)
463 self.assertFalse(actual_calls[0].did_raise)
464 self.assertEqual(actual_calls[0].returned_cert_config,
471 root_certificates=CA_2_PEM,
472 private_key=CLIENT_KEY_1_PEM,
473 certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
475 self.assertEqual(
len(actual_calls), 1)
476 self.assertFalse(actual_calls[0].did_raise)
477 self.assertEqual(actual_calls[0].returned_cert_config,
484 root_certificates=CA_1_PEM,
485 private_key=CLIENT_KEY_2_PEM,
486 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
488 self.assertGreaterEqual(
len(actual_calls), 1)
489 self.assertFalse(actual_calls[0].did_raise)
490 for i, call
in enumerate(actual_calls):
491 self.assertFalse(call.did_raise,
'i= {}'.
format(i))
492 self.assertEqual(call.returned_cert_config, self.
cert_config_B,
499 root_certificates=CA_2_PEM,
500 private_key=CLIENT_KEY_1_PEM,
501 certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
503 self.assertEqual(
len(actual_calls), 1)
504 self.assertFalse(actual_calls[0].did_raise)
505 self.assertEqual(actual_calls[0].returned_cert_config,
509 if __name__ ==
'__main__':
510 logging.basicConfig()
511 unittest.main(verbosity=2)