_health_servicer_test.py
Go to the documentation of this file.
1 # Copyright 2016 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests of grpc_health.v1.health."""
15 
16 import logging
17 import sys
18 import threading
19 import time
20 import unittest
21 
22 import grpc
23 from grpc_health.v1 import health
24 from grpc_health.v1 import health_pb2
25 from grpc_health.v1 import health_pb2_grpc
26 from six.moves import queue
27 
28 from tests.unit import test_common
29 from tests.unit import thread_pool
30 from tests.unit.framework.common import test_constants
31 
# Service names used by the tests below. The first three are registered with
# the servicer in ``start_server``; ``_WATCH_SERVICE`` is only set by
# individual tests after a watch has been opened.
_SERVING_SERVICE = 'grpc.test.TestServiceServing'
_UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown'
_NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing'
_WATCH_SERVICE = 'grpc.test.WatchService'
36 
37 
def _consume_responses(response_iterator, response_queue):
    """Copy every item produced by ``response_iterator`` into a queue.

    The tests run this on a background thread so the main thread can wait
    for streamed responses with a timeout via ``response_queue.get``.

    Args:
      response_iterator: Iterable of responses; iterated until exhausted.
      response_queue: Object with a ``put`` method that receives each
        response in arrival order.

    Any exception raised while iterating is not caught here and propagates
    to the caller (i.e. ends the consuming thread).
    """
    for response in response_iterator:
        response_queue.put(response)
41 
42 
class BaseWatchTests(object):
    """Container for the ``Watch`` test cases shared by the concrete suites.

    NOTE(review): presumably the TestCase is nested in this class so that
    loaders scanning module-level names do not run the shared base on its
    own -- confirm against the test runner in use.
    """

    @unittest.skipIf(sys.version_info[0] < 3,
                     'ProtoBuf descriptor has moved on from Python2')
    class WatchTests(unittest.TestCase):
        """Tests of the health service's streaming ``Watch`` RPC.

        This class defines no ``setUp``; subclasses are expected to call
        ``start_server`` from their own ``setUp`` to pick the servicer
        configuration under test.
        """

        def start_server(self, non_blocking=False, thread_pool=None):
            """Start a health server on an ephemeral port and connect a stub.

            Args:
              non_blocking: Passed to ``HealthServicer`` as
                ``experimental_non_blocking``.
              thread_pool: Passed to ``HealthServicer`` as
                ``experimental_thread_pool``; also stored on the test so
                ``test_watch_empty_service`` can check it was used.
            """
            self._thread_pool = thread_pool
            self._servicer = health.HealthServicer(
                experimental_non_blocking=non_blocking,
                experimental_thread_pool=thread_pool)
            self._servicer.set(_SERVING_SERVICE,
                               health_pb2.HealthCheckResponse.SERVING)
            self._servicer.set(_UNKNOWN_SERVICE,
                               health_pb2.HealthCheckResponse.UNKNOWN)
            self._servicer.set(_NOT_SERVING_SERVICE,
                               health_pb2.HealthCheckResponse.NOT_SERVING)
            self._server = test_common.test_server()
            # Port 0 lets the server pick a free port, returned by the call.
            port = self._server.add_insecure_port('[::]:0')
            health_pb2_grpc.add_HealthServicer_to_server(
                self._servicer, self._server)
            self._server.start()

            self._channel = grpc.insecure_channel('localhost:%d' % port)
            self._stub = health_pb2_grpc.HealthStub(self._channel)

        def tearDown(self):
            """Stop the server and close the client channel."""
            self._server.stop(None)
            self._channel.close()

        def test_watch_empty_service(self):
            """Watching the empty service name reports SERVING."""
            request = health_pb2.HealthCheckRequest(service='')
            response_queue = queue.Queue()
            rendezvous = self._stub.Watch(request)
            thread = threading.Thread(target=_consume_responses,
                                      args=(rendezvous, response_queue))
            thread.start()

            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                             response.status)

            rendezvous.cancel()
            thread.join()
            self.assertTrue(response_queue.empty())

            # Only suites that supplied a recording pool can verify its use.
            if self._thread_pool is not None:
                self.assertTrue(self._thread_pool.was_used())

        def test_watch_new_service(self):
            """A watcher sees SERVICE_UNKNOWN, then each later status set."""
            request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
            response_queue = queue.Queue()
            rendezvous = self._stub.Watch(request)
            thread = threading.Thread(target=_consume_responses,
                                      args=(rendezvous, response_queue))
            thread.start()

            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                             response.status)

            self._servicer.set(_WATCH_SERVICE,
                               health_pb2.HealthCheckResponse.SERVING)
            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                             response.status)

            self._servicer.set(_WATCH_SERVICE,
                               health_pb2.HealthCheckResponse.NOT_SERVING)
            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                             response.status)

            rendezvous.cancel()
            thread.join()
            self.assertTrue(response_queue.empty())

        def test_watch_service_isolation(self):
            """Setting a different service sends nothing to this watcher."""
            request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
            response_queue = queue.Queue()
            rendezvous = self._stub.Watch(request)
            thread = threading.Thread(target=_consume_responses,
                                      args=(rendezvous, response_queue))
            thread.start()

            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                             response.status)

            self._servicer.set('some-other-service',
                               health_pb2.HealthCheckResponse.SERVING)
            # No update is expected, so the timed get must raise Empty.
            with self.assertRaises(queue.Empty):
                response_queue.get(timeout=test_constants.SHORT_TIMEOUT)

            rendezvous.cancel()
            thread.join()
            self.assertTrue(response_queue.empty())

        def test_two_watchers(self):
            """Two watchers of the same service both receive each update."""
            request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
            response_queue1 = queue.Queue()
            response_queue2 = queue.Queue()
            rendezvous1 = self._stub.Watch(request)
            rendezvous2 = self._stub.Watch(request)
            thread1 = threading.Thread(target=_consume_responses,
                                       args=(rendezvous1, response_queue1))
            thread2 = threading.Thread(target=_consume_responses,
                                       args=(rendezvous2, response_queue2))
            thread1.start()
            thread2.start()

            response1 = response_queue1.get(
                timeout=test_constants.SHORT_TIMEOUT)
            response2 = response_queue2.get(
                timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                             response1.status)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                             response2.status)

            self._servicer.set(_WATCH_SERVICE,
                               health_pb2.HealthCheckResponse.SERVING)
            response1 = response_queue1.get(
                timeout=test_constants.SHORT_TIMEOUT)
            response2 = response_queue2.get(
                timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                             response1.status)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                             response2.status)

            rendezvous1.cancel()
            rendezvous2.cancel()
            thread1.join()
            thread2.join()
            self.assertTrue(response_queue1.empty())
            self.assertTrue(response_queue2.empty())

        @unittest.skip("https://github.com/grpc/grpc/issues/18127")
        def test_cancelled_watch_removed_from_watch_list(self):
            """A cancelled watch is dropped from the servicer's callbacks."""
            request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
            response_queue = queue.Queue()
            rendezvous = self._stub.Watch(request)
            thread = threading.Thread(target=_consume_responses,
                                      args=(rendezvous, response_queue))
            thread.start()

            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                             response.status)

            rendezvous.cancel()
            self._servicer.set(_WATCH_SERVICE,
                               health_pb2.HealthCheckResponse.SERVING)
            thread.join()

            # Wait, if necessary, for serving thread to process client cancellation
            timeout = time.time() + test_constants.TIME_ALLOWANCE
            while (time.time() < timeout and
                   self._servicer._send_response_callbacks[_WATCH_SERVICE]):
                time.sleep(1)
            self.assertFalse(
                self._servicer._send_response_callbacks[_WATCH_SERVICE],
                'watch set should be empty')
            self.assertTrue(response_queue.empty())

        def test_graceful_shutdown(self):
            """Graceful shutdown reports NOT_SERVING; a later set sends nothing."""
            request = health_pb2.HealthCheckRequest(service='')
            response_queue = queue.Queue()
            rendezvous = self._stub.Watch(request)
            thread = threading.Thread(target=_consume_responses,
                                      args=(rendezvous, response_queue))
            thread.start()

            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                             response.status)

            self._servicer.enter_graceful_shutdown()
            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                             response.status)

            # This should be a no-op.
            self._servicer.set('', health_pb2.HealthCheckResponse.SERVING)

            rendezvous.cancel()
            thread.join()
            self.assertTrue(response_queue.empty())
