xds_interop_server.py
Go to the documentation of this file.
1 # Copyright 2021 The gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import argparse
16 import collections
17 from concurrent import futures
18 import logging
19 import signal
20 import socket
21 import sys
22 import threading
23 import time
24 from typing import DefaultDict, Dict, List, Mapping, Sequence, Set, Tuple
25 
26 import grpc
27 from grpc_channelz.v1 import channelz
28 from grpc_channelz.v1 import channelz_pb2
29 from grpc_health.v1 import health as grpc_health
30 from grpc_health.v1 import health_pb2
31 from grpc_health.v1 import health_pb2_grpc
32 from grpc_reflection.v1alpha import reflection
33 
34 from src.proto.grpc.testing import empty_pb2
35 from src.proto.grpc.testing import messages_pb2
36 from src.proto.grpc.testing import test_pb2
37 from src.proto.grpc.testing import test_pb2_grpc
38 
39 # NOTE: This interop server is not fully compatible with all xDS interop tests.
40 # It currently only implements enough functionality to pass the xDS security
41 # tests.
42 
43 _LISTEN_HOST = "0.0.0.0"
44 
45 _THREAD_POOL_SIZE = 256
46 
47 logger = logging.getLogger()
48 console_handler = logging.StreamHandler()
49 formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s')
50 console_handler.setFormatter(formatter)
51 logger.addHandler(console_handler)
52 
53 
54 class TestService(test_pb2_grpc.TestServiceServicer):
55 
56  def __init__(self, server_id, hostname):
57  self._server_id = server_id
58  self._hostname = hostname
59 
60  def EmptyCall(self, _: empty_pb2.Empty,
61  context: grpc.ServicerContext) -> empty_pb2.Empty:
62  context.send_initial_metadata((('hostname', self._hostname),))
63  return empty_pb2.Empty()
64 
65  def UnaryCall(self, request: messages_pb2.SimpleRequest,
66  context: grpc.ServicerContext) -> messages_pb2.SimpleResponse:
67  context.send_initial_metadata((('hostname', self._hostname),))
68  response = messages_pb2.SimpleResponse()
69  response.server_id = self._server_id
70  response.hostname = self._hostname
71  return response
72 
73 
74 def _configure_maintenance_server(server: grpc.Server,
75  maintenance_port: int) -> None:
76  channelz.add_channelz_servicer(server)
77  listen_address = f"{_LISTEN_HOST}:{maintenance_port}"
78  server.add_insecure_port(listen_address)
79  health_servicer = grpc_health.HealthServicer(
80  experimental_non_blocking=True,
81  experimental_thread_pool=futures.ThreadPoolExecutor(
82  max_workers=_THREAD_POOL_SIZE))
83 
84  health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
85  SERVICE_NAMES = (
86  test_pb2.DESCRIPTOR.services_by_name["TestService"].full_name,
87  health_pb2.DESCRIPTOR.services_by_name["Health"].full_name,
88  channelz_pb2.DESCRIPTOR.services_by_name["Channelz"].full_name,
89  reflection.SERVICE_NAME,
90  )
91  for service in SERVICE_NAMES:
92  health_servicer.set(service, health_pb2.HealthCheckResponse.SERVING)
93  reflection.enable_server_reflection(SERVICE_NAMES, server)
94 
95 
96 def _configure_test_server(server: grpc.Server, port: int, secure_mode: bool,
97  server_id: str) -> None:
98  test_pb2_grpc.add_TestServiceServicer_to_server(
99  TestService(server_id, socket.gethostname()), server)
100  listen_address = f"{_LISTEN_HOST}:{port}"
101  if not secure_mode:
102  server.add_insecure_port(listen_address)
103  else:
104  logger.info("Running with xDS Server credentials")
105  server_fallback_creds = grpc.insecure_server_credentials()
106  server_creds = grpc.xds_server_credentials(server_fallback_creds)
107  server.add_secure_port(listen_address, server_creds)
108 
109 
110 def _run(port: int, maintenance_port: int, secure_mode: bool,
111  server_id: str) -> None:
112  if port == maintenance_port:
113  server = grpc.server(
114  futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE))
115  _configure_test_server(server, port, secure_mode, server_id)
116  _configure_maintenance_server(server, maintenance_port)
117  server.start()
118  logger.info("Test server listening on port %d", port)
119  logger.info("Maintenance server listening on port %d", maintenance_port)
120  server.wait_for_termination()
121  else:
122  test_server = grpc.server(
123  futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE),
124  xds=secure_mode)
125  _configure_test_server(test_server, port, secure_mode, server_id)
126  test_server.start()
127  logger.info("Test server listening on port %d", port)
128  maintenance_server = grpc.server(
129  futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE))
130  _configure_maintenance_server(maintenance_server, maintenance_port)
131  maintenance_server.start()
132  logger.info("Maintenance server listening on port %d", maintenance_port)
133  test_server.wait_for_termination()
134  maintenance_server.wait_for_termination()
135 
136 
137 def bool_arg(arg: str) -> bool:
138  if arg.lower() in ("true", "yes", "y"):
139  return True
140  elif arg.lower() in ("false", "no", "n"):
141  return False
142  else:
143  raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.")
144 
145 
146 if __name__ == "__main__":
147  parser = argparse.ArgumentParser(
148  description="Run Python xDS interop server.")
149  parser.add_argument("--port",
150  type=int,
151  default=8080,
152  help="Port for test server.")
153  parser.add_argument("--maintenance_port",
154  type=int,
155  default=8080,
156  help="Port for servers besides test server.")
157  parser.add_argument(
158  "--secure_mode",
159  type=bool_arg,
160  default="False",
161  help="If specified, uses xDS to retrieve server credentials.")
162  parser.add_argument("--server_id",
163  type=str,
164  default="python_server",
165  help="The server ID to return in responses..")
166  parser.add_argument('--verbose',
167  help='verbose log output',
168  default=False,
169  action='store_true')
170  args = parser.parse_args()
171  if args.verbose:
172  logger.setLevel(logging.DEBUG)
173  else:
174  logger.setLevel(logging.INFO)
175  if args.secure_mode and args.port == args.maintenance_port:
176  raise ValueError(
177  "--port and --maintenance_port must not be the same when --secure_mode is set."
178  )
179  _run(args.port, args.maintenance_port, args.secure_mode, args.server_id)
grpc.insecure_server_credentials
def insecure_server_credentials()
Definition: src/python/grpcio/grpc/__init__.py:1755
xds_interop_server._configure_test_server
None _configure_test_server(grpc.Server server, int port, bool secure_mode, str server_id)
Definition: xds_interop_server.py:96
grpc.xds_server_credentials
def xds_server_credentials(fallback_credentials)
Definition: src/python/grpcio/grpc/__init__.py:1743
xds_interop_server.TestService._hostname
_hostname
Definition: xds_interop_server.py:58
grpc_reflection.v1alpha
Definition: src/python/grpcio_reflection/grpc_reflection/v1alpha/__init__.py:1
xds_interop_server._configure_maintenance_server
None _configure_maintenance_server(grpc.Server server, int maintenance_port)
Definition: xds_interop_server.py:74
grpc_health.v1
Definition: src/python/grpcio_health_checking/grpc_health/v1/__init__.py:1
xds_interop_server.bool_arg
bool bool_arg(str arg)
Definition: xds_interop_server.py:137
xds_interop_server._run
None _run(int port, int maintenance_port, bool secure_mode, str server_id)
Definition: xds_interop_server.py:110
xds_interop_server.TestService._server_id
_server_id
Definition: xds_interop_server.py:57
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
xds_interop_server.TestService.EmptyCall
empty_pb2.Empty EmptyCall(self, empty_pb2.Empty _, grpc.ServicerContext context)
Definition: xds_interop_server.py:60
grpc.ServicerContext
Definition: src/python/grpcio/grpc/__init__.py:1083
grpc_channelz.v1
Definition: src/python/grpcio_channelz/grpc_channelz/v1/__init__.py:1
xds_interop_server.TestService.UnaryCall
messages_pb2.SimpleResponse UnaryCall(self, messages_pb2.SimpleRequest request, grpc.ServicerContext context)
Definition: xds_interop_server.py:65
xds_interop_server.TestService.__init__
def __init__(self, server_id, hostname)
Definition: xds_interop_server.py:56
messages_pb2.SimpleResponse
SimpleResponse
Definition: messages_pb2.py:604
xds_interop_server.TestService
Definition: xds_interop_server.py:54


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:59