14 """A test to ensure that admin services are registered correctly."""
16 from concurrent.futures
import ThreadPoolExecutor
25 from grpc_csds
import csds_pb2
26 from grpc_csds
import csds_pb2_grpc
29 @unittest.skipIf(sys.version_info[0] < 3,
30 'ProtoBuf descriptor has moved on from Python2')
35 port = self.
_server.add_insecure_port(
'localhost:0')
46 stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(self.
_channel)
47 resp = stub.FetchClientStatus(csds_pb2.ClientStatusRequest())
49 self.assertGreater(
len(resp.config), 0)
52 stub = channelz_pb2_grpc.ChannelzStub(self.
_channel)
53 resp = stub.GetTopChannels(channelz_pb2.GetTopChannelsRequest())
55 self.assertGreater(
len(resp.channel), 0)
58 if __name__ ==
"__main__":
59 logging.basicConfig(level=logging.DEBUG)
60 unittest.main(verbosity=2)