test_csds.py
Go to the documentation of this file.
1 # Copyright 2021 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """A simple test to ensure that the Python wrapper can get xDS config."""
15 
16 from concurrent.futures import ThreadPoolExecutor
17 import logging
18 import os
19 import sys
20 import time
21 import unittest
22 
23 from envoy.service.status.v3 import csds_pb2
24 from envoy.service.status.v3 import csds_pb2_grpc
25 from google.protobuf import json_format
26 import grpc
27 import grpc_csds
28 from six.moves import queue
29 
30 _DUMMY_XDS_ADDRESS = 'xds:///foo.bar'
31 _DUMMY_BOOTSTRAP_FILE = """
32 {
33  \"xds_servers\": [
34  {
35  \"server_uri\": \"fake:///xds_server\",
36  \"channel_creds\": [
37  {
38  \"type\": \"fake\"
39  }
40  ],
41  \"server_features\": [\"xds_v3\"]
42  }
43  ],
44  \"node\": {
45  \"id\": \"python_test_csds\",
46  \"cluster\": \"test\",
47  \"metadata\": {
48  \"foo\": \"bar\"
49  },
50  \"locality\": {
51  \"region\": \"corp\",
52  \"zone\": \"svl\",
53  \"sub_zone\": \"mp3\"
54  }
55  }
56 }\
57 """
58 
59 
60 @unittest.skipIf(sys.version_info[0] < 3,
61  'ProtoBuf descriptor has moved on from Python2')
62 class TestCsds(unittest.TestCase):
63 
64  def setUp(self):
65  os.environ['GRPC_XDS_BOOTSTRAP_CONFIG'] = _DUMMY_BOOTSTRAP_FILE
66  self._server = grpc.server(ThreadPoolExecutor())
67  port = self._server.add_insecure_port('localhost:0')
69  self._server.start()
70 
71  self._channel = grpc.insecure_channel('localhost:%s' % port)
72  self._stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(
73  self._channel)
74 
75  def tearDown(self):
76  self._channel.close()
77  self._server.stop(0)
78  os.environ.pop('GRPC_XDS_BOOTSTRAP_CONFIG', None)
79 
81  return self._stub.FetchClientStatus(csds_pb2.ClientStatusRequest())
82 
83  def test_has_node(self):
84  resp = self.get_xds_config_dump()
85  self.assertEqual(1, len(resp.config))
86  self.assertEqual('python_test_csds', resp.config[0].node.id)
87  self.assertEqual('test', resp.config[0].node.cluster)
88 
89  def test_no_lds_found(self):
90  dummy_channel = grpc.insecure_channel(_DUMMY_XDS_ADDRESS)
91 
92  # Force the XdsClient to initialize and request a resource
93  with self.assertRaises(grpc.RpcError) as rpc_error:
94  dummy_channel.unary_unary('')(b'', wait_for_ready=False)
95  self.assertEqual(grpc.StatusCode.UNAVAILABLE,
96  rpc_error.exception.code())
97 
98  # The resource request will fail with DOES_NOT_EXIST (after 15s)
99  while True:
100  resp = self.get_xds_config_dump()
101  config = json_format.MessageToDict(resp)
102  ok = False
103  try:
104  for xds_config in config["config"][0].get("xdsConfig", []):
105  if "listenerConfig" in xds_config:
106  listener = xds_config["listenerConfig"][
107  "dynamicListeners"][0]
108  if listener['clientStatus'] == 'DOES_NOT_EXIST':
109  ok = True
110  break
111  for generic_xds_config in config["config"][0].get(
112  "genericXdsConfigs", []):
113  if "Listener" in generic_xds_config["typeUrl"]:
114  if generic_xds_config[
115  'clientStatus'] == 'DOES_NOT_EXIST':
116  ok = True
117  break
118  except KeyError as e:
119  logging.debug("Invalid config: %s\n%s: %s", config, type(e), e)
120  pass
121  if ok:
122  break
123  time.sleep(1)
124  dummy_channel.close()
125 
126 
127 @unittest.skipIf(sys.version_info[0] < 3,
128  'ProtoBuf descriptor has moved on from Python2')
130 
132  if not hasattr(self, 'request_queue'):
133  request_queue = queue.Queue()
134  response_iterator = self._stub.StreamClientStatus(
135  iter(request_queue.get, None))
136  request_queue.put(csds_pb2.ClientStatusRequest())
137  return next(response_iterator)
138 
139 
140 if __name__ == "__main__":
141  logging.basicConfig(level=logging.DEBUG)
142  unittest.main(verbosity=2)
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
get
absl::string_view get(const Cont &c)
Definition: abseil-cpp/absl/strings/str_replace_test.cc:185
test_csds.TestCsdsStream.get_xds_config_dump
def get_xds_config_dump(self)
Definition: test_csds.py:131
test_csds.TestCsds.get_xds_config_dump
def get_xds_config_dump(self)
Definition: test_csds.py:80
google::protobuf
Definition: bloaty/third_party/protobuf/benchmarks/util/data_proto2_to_proto3_util.h:12
test_csds.TestCsds
Definition: test_csds.py:62
grpc.RpcError
Definition: src/python/grpcio/grpc/__init__.py:302
test_csds.TestCsds.tearDown
def tearDown(self)
Definition: test_csds.py:75
start
static uint64_t start
Definition: benchmark-pound.c:74
test_csds.TestCsds._server
_server
Definition: test_csds.py:66
test_csds.TestCsds._channel
_channel
Definition: test_csds.py:71
test_csds.TestCsds._stub
_stub
Definition: test_csds.py:72
close
#define close
Definition: test-fs.c:48
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
test_csds.TestCsds.test_no_lds_found
def test_no_lds_found(self)
Definition: test_csds.py:89
test_csds.TestCsdsStream
Definition: test_csds.py:129
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
test_csds.TestCsds.setUp
def setUp(self)
Definition: test_csds.py:64
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
grpc_csds.add_csds_servicer
def add_csds_servicer(server)
Definition: src/python/grpcio_csds/grpc_csds/__init__.py:41
iter
Definition: test_winkernel.cpp:47
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
test_csds.TestCsds.test_has_node
def test_has_node(self)
Definition: test_csds.py:83


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:31