14 """A simple test to ensure that the Python wrapper can get xDS config."""
16 from concurrent.futures
import ThreadPoolExecutor
23 from envoy.service.status.v3
import csds_pb2
24 from envoy.service.status.v3
import csds_pb2_grpc
28 from six.moves
import queue
30 _DUMMY_XDS_ADDRESS =
'xds:///foo.bar'
31 _DUMMY_BOOTSTRAP_FILE =
"""
35 \"server_uri\": \"fake:///xds_server\",
41 \"server_features\": [\"xds_v3\"]
45 \"id\": \"python_test_csds\",
46 \"cluster\": \"test\",
60 @unittest.skipIf(sys.version_info[0] < 3,
61 'ProtoBuf descriptor has moved on from Python2')
65 os.environ[
'GRPC_XDS_BOOTSTRAP_CONFIG'] = _DUMMY_BOOTSTRAP_FILE
67 port = self.
_server.add_insecure_port(
'localhost:0')
72 self.
_stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(
78 os.environ.pop(
'GRPC_XDS_BOOTSTRAP_CONFIG',
None)
81 return self.
_stub.FetchClientStatus(csds_pb2.ClientStatusRequest())
85 self.assertEqual(1,
len(resp.config))
86 self.assertEqual(
'python_test_csds', resp.config[0].node.id)
87 self.assertEqual(
'test', resp.config[0].node.cluster)
94 dummy_channel.unary_unary(
'')(b
'', wait_for_ready=
False)
95 self.assertEqual(grpc.StatusCode.UNAVAILABLE,
96 rpc_error.exception.code())
101 config = json_format.MessageToDict(resp)
104 for xds_config
in config[
"config"][0].
get(
"xdsConfig", []):
105 if "listenerConfig" in xds_config:
106 listener = xds_config[
"listenerConfig"][
107 "dynamicListeners"][0]
108 if listener[
'clientStatus'] ==
'DOES_NOT_EXIST':
111 for generic_xds_config
in config[
"config"][0].
get(
112 "genericXdsConfigs", []):
113 if "Listener" in generic_xds_config[
"typeUrl"]:
114 if generic_xds_config[
115 'clientStatus'] ==
'DOES_NOT_EXIST':
118 except KeyError
as e:
119 logging.debug(
"Invalid config: %s\n%s: %s", config,
type(e), e)
124 dummy_channel.close()
127 @unittest.skipIf(sys.version_info[0] < 3,
128 'ProtoBuf descriptor has moved on from Python2')
132 if not hasattr(self,
'request_queue'):
133 request_queue = queue.Queue()
134 response_iterator = self.
_stub.StreamClientStatus(
135 iter(request_queue.get,
None))
136 request_queue.put(csds_pb2.ClientStatusRequest())
137 return next(response_iterator)
140 if __name__ ==
"__main__":
141 logging.basicConfig(level=logging.DEBUG)
142 unittest.main(verbosity=2)