14 """Tests AsyncIO version of grpcio-health-checking."""
31 _SERVING_SERVICE =
'grpc.test.TestServiceServing'
32 _UNKNOWN_SERVICE =
'grpc.test.TestServiceUnknown'
33 _NOT_SERVING_SERVICE =
'grpc.test.TestServiceNotServing'
34 _WATCH_SERVICE =
'grpc.test.WatchService'
36 _LARGE_NUMBER_OF_STATUS_CHANGES = 1000
40 async
for response
in call:
41 await queue.put(response)
49 health_pb2.HealthCheckResponse.SERVING)
51 health_pb2.HealthCheckResponse.UNKNOWN)
53 health_pb2.HealthCheckResponse.NOT_SERVING)
55 port = self.
_server.add_insecure_port(
'[::]:0')
56 health_pb2_grpc.add_HealthServicer_to_server(self.
_servicer,
60 self.
_channel = aio.insecure_channel(
'localhost:%d' % port)
63 async
def tearDown(self):
67 async
def test_check_empty_service(self):
68 request = health_pb2.HealthCheckRequest()
70 self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)
72 async
def test_check_serving_service(self):
73 request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE)
75 self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)
77 async
def test_check_unknown_service(self):
78 request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE)
80 self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status)
82 async
def test_check_not_serving_service(self):
83 request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE)
85 self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
88 async
def test_check_not_found_service(self):
89 request = health_pb2.HealthCheckRequest(service=
'not-found')
90 with self.assertRaises(aio.AioRpcError)
as context:
93 self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code())
95 async
def test_health_service_name(self):
96 self.assertEqual(health.SERVICE_NAME,
'grpc.health.v1.Health')
98 async
def test_watch_empty_service(self):
99 request = health_pb2.HealthCheckRequest(service=health.OVERALL_HEALTH)
101 call = self.
_stub.Watch(request)
102 queue = asyncio.Queue()
105 self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
106 (await queue.get()).status)
110 with self.assertRaises(asyncio.CancelledError):
113 self.assertTrue(queue.empty())
115 async
def test_watch_new_service(self):
116 request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
117 call = self.
_stub.Watch(request)
118 queue = asyncio.Queue()
121 self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
122 (await queue.get()).status)
125 health_pb2.HealthCheckResponse.SERVING)
126 self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
127 (await queue.get()).status)
130 health_pb2.HealthCheckResponse.NOT_SERVING)
131 self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
132 (await queue.get()).status)
136 with self.assertRaises(asyncio.CancelledError):
139 self.assertTrue(queue.empty())
141 async
def test_watch_service_isolation(self):
142 request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
143 call = self.
_stub.Watch(request)
144 queue = asyncio.Queue()
147 self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
148 (await queue.get()).status)
151 health_pb2.HealthCheckResponse.SERVING)
154 with self.assertRaises(asyncio.TimeoutError):
155 await asyncio.wait_for(queue.get(), test_constants.SHORT_TIMEOUT)
159 with self.assertRaises(asyncio.CancelledError):
162 self.assertTrue(queue.empty())
164 async
def test_two_watchers(self):
165 request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
166 queue1 = asyncio.Queue()
167 queue2 = asyncio.Queue()
168 call1 = self.
_stub.Watch(request)
169 call2 = self.
_stub.Watch(request)
173 self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
174 (await queue1.get()).status)
175 self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
176 (await queue2.get()).status)
179 health_pb2.HealthCheckResponse.SERVING)
180 self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
181 (await queue1.get()).status)
182 self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
183 (await queue2.get()).status)
188 with self.assertRaises(asyncio.CancelledError):
191 with self.assertRaises(asyncio.CancelledError):
194 self.assertTrue(queue1.empty())
195 self.assertTrue(queue2.empty())
197 async
def test_cancelled_watch_removed_from_watch_list(self):
198 request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
199 call = self.
_stub.Watch(request)
200 queue = asyncio.Queue()
203 self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
204 (await queue.get()).status)
208 health_pb2.HealthCheckResponse.SERVING)
210 with self.assertRaises(asyncio.CancelledError):
214 timeout = time.monotonic() + test_constants.TIME_ALLOWANCE
215 while (time.monotonic() < timeout
and self.
_servicer._server_watchers):
216 await asyncio.sleep(1)
217 self.assertFalse(self.
_servicer._server_watchers,
218 'There should not be any watcher left')
219 self.assertTrue(queue.empty())
221 async
def test_graceful_shutdown(self):
222 request = health_pb2.HealthCheckRequest(service=health.OVERALL_HEALTH)
223 call = self.
_stub.Watch(request)
224 queue = asyncio.Queue()
227 self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
228 (await queue.get()).status)
230 await self.
_servicer.enter_graceful_shutdown()
231 self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
232 (await queue.get()).status)
236 health_pb2.HealthCheckResponse.SERVING)
239 self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
244 with self.assertRaises(asyncio.CancelledError):
247 self.assertTrue(queue.empty())
249 async
def test_no_duplicate_status(self):
250 request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
251 call = self.
_stub.Watch(request)
252 queue = asyncio.Queue()
255 self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
256 (await queue.get()).status)
257 last_status = health_pb2.HealthCheckResponse.SERVICE_UNKNOWN
259 for _
in range(_LARGE_NUMBER_OF_STATUS_CHANGES):
260 if random.randint(0, 1) == 0:
261 status = health_pb2.HealthCheckResponse.SERVING
263 status = health_pb2.HealthCheckResponse.NOT_SERVING
266 if status != last_status:
267 self.assertEqual(status, (await queue.get()).status)
272 with self.assertRaises(asyncio.CancelledError):
275 self.assertTrue(queue.empty())
278 if __name__ ==
'__main__':
279 logging.basicConfig(level=logging.DEBUG)
280 unittest.main(verbosity=2)