health_servicer_test.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests AsyncIO version of grpcio-health-checking."""
15 
16 import asyncio
17 import logging
18 import random
19 import time
20 import unittest
21 
22 import grpc
23 from grpc.experimental import aio
24 from grpc_health.v1 import health
25 from grpc_health.v1 import health_pb2
26 from grpc_health.v1 import health_pb2_grpc
27 
28 from tests.unit.framework.common import test_constants
29 from tests_aio.unit._test_base import AioTestBase
30 
# Service names registered by the test fixture, one per health status.
_SERVING_SERVICE = 'grpc.test.TestServiceServing'
_UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown'
_NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing'
# Service name that the Watch tests subscribe to; its status is only set
# inside individual tests.
_WATCH_SERVICE = 'grpc.test.WatchService'

# Number of random status updates issued by the duplicate-status test.
_LARGE_NUMBER_OF_STATUS_CHANGES = 1000
37 
38 
async def _pipe_to_queue(call, queue):
    """Forward every message produced by ``call`` into ``queue``.

    Args:
        call: An async-iterable source of messages (the tests pass a
            streaming ``Watch`` call).
        queue: An object with an awaitable ``put`` method (the tests pass an
            ``asyncio.Queue``).

    Returns once ``call`` is exhausted; any exception raised while iterating
    ``call`` or while putting into ``queue`` propagates to the awaiter.
    """
    async for message in call:
        await queue.put(message)
42 
43 
45 
class HealthServicerTest(AioTestBase):
    """Tests the AsyncIO health servicer through a client stub.

    Each test talks to a server started in ``setUp`` via ``self._stub`` and,
    for ``Watch`` tests, drains the response stream into an ``asyncio.Queue``
    with a background task created on ``self.loop``.

    NOTE(review): the ``class`` statement was missing from the extracted
    listing; it is restored here from the file's cross-reference index and
    the ``AioTestBase`` import -- confirm against the upstream file.
    """

    async def setUp(self):
        """Start a server exposing the health servicer and connect a stub."""
        self._servicer = health.aio.HealthServicer()
        await self._servicer.set(_SERVING_SERVICE,
                                 health_pb2.HealthCheckResponse.SERVING)
        await self._servicer.set(_UNKNOWN_SERVICE,
                                 health_pb2.HealthCheckResponse.UNKNOWN)
        await self._servicer.set(_NOT_SERVING_SERVICE,
                                 health_pb2.HealthCheckResponse.NOT_SERVING)
        self._server = aio.server()
        # Port 0 is requested; the port actually bound is returned and used
        # below to build the client target.
        port = self._server.add_insecure_port('[::]:0')
        health_pb2_grpc.add_HealthServicer_to_server(self._servicer,
                                                     self._server)
        await self._server.start()

        self._channel = aio.insecure_channel('localhost:%d' % port)
        self._stub = health_pb2_grpc.HealthStub(self._channel)

    async def tearDown(self):
        """Close the client channel, then stop the server."""
        await self._channel.close()
        await self._server.stop(None)

    async def test_check_empty_service(self):
        """Check with a default (no service name) request reports SERVING."""
        request = health_pb2.HealthCheckRequest()
        resp = await self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)

    async def test_check_serving_service(self):
        """Check reports SERVING for the service registered as SERVING."""
        request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE)
        resp = await self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)

    async def test_check_unknown_service(self):
        """Check reports UNKNOWN for the service registered as UNKNOWN."""
        request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE)
        resp = await self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status)

    async def test_check_not_serving_service(self):
        """Check reports NOT_SERVING for the service registered as such."""
        request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE)
        resp = await self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                         resp.status)

    async def test_check_not_found_service(self):
        """Check on a never-registered service fails with NOT_FOUND."""
        request = health_pb2.HealthCheckRequest(service='not-found')
        with self.assertRaises(aio.AioRpcError) as context:
            await self._stub.Check(request)

        self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code())

    async def test_health_service_name(self):
        """The exported service name matches the fully-qualified name."""
        self.assertEqual(health.SERVICE_NAME, 'grpc.health.v1.Health')

    async def test_watch_empty_service(self):
        """Watch on ``health.OVERALL_HEALTH`` first yields SERVING."""
        request = health_pb2.HealthCheckRequest(service=health.OVERALL_HEALTH)

        call = self._stub.Watch(request)
        queue = asyncio.Queue()
        task = self.loop.create_task(_pipe_to_queue(call, queue))

        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                         (await queue.get()).status)

        call.cancel()

        # Cancelling the call is expected to surface as CancelledError in the
        # task that is iterating it.
        with self.assertRaises(asyncio.CancelledError):
            await task

        self.assertTrue(queue.empty())

    async def test_watch_new_service(self):
        """Watch yields SERVICE_UNKNOWN first, then each status that is set."""
        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
        call = self._stub.Watch(request)
        queue = asyncio.Queue()
        task = self.loop.create_task(_pipe_to_queue(call, queue))

        # _WATCH_SERVICE is not registered in setUp.
        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         (await queue.get()).status)

        await self._servicer.set(_WATCH_SERVICE,
                                 health_pb2.HealthCheckResponse.SERVING)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                         (await queue.get()).status)

        await self._servicer.set(_WATCH_SERVICE,
                                 health_pb2.HealthCheckResponse.NOT_SERVING)
        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                         (await queue.get()).status)

        call.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task

        self.assertTrue(queue.empty())

    async def test_watch_service_isolation(self):
        """Setting another service's status does not notify this watcher."""
        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
        call = self._stub.Watch(request)
        queue = asyncio.Queue()
        task = self.loop.create_task(_pipe_to_queue(call, queue))

        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         (await queue.get()).status)

        await self._servicer.set('some-other-service',
                                 health_pb2.HealthCheckResponse.SERVING)
        # The change of health status in other service should be isolated.
        # Hence, no additional notification should be observed.
        with self.assertRaises(asyncio.TimeoutError):
            await asyncio.wait_for(queue.get(), test_constants.SHORT_TIMEOUT)

        call.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task

        self.assertTrue(queue.empty())

    async def test_two_watchers(self):
        """Two Watch calls on the same service both see each update."""
        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
        queue1 = asyncio.Queue()
        queue2 = asyncio.Queue()
        call1 = self._stub.Watch(request)
        call2 = self._stub.Watch(request)
        task1 = self.loop.create_task(_pipe_to_queue(call1, queue1))
        task2 = self.loop.create_task(_pipe_to_queue(call2, queue2))

        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         (await queue1.get()).status)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         (await queue2.get()).status)

        await self._servicer.set(_WATCH_SERVICE,
                                 health_pb2.HealthCheckResponse.SERVING)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                         (await queue1.get()).status)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                         (await queue2.get()).status)

        call1.cancel()
        call2.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task1

        with self.assertRaises(asyncio.CancelledError):
            await task2

        self.assertTrue(queue1.empty())
        self.assertTrue(queue2.empty())

    async def test_cancelled_watch_removed_from_watch_list(self):
        """A cancelled Watch eventually leaves no watcher on the servicer."""
        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
        call = self._stub.Watch(request)
        queue = asyncio.Queue()
        task = self.loop.create_task(_pipe_to_queue(call, queue))

        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         (await queue.get()).status)

        # Cancel first, then change the status: the update must not reach the
        # (already cancelled) client, which the final queue.empty() verifies.
        call.cancel()
        await self._servicer.set(_WATCH_SERVICE,
                                 health_pb2.HealthCheckResponse.SERVING)

        with self.assertRaises(asyncio.CancelledError):
            await task

        # Wait for the serving coroutine to process client cancellation.
        # Polls the servicer's private watcher registry until it is empty or
        # the time allowance runs out.
        timeout = time.monotonic() + test_constants.TIME_ALLOWANCE
        while (time.monotonic() < timeout and self._servicer._server_watchers):
            await asyncio.sleep(1)
        self.assertFalse(self._servicer._server_watchers,
                         'There should not be any watcher left')
        self.assertTrue(queue.empty())

    async def test_graceful_shutdown(self):
        """After graceful shutdown the status is NOT_SERVING and stays so."""
        request = health_pb2.HealthCheckRequest(service=health.OVERALL_HEALTH)
        call = self._stub.Watch(request)
        queue = asyncio.Queue()
        task = self.loop.create_task(_pipe_to_queue(call, queue))

        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                         (await queue.get()).status)

        await self._servicer.enter_graceful_shutdown()
        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                         (await queue.get()).status)

        # This should be a no-op.
        await self._servicer.set(health.OVERALL_HEALTH,
                                 health_pb2.HealthCheckResponse.SERVING)

        resp = await self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                         resp.status)

        call.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task

        self.assertTrue(queue.empty())

    async def test_no_duplicate_status(self):
        """Re-setting an unchanged status produces no extra notification."""
        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
        call = self._stub.Watch(request)
        queue = asyncio.Queue()
        task = self.loop.create_task(_pipe_to_queue(call, queue))

        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         (await queue.get()).status)
        last_status = health_pb2.HealthCheckResponse.SERVICE_UNKNOWN

        for _ in range(_LARGE_NUMBER_OF_STATUS_CHANGES):
            if random.randint(0, 1) == 0:
                status = health_pb2.HealthCheckResponse.SERVING
            else:
                status = health_pb2.HealthCheckResponse.NOT_SERVING

            await self._servicer.set(_WATCH_SERVICE, status)
            # Only a real change is read back; a repeated status must leave
            # the queue untouched, otherwise a later comparison or the final
            # emptiness check fails.
            if status != last_status:
                self.assertEqual(status, (await queue.get()).status)
                last_status = status

        call.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task

        self.assertTrue(queue.empty())
