run_channelz.py
Go to the documentation of this file.
1 # Copyright 2020 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Channelz debugging tool for xDS test client/server.
15 
16 This is intended as a debugging / local development helper and not executed
17 as a part of interop test suites.
18 
19 Typical usage examples:
20 
21  # Show channel and server socket pair
22  python -m bin.run_channelz --flagfile=config/local-dev.cfg
23 
24  # Evaluate setup for different security configurations
25  python -m bin.run_channelz --flagfile=config/local-dev.cfg --security=tls
26  python -m bin.run_channelz --flagfile=config/local-dev.cfg --security=mtls_error
27 
28  # More information and usage options
29  python -m bin.run_channelz --helpfull
30 """
31 import hashlib
32 import logging
33 
34 from absl import app
35 from absl import flags
36 
37 from framework import xds_flags
38 from framework import xds_k8s_flags
39 from framework.infrastructure import k8s
40 from framework.rpc import grpc_channelz
41 from framework.test_app import client_app
42 from framework.test_app import server_app
43 
44 logger = logging.getLogger(__name__)
45 # Flags
46 _SERVER_RPC_HOST = flags.DEFINE_string('server_rpc_host',
47  default='127.0.0.1',
48  help='Server RPC host')
49 _CLIENT_RPC_HOST = flags.DEFINE_string('client_rpc_host',
50  default='127.0.0.1',
51  help='Client RPC host')
52 _SECURITY = flags.DEFINE_enum('security',
53  default=None,
54  enum_values=[
55  'mtls', 'tls', 'plaintext', 'mtls_error',
56  'server_authz_error'
57  ],
58  help='Show info for a security setup')
59 flags.adopt_module_key_flags(xds_flags)
60 flags.adopt_module_key_flags(xds_k8s_flags)
61 # Running outside of a test suite, so require explicit resource_suffix.
62 flags.mark_flag_as_required("resource_suffix")
63 
64 # Type aliases
65 _Channel = grpc_channelz.Channel
66 _Socket = grpc_channelz.Socket
67 _ChannelState = grpc_channelz.ChannelState
68 _XdsTestServer = server_app.XdsTestServer
69 _XdsTestClient = client_app.XdsTestClient
70 
71 
72 def debug_cert(cert):
73  if not cert:
74  return '<missing>'
75  sha1 = hashlib.sha1(cert)
76  return f'sha1={sha1.hexdigest()}, len={len(cert)}'
77 
78 
79 def debug_sock_tls(tls):
80  return (f'local: {debug_cert(tls.local_certificate)}\n'
81  f'remote: {debug_cert(tls.remote_certificate)}')
82 
83 
84 def get_deployment_pod_ips(k8s_ns, deployment_name):
85  deployment = k8s_ns.get_deployment(deployment_name)
86  pods = k8s_ns.list_deployment_pods(deployment)
87  return [pod.status.pod_ip for pod in pods]
88 
89 
91  """Debug negative cases: mTLS Error, Server AuthZ error
92 
93  1) mTLS Error: Server expects client mTLS cert,
94  but client configured only for TLS.
95  2) AuthZ error: Client does not authorize server because of mismatched
96  SAN name.
97  """
98  # Client side.
99  client_correct_setup = True
100  channel: _Channel = test_client.wait_for_server_channel_state(
101  state=_ChannelState.TRANSIENT_FAILURE)
102  try:
103  subchannel, *subchannels = list(
104  test_client.channelz.list_channel_subchannels(channel))
105  except ValueError:
106  print("Client setup fail: subchannel not found. "
107  "Common causes: test client didn't connect to TD; "
108  "test client exhausted retries, and closed all subchannels.")
109  return
110 
111  # Client must have exactly one subchannel.
112  logger.debug('Found subchannel, %s', subchannel)
113  if subchannels:
114  client_correct_setup = False
115  print(f'Unexpected subchannels {subchannels}')
116  subchannel_state: _ChannelState = subchannel.data.state.state
117  if subchannel_state is not _ChannelState.TRANSIENT_FAILURE:
118  client_correct_setup = False
119  print('Subchannel expected to be in '
120  'TRANSIENT_FAILURE, same as its channel')
121 
122  # Client subchannel must have no sockets.
123  sockets = list(test_client.channelz.list_subchannels_sockets(subchannel))
124  if sockets:
125  client_correct_setup = False
126  print(f'Unexpected subchannel sockets {sockets}')
127 
128  # Results.
129  if client_correct_setup:
130  print('Client setup pass: the channel '
131  'to the server has exactly one subchannel '
132  'in TRANSIENT_FAILURE, and no sockets')
133 
134 
135 def debug_security_setup_positive(test_client, test_server):
136  """Debug positive cases: mTLS, TLS, Plaintext."""
137  test_client.wait_for_active_server_channel()
138  client_sock: _Socket = test_client.get_active_server_channel_socket()
139  server_sock: _Socket = test_server.get_server_socket_matching_client(
140  client_sock)
141 
142  server_tls = server_sock.security.tls
143  client_tls = client_sock.security.tls
144 
145  print(f'\nServer certs:\n{debug_sock_tls(server_tls)}')
146  print(f'\nClient certs:\n{debug_sock_tls(client_tls)}')
147  print()
148 
149  if server_tls.local_certificate:
150  eq = server_tls.local_certificate == client_tls.remote_certificate
151  print(f'(TLS) Server local matches client remote: {eq}')
152  else:
153  print('(TLS) Not detected')
154 
155  if server_tls.remote_certificate:
156  eq = server_tls.remote_certificate == client_tls.local_certificate
157  print(f'(mTLS) Server remote matches client local: {eq}')
158  else:
159  print('(mTLS) Not detected')
160 
161 
162 def debug_basic_setup(test_client, test_server):
163  """Show channel and server socket pair"""
164  test_client.wait_for_active_server_channel()
165  client_sock: _Socket = test_client.get_active_server_channel_socket()
166  server_sock: _Socket = test_server.get_server_socket_matching_client(
167  client_sock)
168 
169  print(f'Client socket:\n{client_sock}\n')
170  print(f'Matching server:\n{server_sock}\n')
171 
172 
173 def main(argv):
174  if len(argv) > 1:
175  raise app.UsageError('Too many command-line arguments.')
176 
177  k8s_api_manager = k8s.KubernetesApiManager(xds_k8s_flags.KUBE_CONTEXT.value)
178 
179  # Resource names.
180  resource_prefix: str = xds_flags.RESOURCE_PREFIX.value
181 
182  # Server
183  server_name = xds_flags.SERVER_NAME.value
184  server_namespace = resource_prefix
185  server_k8s_ns = k8s.KubernetesNamespace(k8s_api_manager, server_namespace)
186  server_pod_ip = get_deployment_pod_ips(server_k8s_ns, server_name)[0]
187  test_server: _XdsTestServer = _XdsTestServer(
188  ip=server_pod_ip,
189  rpc_port=xds_flags.SERVER_PORT.value,
190  xds_host=xds_flags.SERVER_XDS_HOST.value,
191  xds_port=xds_flags.SERVER_XDS_PORT.value,
192  rpc_host=_SERVER_RPC_HOST.value)
193 
194  # Client
195  client_name = xds_flags.CLIENT_NAME.value
196  client_namespace = resource_prefix
197  client_k8s_ns = k8s.KubernetesNamespace(k8s_api_manager, client_namespace)
198  client_pod_ip = get_deployment_pod_ips(client_k8s_ns, client_name)[0]
199  test_client: _XdsTestClient = _XdsTestClient(
200  ip=client_pod_ip,
201  server_target=test_server.xds_uri,
202  rpc_port=xds_flags.CLIENT_PORT.value,
203  rpc_host=_CLIENT_RPC_HOST.value)
204 
205  if _SECURITY.value in ('mtls', 'tls', 'plaintext'):
206  debug_security_setup_positive(test_client, test_server)
207  elif _SECURITY.value == ('mtls_error', 'server_authz_error'):
208  debug_security_setup_negative(test_client)
209  else:
210  debug_basic_setup(test_client, test_server)
211 
212  test_client.close()
213  test_server.close()
214 
215 
216 if __name__ == '__main__':
217  app.run(main)
bin.run_channelz.debug_cert
def debug_cert(cert)
Definition: run_channelz.py:72
bin.run_channelz.debug_security_setup_positive
def debug_security_setup_positive(test_client, test_server)
Definition: run_channelz.py:135
bin.run_channelz._XdsTestServer
_XdsTestServer
Definition: run_channelz.py:68
bin.run_channelz.debug_security_setup_negative
def debug_security_setup_negative(test_client)
Definition: run_channelz.py:90
framework.rpc
Definition: tools/run_tests/xds_k8s_test_driver/framework/rpc/__init__.py:1
framework.test_app
Definition: tools/run_tests/xds_k8s_test_driver/framework/test_app/__init__.py:1
bin.run_channelz.debug_basic_setup
def debug_basic_setup(test_client, test_server)
Definition: run_channelz.py:162
framework.infrastructure
Definition: tools/run_tests/xds_k8s_test_driver/framework/infrastructure/__init__.py:1
bin.run_channelz._XdsTestClient
_XdsTestClient
Definition: run_channelz.py:69
bin.run_channelz.debug_sock_tls
def debug_sock_tls(tls)
Definition: run_channelz.py:79
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
bin.run_channelz.main
def main(argv)
Definition: run_channelz.py:173
bin.run_channelz.get_deployment_pod_ips
def get_deployment_pod_ips(k8s_ns, deployment_name)
Definition: run_channelz.py:84


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:08