14 """Tests of grpc_health.v1.health."""
26 from six.moves
import queue
# Service names used when building HealthCheckRequest messages in the tests
# below; each name encodes the health status the tests associate with it.
_SERVING_SERVICE = 'grpc.test.TestServiceServing'
_UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown'
_NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing'
# Service name used by the tests that issue Watch() requests.
_WATCH_SERVICE = 'grpc.test.WatchService'
39 for response
in response_iterator:
40 response_queue.put(response)
45 @unittest.skipIf(sys.version_info[0] < 3,
46 'ProtoBuf descriptor has moved on from Python2')
52 experimental_non_blocking=non_blocking,
53 experimental_thread_pool=thread_pool)
55 health_pb2.HealthCheckResponse.SERVING)
57 health_pb2.HealthCheckResponse.UNKNOWN)
59 health_pb2.HealthCheckResponse.NOT_SERVING)
61 port = self.
_server.add_insecure_port(
'[::]:0')
62 health_pb2_grpc.add_HealthServicer_to_server(
74 request = health_pb2.HealthCheckRequest(service=
'')
75 response_queue = queue.Queue()
76 rendezvous = self.
_stub.Watch(request)
77 thread = threading.Thread(target=_consume_responses,
78 args=(rendezvous, response_queue))
81 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
82 self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
87 self.assertTrue(response_queue.empty())
93 request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
94 response_queue = queue.Queue()
95 rendezvous = self.
_stub.Watch(request)
96 thread = threading.Thread(target=_consume_responses,
97 args=(rendezvous, response_queue))
100 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
101 self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
105 health_pb2.HealthCheckResponse.SERVING)
106 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
107 self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
111 health_pb2.HealthCheckResponse.NOT_SERVING)
112 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
113 self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
118 self.assertTrue(response_queue.empty())
121 request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
122 response_queue = queue.Queue()
123 rendezvous = self.
_stub.Watch(request)
124 thread = threading.Thread(target=_consume_responses,
125 args=(rendezvous, response_queue))
128 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
129 self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
133 health_pb2.HealthCheckResponse.SERVING)
134 with self.assertRaises(queue.Empty):
135 response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
139 self.assertTrue(response_queue.empty())
142 request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
143 response_queue1 = queue.Queue()
144 response_queue2 = queue.Queue()
145 rendezvous1 = self.
_stub.Watch(request)
146 rendezvous2 = self.
_stub.Watch(request)
147 thread1 = threading.Thread(target=_consume_responses,
148 args=(rendezvous1, response_queue1))
149 thread2 = threading.Thread(target=_consume_responses,
150 args=(rendezvous2, response_queue2))
154 response1 = response_queue1.get(
155 timeout=test_constants.SHORT_TIMEOUT)
156 response2 = response_queue2.get(
157 timeout=test_constants.SHORT_TIMEOUT)
158 self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
160 self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
164 health_pb2.HealthCheckResponse.SERVING)
165 response1 = response_queue1.get(
166 timeout=test_constants.SHORT_TIMEOUT)
167 response2 = response_queue2.get(
168 timeout=test_constants.SHORT_TIMEOUT)
169 self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
171 self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
178 self.assertTrue(response_queue1.empty())
179 self.assertTrue(response_queue2.empty())
181 @unittest.skip(
"https://github.com/grpc/grpc/issues/18127")
183 request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
184 response_queue = queue.Queue()
185 rendezvous = self.
_stub.Watch(request)
186 thread = threading.Thread(target=_consume_responses,
187 args=(rendezvous, response_queue))
190 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
191 self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
196 health_pb2.HealthCheckResponse.SERVING)
200 timeout = time.time() + test_constants.TIME_ALLOWANCE
201 while (time.time() < timeout
and
202 self.
_servicer._send_response_callbacks[_WATCH_SERVICE]):
205 self.
_servicer._send_response_callbacks[_WATCH_SERVICE],
206 'watch set should be empty')
207 self.assertTrue(response_queue.empty())
210 request = health_pb2.HealthCheckRequest(service=
'')
211 response_queue = queue.Queue()
212 rendezvous = self.
_stub.Watch(request)
213 thread = threading.Thread(target=_consume_responses,
214 args=(rendezvous, response_queue))
217 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
218 self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
222 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
223 self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
227 self.
_servicer.
set(
'', health_pb2.HealthCheckResponse.SERVING)
231 self.assertTrue(response_queue.empty())
234 @unittest.skipIf(sys.version_info[0] < 3,
235 'ProtoBuf descriptor has moved on from Python2')
240 super(HealthServicerTest,
245 request = health_pb2.HealthCheckRequest()
247 self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)
250 request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE)
252 self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)
255 request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE)
257 self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status)
260 request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE)
262 self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
266 request = health_pb2.HealthCheckRequest(service=
'not-found')
270 self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code())
273 self.assertEqual(health.SERVICE_NAME,
'grpc.health.v1.Health')
276 @unittest.skipIf(sys.version_info[0] < 3,
277 'ProtoBuf descriptor has moved on from Python2')
281 super(HealthServicerBackwardsCompatibleWatchTest,
282 self).
start_server(non_blocking=
False, thread_pool=
None)
if __name__ == '__main__':
    # Install a default handler on the root logger so log records emitted
    # while the tests run are not dropped for lack of configuration.
    logging.basicConfig()
    # verbosity=2 makes the runner print each test's name and outcome.
    unittest.main(verbosity=2)