276 
277 
if __name__ == '__main__':
    # Run the tests with debug-level logging and verbose result output when
    # the file is executed directly.
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)
tests_aio.health_check.health_servicer_test._pipe_to_queue
def _pipe_to_queue(call, queue)
Definition: health_servicer_test.py:39
tests_aio.unit._test_base.AioTestBase.loop
def loop(self)
Definition: _test_base.py:55
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests_aio.unit._test_base
Definition: _test_base.py:1
tests_aio.health_check.health_servicer_test.HealthServicerTest
Definition: health_servicer_test.py:44
tests_aio.health_check.health_servicer_test.HealthServicerTest._server
_server
Definition: health_servicer_test.py:54
grpc_health.v1
Definition: src/python/grpcio_health_checking/grpc_health/v1/__init__.py:1
start
static uint64_t start
Definition: benchmark-pound.c:74
tests_aio.health_check.health_servicer_test.HealthServicerTest._servicer
_servicer
Definition: health_servicer_test.py:47
grpc::experimental
Definition: include/grpcpp/channel.h:46
Check
static bool Check(const CheckModeArguments &args, const EVP_MD *md, const Source &source)
Definition: digest.cc:202
close
#define close
Definition: test-fs.c:48
tests_aio.health_check.health_servicer_test.HealthServicerTest._stub
_stub
Definition: health_servicer_test.py:61
tests_aio.health_check.health_servicer_test.HealthServicerTest._channel
_channel
Definition: health_servicer_test.py:60
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
tests_aio.health_check.health_servicer_test.HealthServicerTest.setUp
def setUp(self)
Definition: health_servicer_test.py:46
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
tests_aio.unit._test_base.AioTestBase
Definition: _test_base.py:49


grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:01