test_admin.py
Go to the documentation of this file.
1 # Copyright 2021 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """A test to ensure that admin services are registered correctly."""
15 
16 from concurrent.futures import ThreadPoolExecutor
17 import logging
18 import sys
19 import unittest
20 
21 import grpc
22 import grpc_admin
23 from grpc_channelz.v1 import channelz_pb2
24 from grpc_channelz.v1 import channelz_pb2_grpc
25 from grpc_csds import csds_pb2
26 from grpc_csds import csds_pb2_grpc
27 
28 
29 @unittest.skipIf(sys.version_info[0] < 3,
30  'ProtoBuf descriptor has moved on from Python2')
31 class TestAdmin(unittest.TestCase):
32 
33  def setUp(self):
34  self._server = grpc.server(ThreadPoolExecutor())
35  port = self._server.add_insecure_port('localhost:0')
37  self._server.start()
38 
39  self._channel = grpc.insecure_channel('localhost:%s' % port)
40 
41  def tearDown(self):
42  self._channel.close()
43  self._server.stop(0)
44 
45  def test_has_csds(self):
46  stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(self._channel)
47  resp = stub.FetchClientStatus(csds_pb2.ClientStatusRequest())
48  # No exception raised and the response is valid
49  self.assertGreater(len(resp.config), 0)
50 
51  def test_has_channelz(self):
52  stub = channelz_pb2_grpc.ChannelzStub(self._channel)
53  resp = stub.GetTopChannels(channelz_pb2.GetTopChannelsRequest())
54  # No exception raised and the response is valid
55  self.assertGreater(len(resp.channel), 0)
56 
57 
if __name__ == "__main__":
    # Script entry point: turn on debug-level logging, then hand control
    # to the unittest runner with verbose per-test output.
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
test_admin.TestAdmin.tearDown
def tearDown(self)
Definition: test_admin.py:41
test_admin.TestAdmin.test_has_channelz
def test_has_channelz(self)
Definition: test_admin.py:51
start
static uint64_t start
Definition: benchmark-pound.c:74
close
#define close
Definition: test-fs.c:48
test_admin.TestAdmin
Definition: test_admin.py:31
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
test_admin.TestAdmin._channel
_channel
Definition: test_admin.py:39
grpc_channelz.v1
Definition: src/python/grpcio_channelz/grpc_channelz/v1/__init__.py:1
test_admin.TestAdmin.setUp
def setUp(self)
Definition: test_admin.py:33
test_admin.TestAdmin.test_has_csds
def test_has_csds(self)
Definition: test_admin.py:45
test_admin.TestAdmin._server
_server
Definition: test_admin.py:34
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
grpc_admin.add_admin_servicers
def add_admin_servicers(server)
Definition: src/python/grpcio_admin/grpc_admin/__init__.py:20


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:31