14 """An example of multiprocess concurrency with gRPC."""
16 from __future__
import absolute_import
17 from __future__
import division
18 from __future__
import print_function
20 from concurrent
import futures
25 import multiprocessing
# Logger shared by every function in this module.
_LOGGER = logging.getLogger(__name__)

# Number of server processes to launch: one per CPU reported by the OS.
_PROCESS_COUNT = multiprocessing.cpu_count()
# Size of the thread pool handed to each server as ``max_workers``.
_THREAD_CONCURRENCY = _PROCESS_COUNT
# Length of a single sleep while a server process idles.
_ONE_DAY = datetime.timedelta(days=1)
42 for i
in range(2,
int(math.ceil(math.sqrt(n)))):
def check(self, request, context):
    """Report whether the request's candidate is prime.

    Logs the candidate at INFO level, then wraps the result of
    ``is_prime`` in a ``prime_pb2.Primality`` message.

    Args:
        request: Incoming message; only its ``candidate`` field is read.
        context: Unused by this method.

    Returns:
        A ``prime_pb2.Primality`` whose ``isPrime`` field holds the result.
    """
    candidate = request.candidate
    _LOGGER.info('Determining primality of %s', candidate)
    verdict = is_prime(candidate)
    return prime_pb2.Primality(isPrime=verdict)
59 time.sleep(_ONE_DAY.total_seconds())
60 except KeyboardInterrupt:
65 """Start a server in a subprocess."""
66 _LOGGER.info(
'Starting new server.')
67 options = ((
'grpc.so_reuseport', 1),)
70 max_workers=_THREAD_CONCURRENCY,),
72 prime_pb2_grpc.add_PrimeCheckerServicer_to_server(
PrimeChecker(), server)
73 server.add_insecure_port(bind_address)
78 @contextlib.contextmanager
80 """Find and reserve a port for all subprocesses to use."""
81 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
82 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
83 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
84 raise RuntimeError(
"Failed to set SO_REUSEPORT.")
87 yield sock.getsockname()[1]
94 bind_address =
'localhost:{}'.
format(port)
95 _LOGGER.info(
"Binding to '%s'", bind_address)
98 for _
in range(_PROCESS_COUNT):
102 worker = multiprocessing.Process(target=_run_server,
103 args=(bind_address,))
105 workers.append(worker)
106 for worker
in workers:
if __name__ == '__main__':
    # Script entry point: send this module's INFO-and-above log records to
    # stdout, prefixing every line with the PID of the emitting process so
    # output from the different processes can be told apart.
    _LOGGER.setLevel(logging.INFO)
    formatter = logging.Formatter('[PID %(process)d] %(message)s')
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)
    _LOGGER.addHandler(handler)