233 
@unittest.skipIf(sys.version_info[0] < 3,
                 'ProtoBuf descriptor has moved on from Python2')
class HealthServicerTest(BaseWatchTests.WatchTests):
    """Runs the shared watch tests plus ``Check`` tests in non-blocking mode.

    The servicer is started with ``non_blocking=True`` and a
    ``RecordingThreadPool`` (see ``setUp``).
    """

    def setUp(self):
        """Start the server with a recording thread pool, non-blocking."""
        self._thread_pool = thread_pool.RecordingThreadPool(max_workers=None)
        super(HealthServicerTest,
              self).start_server(non_blocking=True,
                                 thread_pool=self._thread_pool)

    def test_check_empty_service(self):
        """``Check`` with a default (empty) request reports SERVING."""
        request = health_pb2.HealthCheckRequest()
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)

    def test_check_serving_service(self):
        """``Check`` reports SERVING for the service registered as such."""
        request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE)
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)

    def test_check_unknown_service(self):
        """``Check`` reports UNKNOWN for the service registered as such."""
        request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE)
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status)

    def test_check_not_serving_service(self):
        """``Check`` reports NOT_SERVING for the service registered as such."""
        request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE)
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                         resp.status)

    def test_check_not_found_service(self):
        """``Check`` on an unregistered service fails with NOT_FOUND."""
        request = health_pb2.HealthCheckRequest(service='not-found')
        with self.assertRaises(grpc.RpcError) as context:
            # The call is expected to raise, so its result is not bound.
            self._stub.Check(request)

        self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code())

    def test_health_service_name(self):
        """``health.SERVICE_NAME`` is the fully-qualified service name."""
        self.assertEqual(health.SERVICE_NAME, 'grpc.health.v1.Health')
