14 """Tests of grpc_channelz.v1.channelz."""
16 from concurrent
import futures
28 _SUCCESSFUL_UNARY_UNARY =
'/test/SuccessfulUnaryUnary'
29 _FAILED_UNARY_UNARY =
'/test/FailedUnaryUnary'
30 _SUCCESSFUL_STREAM_STREAM =
'/test/SuccessfulStreamStream'
32 _REQUEST = b
'\x00\x00\x00'
33 _RESPONSE = b
'\x01\x01\x01'
35 _DISABLE_REUSE_PORT = ((
'grpc.so_reuseport', 0),)
36 _ENABLE_CHANNELZ = ((
'grpc.enable_channelz', 1),)
37 _DISABLE_CHANNELZ = ((
'grpc.enable_channelz', 0),)
45 servicer_context.set_code(grpc.StatusCode.INTERNAL)
46 servicer_context.set_details(
"Channelz Test Intended Failure")
50 for _
in request_iterator:
57 if handler_call_details.method == _SUCCESSFUL_UNARY_UNARY:
59 elif handler_call_details.method == _FAILED_UNARY_UNARY:
61 elif handler_call_details.method == _SUCCESSFUL_STREAM_STREAM:
63 _successful_stream_stream)
73 options=_DISABLE_REUSE_PORT +
75 port = self.
server.add_insecure_port(
'[::]:0')
90 pair.server.stop(
None)
94 @unittest.skipIf(sys.version_info[0] < 3,
95 'ProtoBuf descriptor has moved on from Python2')
99 _, r = self.
_pairs[idx].channel.unary_unary(
100 _SUCCESSFUL_UNARY_UNARY).with_call(_REQUEST)
101 self.assertEqual(r.code(), grpc.StatusCode.OK)
105 self.
_pairs[idx].channel.unary_unary(_FAILED_UNARY_UNARY).with_call(
110 self.fail(
"This call supposed to fail")
113 response_iterator = self.
_pairs[idx].channel.stream_stream(
114 _SUCCESSFUL_STREAM_STREAM).__call__(
115 iter([_REQUEST] * test_constants.STREAM_LENGTH))
117 for _
in response_iterator:
119 self.assertEqual(cnt, test_constants.STREAM_LENGTH)
122 """Channel id may not be consecutive"""
124 channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
125 self.assertGreater(
len(resp.channel), idx)
126 return resp.channel[idx].ref.channel_id
133 options=_DISABLE_REUSE_PORT +
135 port = self.
_server.add_insecure_port(
'[::]:0')
136 channelz.add_channelz_servicer(self.
_server)
153 channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
154 self.assertEqual(
len(resp.channel), 1)
155 self.assertEqual(resp.end,
True)
160 channelz_pb2.GetTopChannelsRequest(start_channel_id=10000))
161 self.assertEqual(
len(resp.channel), 0)
162 self.assertEqual(resp.end,
True)
169 self.assertEqual(resp.channel.data.calls_started, 1)
170 self.assertEqual(resp.channel.data.calls_succeeded, 1)
171 self.assertEqual(resp.channel.data.calls_failed, 0)
178 self.assertEqual(resp.channel.data.calls_started, 1)
179 self.assertEqual(resp.channel.data.calls_succeeded, 0)
180 self.assertEqual(resp.channel.data.calls_failed, 1)
186 for i
in range(k_success):
188 for i
in range(k_failed):
192 self.assertEqual(resp.channel.data.calls_started, k_success + k_failed)
193 self.assertEqual(resp.channel.data.calls_succeeded, k_success)
194 self.assertEqual(resp.channel.data.calls_failed, k_failed)
200 channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
201 self.assertEqual(
len(resp.channel), k_channels)
208 for i
in range(k_success):
211 for i
in range(k_failed):
218 self.assertEqual(resp.channel.data.calls_started, k_success)
219 self.assertEqual(resp.channel.data.calls_succeeded, k_success)
220 self.assertEqual(resp.channel.data.calls_failed, 0)
225 self.assertEqual(resp.channel.data.calls_started, k_failed)
226 self.assertEqual(resp.channel.data.calls_succeeded, 0)
227 self.assertEqual(resp.channel.data.calls_failed, k_failed)
232 self.assertEqual(resp.channel.data.calls_started, k_success + k_failed)
233 self.assertEqual(resp.channel.data.calls_succeeded, k_success)
234 self.assertEqual(resp.channel.data.calls_failed, k_failed)
239 self.assertEqual(resp.channel.data.calls_started, 0)
240 self.assertEqual(resp.channel.data.calls_succeeded, 0)
241 self.assertEqual(resp.channel.data.calls_failed, 0)
248 for i
in range(k_success):
251 for i
in range(k_failed):
256 channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
257 self.assertEqual(
len(gtc_resp.channel), k_channels)
258 for i
in range(k_channels):
260 if gtc_resp.channel[i].data.calls_started == 0:
261 self.assertEqual(
len(gtc_resp.channel[i].subchannel_ref), 0)
265 self.assertGreater(
len(gtc_resp.channel[i].subchannel_ref), 0)
267 channelz_pb2.GetSubchannelRequest(
268 subchannel_id=gtc_resp.channel[i].subchannel_ref[0].
270 self.assertEqual(gtc_resp.channel[i].data.calls_started,
271 gsc_resp.subchannel.data.calls_started)
272 self.assertEqual(gtc_resp.channel[i].data.calls_succeeded,
273 gsc_resp.subchannel.data.calls_succeeded)
274 self.assertEqual(gtc_resp.channel[i].data.calls_failed,
275 gsc_resp.subchannel.data.calls_failed)
280 channelz_pb2.GetServersRequest(start_server_id=0))
281 self.assertEqual(
len(resp.server), 1)
286 channelz_pb2.GetServersRequest(start_server_id=0))
287 self.assertEqual(
len(gss_resp.server), 1)
289 channelz_pb2.GetServerRequest(
290 server_id=gss_resp.server[0].ref.server_id))
291 self.assertEqual(gss_resp.server[0].ref.server_id,
292 gs_resp.server.ref.server_id)
298 for i
in range(k_success):
300 for i
in range(k_failed):
304 channelz_pb2.GetServersRequest(start_server_id=0))
305 self.assertEqual(
len(resp.server), 1)
306 self.assertEqual(resp.server[0].data.calls_started,
307 k_success + k_failed)
308 self.assertEqual(resp.server[0].data.calls_succeeded, k_success)
309 self.assertEqual(resp.server[0].data.calls_failed, k_failed)
316 for i
in range(k_success):
319 for i
in range(k_failed):
324 channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
325 self.assertEqual(
len(gtc_resp.channel), k_channels)
326 for i
in range(k_channels):
328 if gtc_resp.channel[i].data.calls_started == 0:
329 self.assertEqual(
len(gtc_resp.channel[i].subchannel_ref), 0)
333 self.assertGreater(
len(gtc_resp.channel[i].subchannel_ref), 0)
335 channelz_pb2.GetSubchannelRequest(
336 subchannel_id=gtc_resp.channel[i].subchannel_ref[0].
338 self.assertEqual(
len(gsc_resp.subchannel.socket_ref), 1)
341 channelz_pb2.GetSocketRequest(
342 socket_id=gsc_resp.subchannel.socket_ref[0].socket_id))
343 self.assertEqual(gsc_resp.subchannel.data.calls_started,
344 gs_resp.socket.data.streams_started)
345 self.assertEqual(gsc_resp.subchannel.data.calls_started,
346 gs_resp.socket.data.streams_succeeded)
348 self.assertEqual(gsc_resp.subchannel.data.calls_started,
349 gs_resp.socket.data.messages_sent)
351 self.assertEqual(gsc_resp.subchannel.data.calls_succeeded,
352 gs_resp.socket.data.messages_received)
354 if gs_resp.socket.remote.HasField(
"tcpip_address"):
355 address = gs_resp.socket.remote.tcpip_address.ip_address
357 len(address) == 4
or len(address) == 16, address)
358 if gs_resp.socket.local.HasField(
"tcpip_address"):
359 address = gs_resp.socket.local.tcpip_address.ip_address
361 len(address) == 4
or len(address) == 16, address)
371 self.assertEqual(gc_resp.channel.data.calls_started, 1)
372 self.assertEqual(gc_resp.channel.data.calls_succeeded, 1)
373 self.assertEqual(gc_resp.channel.data.calls_failed, 0)
375 self.assertGreater(
len(gc_resp.channel.subchannel_ref), 0)
379 channelz_pb2.GetSubchannelRequest(
380 subchannel_id=gc_resp.channel.subchannel_ref[0].
382 if gsc_resp.subchannel.data.calls_started == gsc_resp.subchannel.data.calls_succeeded + gsc_resp.subchannel.data.calls_failed:
384 self.assertEqual(gsc_resp.subchannel.data.calls_started, 1)
385 self.assertEqual(gsc_resp.subchannel.data.calls_failed, 0)
386 self.assertEqual(gsc_resp.subchannel.data.calls_succeeded, 1)
388 self.assertEqual(
len(gsc_resp.subchannel.socket_ref), 1)
392 channelz_pb2.GetSocketRequest(
393 socket_id=gsc_resp.subchannel.socket_ref[0].socket_id))
394 if gs_resp.socket.data.streams_started == gs_resp.socket.data.streams_succeeded + gs_resp.socket.data.streams_failed:
396 self.assertEqual(gs_resp.socket.data.streams_started, 1)
397 self.assertEqual(gs_resp.socket.data.streams_succeeded, 1)
398 self.assertEqual(gs_resp.socket.data.streams_failed, 0)
399 self.assertEqual(gs_resp.socket.data.messages_sent,
400 test_constants.STREAM_LENGTH)
401 self.assertEqual(gs_resp.socket.data.messages_received,
402 test_constants.STREAM_LENGTH)
410 channelz_pb2.GetServersRequest(start_server_id=0))
411 self.assertEqual(
len(gs_resp.server), 1)
412 self.assertEqual(gs_resp.server[0].data.calls_started, 2)
413 self.assertEqual(gs_resp.server[0].data.calls_succeeded, 1)
414 self.assertEqual(gs_resp.server[0].data.calls_failed, 1)
417 channelz_pb2.GetServerSocketsRequest(
418 server_id=gs_resp.server[0].ref.server_id, start_socket_id=0))
426 channelz_pb2.GetServersRequest(start_server_id=0))
427 self.assertEqual(
len(gss_resp.server), 1)
428 self.assertEqual(
len(gss_resp.server[0].listen_socket), 1)
431 channelz_pb2.GetSocketRequest(
432 socket_id=gss_resp.server[0].listen_socket[0].socket_id))
440 channelz_pb2.GetServerRequest(server_id=10000))
441 except BaseException
as e:
442 self.assertIn(
'StatusCode.NOT_FOUND',
str(e))
444 self.fail(
'Invalid query not detected')
449 channelz_pb2.GetChannelRequest(channel_id=10000))
450 except BaseException
as e:
451 self.assertIn(
'StatusCode.NOT_FOUND',
str(e))
453 self.fail(
'Invalid query not detected')
458 channelz_pb2.GetSubchannelRequest(subchannel_id=10000))
459 except BaseException
as e:
460 self.assertIn(
'StatusCode.NOT_FOUND',
str(e))
462 self.fail(
'Invalid query not detected')
467 channelz_pb2.GetSocketRequest(socket_id=10000))
468 except BaseException
as e:
469 self.assertIn(
'StatusCode.NOT_FOUND',
str(e))
471 self.fail(
'Invalid query not detected')
476 channelz_pb2.GetServerSocketsRequest(
480 except BaseException
as e:
481 self.assertIn(
'StatusCode.NOT_FOUND',
str(e))
483 self.fail(
'Invalid query not detected')
486 if __name__ ==
'__main__':
487 unittest.main(verbosity=2)