examples/python/multiprocessing/server.py
Go to the documentation of this file.
1 # Copyright 2019 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """An example of multiprocess concurrency with gRPC."""
15 
16 from __future__ import absolute_import
17 from __future__ import division
18 from __future__ import print_function
19 
20 from concurrent import futures
21 import contextlib
22 import datetime
23 import logging
24 import math
25 import multiprocessing
26 import socket
27 import sys
28 import time
29 
30 import grpc
31 import prime_pb2
32 import prime_pb2_grpc
33 
# Module-level logger; handlers are attached in the __main__ guard.
_LOGGER = logging.getLogger(__name__)

# Sleep interval used by the worker's idle loop.
_ONE_DAY = datetime.timedelta(days=1)

# One server subprocess is launched per CPU reported by the OS.
_PROCESS_COUNT = multiprocessing.cpu_count()

# Each subprocess sizes its thread pool to the same number.
_THREAD_CONCURRENCY = _PROCESS_COUNT
39 
40 
def is_prime(n):
    """Return True if the integer n is a prime number, False otherwise.

    Integers below 2 (including zero and negatives) are never prime.
    Primality is decided by trial division with every candidate divisor i
    such that i * i <= n.
    """
    if n < 2:
        return False
    divisor = 2
    # Compare divisor * divisor against n using integer arithmetic rather
    # than taking a floating-point square root: this keeps the bound exact
    # for large inputs and makes sure the square root itself is tested, so
    # perfect squares such as 4, 9 and 25 are correctly rejected.
    while divisor * divisor <= n:
        if n % divisor == 0:
            return False
        divisor += 1
    return True
47 
48 
class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer):
    """Servicer implementation that answers primality queries."""

    def check(self, request, context):
        """Log the candidate and return a Primality message for it.

        The isPrime field of the reply is the result of calling is_prime()
        on request.candidate. The context argument is not used.
        """
        candidate = request.candidate
        _LOGGER.info('Determining primality of %s', candidate)
        verdict = is_prime(candidate)
        return prime_pb2.Primality(isPrime=verdict)
54 
55 
def _wait_forever(server):
    """Sleep in one-day intervals until interrupted, then stop the server.

    A KeyboardInterrupt ends the loop and triggers server.stop(None).
    """
    try:
        nap_seconds = _ONE_DAY.total_seconds()
        while True:
            time.sleep(nap_seconds)
    except KeyboardInterrupt:
        server.stop(None)
62 
63 
def _run_server(bind_address):
    """Start a server in a subprocess.

    Creates a gRPC server backed by a thread pool of _THREAD_CONCURRENCY
    workers, registers a PrimeChecker servicer, binds an insecure port at
    bind_address, starts the server and then blocks in _wait_forever().
    """
    _LOGGER.info('Starting new server.')

    # Passed to grpc.server as a channel option; the reserving socket in
    # this file sets SO_REUSEPORT for the same port.
    channel_options = (('grpc.so_reuseport', 1),)
    executor = futures.ThreadPoolExecutor(max_workers=_THREAD_CONCURRENCY)

    grpc_server = grpc.server(executor, options=channel_options)
    prime_pb2_grpc.add_PrimeCheckerServicer_to_server(PrimeChecker(),
                                                      grpc_server)
    grpc_server.add_insecure_port(bind_address)
    grpc_server.start()
    _wait_forever(grpc_server)
76 
77 
@contextlib.contextmanager
def _reserve_port():
    """Find and reserve a port for all subprocesses to use.

    Opens an IPv6 TCP socket with SO_REUSEPORT enabled, binds it to port 0
    and yields the port number reported by getsockname(). The socket stays
    open for the lifetime of the context and is closed on exit.

    Raises:
        RuntimeError: if SO_REUSEPORT reads back as 0 after being set.
    """
    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    # The try block starts right after the socket is created so that a
    # failure in setsockopt, the SO_REUSEPORT check or bind still closes it.
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
            raise RuntimeError("Failed to set SO_REUSEPORT.")
        sock.bind(('', 0))
        yield sock.getsockname()[1]
    finally:
        sock.close()
90 
91 
def main():
    """Reserve a port, launch one server subprocess per CPU and wait.

    The port stays reserved (the with-block stays open) until every worker
    subprocess has been joined.
    """
    with _reserve_port() as port:
        bind_address = 'localhost:{}'.format(port)
        _LOGGER.info("Binding to '%s'", bind_address)
        sys.stdout.flush()

        children = []
        for _ in range(_PROCESS_COUNT):
            # NOTE: It is imperative that the worker subprocesses be forked
            # before any gRPC servers start up. See
            # https://github.com/grpc/grpc/issues/16001 for more details.
            child = multiprocessing.Process(target=_run_server,
                                            args=(bind_address,))
            child.start()
            children.append(child)

        for child in children:
            child.join()
108 
109 
if __name__ == '__main__':
    # Send log records to stdout, each prefixed with the emitting process
    # ID, then run the example.
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(
        logging.Formatter('[PID %(process)d] %(message)s'))
    _LOGGER.addHandler(stdout_handler)
    _LOGGER.setLevel(logging.INFO)
    main()
http2_test_server.format
format
Definition: http2_test_server.py:118
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
xds_interop_client.int
int
Definition: xds_interop_client.py:113
server.PrimeChecker
Definition: examples/python/multiprocessing/server.py:49
server.PrimeChecker.check
def check(self, request, context)
Definition: examples/python/multiprocessing/server.py:51
server._wait_forever
def _wait_forever(server)
Definition: examples/python/multiprocessing/server.py:56
server._reserve_port
def _reserve_port()
Definition: examples/python/multiprocessing/server.py:79
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
server._run_server
def _run_server(bind_address)
Definition: examples/python/multiprocessing/server.py:64
main
Definition: main.py:1
server.main
def main()
Definition: examples/python/cancellation/server.py:103
server.is_prime
def is_prime(n)
Definition: examples/python/multiprocessing/server.py:41


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:16