14 """Tests the gRPC Core shutdown path."""
23 _TIMEOUT_FOR_SEGFAULT = datetime.timedelta(seconds=10)
29 """Originated by https://github.com/grpc/grpc/issues/20299.
31 The grpc_shutdown happens synchronously, but there might be Core object
32 references left in Cython which might lead to ABORT or SIGSEGV.
34 connection_failed = threading.Event()
def on_state_change(state):
    """Record that the channel has reached a failing or shut-down state.

    Sets the ``connection_failed`` event when ``state`` is either
    ``TRANSIENT_FAILURE`` or ``SHUTDOWN``; every other state is ignored.

    Args:
        state: The connectivity value being reported; it is compared
            against ``grpc.ChannelConnectivity`` members.
    """
    failure_states = (
        grpc.ChannelConnectivity.TRANSIENT_FAILURE,
        grpc.ChannelConnectivity.SHUTDOWN,
    )
    if state not in failure_states:
        return
    connection_failed.set()
43 channel.subscribe(on_state_change,
True)
45 deadline = datetime.datetime.now() + _TIMEOUT_FOR_SEGFAULT
47 while datetime.datetime.now() < deadline:
49 if connection_failed.is_set():
# When executed as a script, run the tests in this module verbosely.
if __name__ == '__main__':
    unittest.main(verbosity=2)