17 from concurrent
import futures
24 from typing
import DefaultDict, Dict, List, Mapping, Sequence, Set, Tuple
34 from src.proto.grpc.testing
import empty_pb2
35 from src.proto.grpc.testing
import messages_pb2
36 from src.proto.grpc.testing
import test_pb2
37 from src.proto.grpc.testing
import test_pb2_grpc
# Host part of every "<host>:<port>" listen address built in this module;
# "0.0.0.0" binds on all IPv4 interfaces.
_LISTEN_HOST = "0.0.0.0"

# max_workers for the ThreadPoolExecutor instances created in this module.
_THREAD_POOL_SIZE = 256

# Root logger with a single stream handler that emits timestamped,
# level-aligned records. The formatter is built first, then attached to the
# handler, and the handler is attached to the logger last.
logger = logging.getLogger()
formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s')
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
62 context.send_initial_metadata(((
'hostname', self.
_hostname),))
63 return empty_pb2.Empty()
65 def UnaryCall(self, request: messages_pb2.SimpleRequest,
67 context.send_initial_metadata(((
'hostname', self.
_hostname),))
75 maintenance_port: int) ->
None:
76 channelz.add_channelz_servicer(server)
77 listen_address = f
"{_LISTEN_HOST}:{maintenance_port}"
78 server.add_insecure_port(listen_address)
79 health_servicer = grpc_health.HealthServicer(
80 experimental_non_blocking=
True,
81 experimental_thread_pool=futures.ThreadPoolExecutor(
82 max_workers=_THREAD_POOL_SIZE))
84 health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
86 test_pb2.DESCRIPTOR.services_by_name[
"TestService"].full_name,
87 health_pb2.DESCRIPTOR.services_by_name[
"Health"].full_name,
88 channelz_pb2.DESCRIPTOR.services_by_name[
"Channelz"].full_name,
89 reflection.SERVICE_NAME,
91 for service
in SERVICE_NAMES:
92 health_servicer.set(service, health_pb2.HealthCheckResponse.SERVING)
93 reflection.enable_server_reflection(SERVICE_NAMES, server)
97 server_id: str) ->
None:
98 test_pb2_grpc.add_TestServiceServicer_to_server(
99 TestService(server_id, socket.gethostname()), server)
100 listen_address = f
"{_LISTEN_HOST}:{port}"
102 server.add_insecure_port(listen_address)
104 logger.info(
"Running with xDS Server credentials")
107 server.add_secure_port(listen_address, server_creds)
110 def _run(port: int, maintenance_port: int, secure_mode: bool,
111 server_id: str) ->
None:
112 if port == maintenance_port:
114 futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE))
118 logger.info(
"Test server listening on port %d", port)
119 logger.info(
"Maintenance server listening on port %d", maintenance_port)
120 server.wait_for_termination()
123 futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE),
127 logger.info(
"Test server listening on port %d", port)
129 futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE))
131 maintenance_server.start()
132 logger.info(
"Maintenance server listening on port %d", maintenance_port)
133 test_server.wait_for_termination()
134 maintenance_server.wait_for_termination()
138 if arg.lower()
in (
"true",
"yes",
"y"):
140 elif arg.lower()
in (
"false",
"no",
"n"):
143 raise argparse.ArgumentTypeError(f
"Could not parse '{arg}' as a bool.")
146 if __name__ ==
"__main__":
147 parser = argparse.ArgumentParser(
148 description=
"Run Python xDS interop server.")
149 parser.add_argument(
"--port",
152 help=
"Port for test server.")
153 parser.add_argument(
"--maintenance_port",
156 help=
"Port for servers besides test server.")
161 help=
"If specified, uses xDS to retrieve server credentials.")
162 parser.add_argument(
"--server_id",
164 default=
"python_server",
165 help=
"The server ID to return in responses..")
166 parser.add_argument(
'--verbose',
167 help=
'verbose log output',
170 args = parser.parse_args()
172 logger.setLevel(logging.DEBUG)
174 logger.setLevel(logging.INFO)
175 if args.secure_mode
and args.port == args.maintenance_port:
177 "--port and --maintenance_port must not be the same when --secure_mode is set."
179 _run(args.port, args.maintenance_port, args.secure_mode, args.server_id)