274 
275 
@unittest.skipIf(sys.version_info[0] < 3,
                 'ProtoBuf descriptor has moved on from Python2')
class HealthServicerBackwardsCompatibleWatchTest(BaseWatchTests.WatchTests):
    """Runs the shared watch tests with a blocking servicer and no pool."""

    def setUp(self):
        """Start the server with ``non_blocking=False`` and no thread pool."""
        super(HealthServicerBackwardsCompatibleWatchTest,
              self).start_server(non_blocking=False, thread_pool=None)
283 
284 
# Run the suite with verbose output when the module is executed directly.
if __name__ == '__main__':
    logging.basicConfig()
    unittest.main(verbosity=2)
tests.health_check._health_servicer_test.BaseWatchTests
Definition: _health_servicer_test.py:43
tests.health_check._health_servicer_test.HealthServicerTest.test_check_not_serving_service
def test_check_not_serving_service(self)
Definition: _health_servicer_test.py:259
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
tests.health_check._health_servicer_test.HealthServicerTest.test_health_service_name
def test_health_service_name(self)
Definition: _health_servicer_test.py:272
tests.health_check._health_servicer_test.BaseWatchTests.WatchTests._channel
_channel
Definition: _health_servicer_test.py:66
tests.health_check._health_servicer_test.BaseWatchTests.WatchTests.tearDown
def tearDown(self)
Definition: _health_servicer_test.py:69
tests.health_check._health_servicer_test.BaseWatchTests.WatchTests.test_two_watchers
def test_two_watchers(self)
Definition: _health_servicer_test.py:141
grpc.RpcError
Definition: src/python/grpcio/grpc/__init__.py:302
tests.health_check._health_servicer_test.BaseWatchTests.WatchTests.test_watch_new_service
def test_watch_new_service(self)
Definition: _health_servicer_test.py:92
tests.health_check._health_servicer_test.HealthServicerTest.test_check_not_found_service
def test_check_not_found_service(self)
Definition: _health_servicer_test.py:265
tests.health_check._health_servicer_test.BaseWatchTests.WatchTests._stub
_stub
Definition: _health_servicer_test.py:67
tests.health_check._health_servicer_test.BaseWatchTests.WatchTests.start_server
def start_server(self, non_blocking=False, thread_pool=None)
Definition: _health_servicer_test.py:49
tests.health_check._health_servicer_test.HealthServicerTest.test_check_serving_service
def test_check_serving_service(self)
Definition: _health_servicer_test.py:249
grpc_health.v1
Definition: src/python/grpcio_health_checking/grpc_health/v1/__init__.py:1
tests.health_check._health_servicer_test.BaseWatchTests.WatchTests.test_cancelled_watch_removed_from_watch_list
def test_cancelled_watch_removed_from_watch_list(self)
Definition: _health_servicer_test.py:182
start
static uint64_t start
Definition: benchmark-pound.c:74
Check
static bool Check(const CheckModeArguments &args, const EVP_MD *md, const Source &source)
Definition: digest.cc:202
tests.health_check._health_servicer_test.BaseWatchTests.WatchTests.test_graceful_shutdown
def test_graceful_shutdown(self)
Definition: _health_servicer_test.py:209
tests.health_check._health_servicer_test.HealthServicerTest
Definition: _health_servicer_test.py:236
tests.health_check._health_servicer_test.BaseWatchTests.WatchTests._servicer
_servicer
Definition: _health_servicer_test.py:51
tests.health_check._health_servicer_test.HealthServicerBackwardsCompatibleWatchTest.setUp
def setUp(self)
Definition: _health_servicer_test.py:280
close
#define close
Definition: test-fs.c:48
tests.health_check._health_servicer_test.BaseWatchTests.WatchTests.test_watch_service_isolation
def test_watch_service_isolation(self)
Definition: _health_servicer_test.py:120
tests.health_check._health_servicer_test.HealthServicerTest.setUp
def setUp(self)
Definition: _health_servicer_test.py:238
tests.health_check._health_servicer_test.BaseWatchTests.WatchTests._server
_server
Definition: _health_servicer_test.py:60
tests.health_check._health_servicer_test._consume_responses
def _consume_responses(response_iterator, response_queue)
Definition: _health_servicer_test.py:38
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
tests.health_check._health_servicer_test.BaseWatchTests.WatchTests._thread_pool
_thread_pool
Definition: _health_servicer_test.py:50
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
tests.health_check._health_servicer_test.HealthServicerTest.test_check_unknown_service
def test_check_unknown_service(self)
Definition: _health_servicer_test.py:254
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
tests.health_check._health_servicer_test.HealthServicerTest.test_check_empty_service
def test_check_empty_service(self)
Definition: _health_servicer_test.py:244
tests.health_check._health_servicer_test.BaseWatchTests.WatchTests
Definition: _health_servicer_test.py:47
tests.health_check._health_servicer_test.HealthServicerBackwardsCompatibleWatchTest
Definition: _health_servicer_test.py:278
tests.health_check._health_servicer_test.BaseWatchTests.WatchTests.test_watch_empty_service
def test_watch_empty_service(self)
Definition: _health_servicer_test.py:73


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:38