14 """Tests of grpc_channelz.v1.channelz."""
# Fully-qualified RPC method names. The test's generic handler compares
# `handler_call_details.method` against these to pick a behavior, and the
# client side passes them to `unary_unary(...)` / `stream_stream(...)`.
_SUCCESSFUL_UNARY_UNARY = '/test/SuccessfulUnaryUnary'
_FAILED_UNARY_UNARY = '/test/FailedUnaryUnary'
_SUCCESSFUL_STREAM_STREAM = '/test/SuccessfulStreamStream'

# Raw byte payloads. `_REQUEST` is what the tests send on every call;
# `_RESPONSE` is presumably what the handlers reply with (handler bodies are
# not visible in this excerpt -- confirm against them).
_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x01\x01\x01'

# Channel-argument tuples supplied as `options=` when the tests create servers
# and channels. Each is a one-element tuple of (key, value) pairs so that they
# can be concatenated with `+`, e.g. `_DISABLE_REUSE_PORT + _ENABLE_CHANNELZ`.
_DISABLE_REUSE_PORT = (('grpc.so_reuseport', 0),)
_ENABLE_CHANNELZ = (('grpc.enable_channelz', 1),)
_DISABLE_CHANNELZ = (('grpc.enable_channelz', 0),)

# An id the tests use where no channelz entity is expected to exist: listing
# from it must come back empty, and direct lookups must fail with NOT_FOUND.
_LARGE_UNASSIGNED_ID = 10000
48 servicer_context.set_code(grpc.StatusCode.INTERNAL)
49 servicer_context.set_details(
"Channelz Test Intended Failure")
53 async
for _
in request_iterator:
60 if handler_call_details.method == _SUCCESSFUL_UNARY_UNARY:
62 elif handler_call_details.method == _FAILED_UNARY_UNARY:
64 elif handler_call_details.method == _SUCCESSFUL_STREAM_STREAM:
66 _successful_stream_stream)
82 self.
server = aio.server(options=_DISABLE_REUSE_PORT + _ENABLE_CHANNELZ)
83 port = self.
server.add_insecure_port(
'[::]:0')
84 self.
address =
'localhost:%d' % port
90 options=_ENABLE_CHANNELZ)
93 resp = await channelz_stub.GetTopChannels(
94 channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
95 for channel
in resp.channel:
96 if channel.data.target == self.
address:
99 resp = await channelz_stub.GetServers(
100 channelz_pb2.GetServersRequest(start_server_id=0))
109 """Create channel-server pairs."""
114 await pair.bind_channelz(channelz_stub)
128 self.
_server = aio.server(options=_DISABLE_REUSE_PORT +
130 port = self.
_server.add_insecure_port(
'[::]:0')
131 channelz.add_channelz_servicer(self.
_server)
136 self.
_channel = aio.insecure_channel(
'localhost:%d' % port,
137 options=_DISABLE_CHANNELZ)
145 """Server id may not be consecutive"""
147 channelz_pb2.GetServersRequest(start_server_id=ref_id))
148 self.assertEqual(ref_id, resp.server[0].ref.server_id)
149 return resp.server[0]
152 call = pair.channel.unary_unary(_SUCCESSFUL_UNARY_UNARY)(_REQUEST)
153 self.assertEqual(grpc.StatusCode.OK, await call.code())
157 await pair.channel.unary_unary(_FAILED_UNARY_UNARY)(_REQUEST)
161 self.fail(
"This call supposed to fail")
164 call = pair.channel.stream_stream(_SUCCESSFUL_STREAM_STREAM)(
iter(
165 [_REQUEST] * test_constants.STREAM_LENGTH))
169 self.assertEqual(cnt, test_constants.STREAM_LENGTH)
175 channelz_pb2.GetTopChannelsRequest(
176 start_channel_id=_LARGE_UNASSIGNED_ID))
177 self.assertEqual(
len(resp.channel), 0)
178 self.assertEqual(resp.end,
True)
187 channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id))
189 self.assertEqual(resp.channel.data.calls_started, 1)
190 self.assertEqual(resp.channel.data.calls_succeeded, 1)
191 self.assertEqual(resp.channel.data.calls_failed, 0)
200 channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id))
201 self.assertEqual(resp.channel.data.calls_started, 1)
202 self.assertEqual(resp.channel.data.calls_succeeded, 0)
203 self.assertEqual(resp.channel.data.calls_failed, 1)
212 for i
in range(k_success):
214 for i
in range(k_failed):
217 channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id))
218 self.assertEqual(resp.channel.data.calls_started, k_success + k_failed)
219 self.assertEqual(resp.channel.data.calls_succeeded, k_success)
220 self.assertEqual(resp.channel.data.calls_failed, k_failed)
230 for i
in range(k_success):
233 for i
in range(k_failed):
239 channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id))
240 self.assertEqual(resp.channel.data.calls_started, k_success)
241 self.assertEqual(resp.channel.data.calls_succeeded, k_success)
242 self.assertEqual(resp.channel.data.calls_failed, 0)
246 channelz_pb2.GetChannelRequest(channel_id=pairs[1].channel_ref_id))
247 self.assertEqual(resp.channel.data.calls_started, k_failed)
248 self.assertEqual(resp.channel.data.calls_succeeded, 0)
249 self.assertEqual(resp.channel.data.calls_failed, k_failed)
253 channelz_pb2.GetChannelRequest(channel_id=pairs[2].channel_ref_id))
254 self.assertEqual(resp.channel.data.calls_started, k_success + k_failed)
255 self.assertEqual(resp.channel.data.calls_succeeded, k_success)
256 self.assertEqual(resp.channel.data.calls_failed, k_failed)
260 channelz_pb2.GetChannelRequest(channel_id=pairs[3].channel_ref_id))
261 self.assertEqual(resp.channel.data.calls_started, 0)
262 self.assertEqual(resp.channel.data.calls_succeeded, 0)
263 self.assertEqual(resp.channel.data.calls_failed, 0)
273 for i
in range(k_success):
276 for i
in range(k_failed):
280 for i
in range(k_channels):
282 channelz_pb2.GetChannelRequest(
283 channel_id=pairs[i].channel_ref_id))
285 if gc_resp.channel.data.calls_started == 0:
286 self.assertEqual(
len(gc_resp.channel.subchannel_ref), 0)
290 self.assertGreater(
len(gc_resp.channel.subchannel_ref), 0)
292 channelz_pb2.GetSubchannelRequest(
293 subchannel_id=gc_resp.channel.subchannel_ref[0].
295 self.assertEqual(gc_resp.channel.data.calls_started,
296 gsc_resp.subchannel.data.calls_started)
297 self.assertEqual(gc_resp.channel.data.calls_succeeded,
298 gsc_resp.subchannel.data.calls_succeeded)
299 self.assertEqual(gc_resp.channel.data.calls_failed,
300 gsc_resp.subchannel.data.calls_failed)
309 for i
in range(k_success):
311 for i
in range(k_failed):
315 self.assertEqual(resp.data.calls_started, k_success + k_failed)
316 self.assertEqual(resp.data.calls_succeeded, k_success)
317 self.assertEqual(resp.data.calls_failed, k_failed)
327 for i
in range(k_success):
330 for i
in range(k_failed):
334 for i
in range(k_channels):
336 channelz_pb2.GetChannelRequest(
337 channel_id=pairs[i].channel_ref_id))
340 if gc_resp.channel.data.calls_started == 0:
341 self.assertEqual(
len(gc_resp.channel.subchannel_ref), 0)
345 self.assertGreater(
len(gc_resp.channel.subchannel_ref), 0)
347 channelz_pb2.GetSubchannelRequest(
348 subchannel_id=gc_resp.channel.subchannel_ref[0].
350 self.assertEqual(
len(gsc_resp.subchannel.socket_ref), 1)
353 channelz_pb2.GetSocketRequest(
354 socket_id=gsc_resp.subchannel.socket_ref[0].socket_id))
355 self.assertEqual(gsc_resp.subchannel.data.calls_started,
356 gs_resp.socket.data.streams_started)
357 self.assertEqual(0, gs_resp.socket.data.streams_failed)
359 self.assertEqual(gsc_resp.subchannel.data.calls_started,
360 gs_resp.socket.data.messages_sent)
371 channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id))
372 self.assertEqual(gc_resp.channel.data.calls_started, 1)
373 self.assertEqual(gc_resp.channel.data.calls_succeeded, 1)
374 self.assertEqual(gc_resp.channel.data.calls_failed, 0)
376 self.assertGreater(
len(gc_resp.channel.subchannel_ref), 0)
380 channelz_pb2.GetSubchannelRequest(
381 subchannel_id=gc_resp.channel.subchannel_ref[0].
383 if gsc_resp.subchannel.data.calls_started == gsc_resp.subchannel.data.calls_succeeded + gsc_resp.subchannel.data.calls_failed:
385 self.assertEqual(gsc_resp.subchannel.data.calls_started, 1)
386 self.assertEqual(gsc_resp.subchannel.data.calls_failed, 0)
387 self.assertEqual(gsc_resp.subchannel.data.calls_succeeded, 1)
389 self.assertEqual(
len(gsc_resp.subchannel.socket_ref), 1)
393 channelz_pb2.GetSocketRequest(
394 socket_id=gsc_resp.subchannel.socket_ref[0].socket_id))
395 if gs_resp.socket.data.streams_started == gs_resp.socket.data.streams_succeeded + gs_resp.socket.data.streams_failed:
397 self.assertEqual(gs_resp.socket.data.streams_started, 1)
398 self.assertEqual(gs_resp.socket.data.streams_failed, 0)
399 self.assertEqual(gs_resp.socket.data.streams_succeeded, 1)
400 self.assertEqual(gs_resp.socket.data.messages_sent,
401 test_constants.STREAM_LENGTH)
402 self.assertEqual(gs_resp.socket.data.messages_received,
403 test_constants.STREAM_LENGTH)
414 self.assertEqual(resp.data.calls_started, 2)
415 self.assertEqual(resp.data.calls_succeeded, 1)
416 self.assertEqual(resp.data.calls_failed, 1)
419 channelz_pb2.GetServerSocketsRequest(server_id=resp.ref.server_id,
429 self.assertEqual(
len(resp.listen_socket), 1)
432 channelz_pb2.GetSocketRequest(
433 socket_id=resp.listen_socket[0].socket_id))
439 with self.assertRaises(aio.AioRpcError)
as exception_context:
441 channelz_pb2.GetServerRequest(server_id=_LARGE_UNASSIGNED_ID))
442 self.assertEqual(grpc.StatusCode.NOT_FOUND,
443 exception_context.exception.code())
446 with self.assertRaises(aio.AioRpcError)
as exception_context:
448 channelz_pb2.GetChannelRequest(channel_id=_LARGE_UNASSIGNED_ID))
449 self.assertEqual(grpc.StatusCode.NOT_FOUND,
450 exception_context.exception.code())
453 with self.assertRaises(aio.AioRpcError)
as exception_context:
455 channelz_pb2.GetSubchannelRequest(
456 subchannel_id=_LARGE_UNASSIGNED_ID))
457 self.assertEqual(grpc.StatusCode.NOT_FOUND,
458 exception_context.exception.code())
461 with self.assertRaises(aio.AioRpcError)
as exception_context:
463 channelz_pb2.GetSocketRequest(socket_id=_LARGE_UNASSIGNED_ID))
464 self.assertEqual(grpc.StatusCode.NOT_FOUND,
465 exception_context.exception.code())
468 with self.assertRaises(aio.AioRpcError)
as exception_context:
470 channelz_pb2.GetServerSocketsRequest(
471 server_id=_LARGE_UNASSIGNED_ID,
474 self.assertEqual(grpc.StatusCode.NOT_FOUND,
475 exception_context.exception.code())
if __name__ == '__main__':
    # Only when executed as a script: turn on DEBUG-level logging, then hand
    # control to unittest's runner with per-test (verbose) reporting.
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)