14 """An example of multiprocessing concurrency with gRPC."""
16 from __future__
import absolute_import
17 from __future__
import division
18 from __future__
import print_function
23 import multiprocessing
# Exclusive upper bound of the candidate range built further down
# (range(2, _MAXIMUM_CANDIDATE)).
32 _MAXIMUM_CANDIDATE = 10000
# Module-level handles for the channel and stub used by the worker code.
# Both start out as None; the worker-initialization code declares both names
# global and assigns the stub.
37 _worker_channel_singleton =
None
38 _worker_stub_singleton =
None
# Module logger; its handler and level are configured in the __main__ guard.
40 _LOGGER = logging.getLogger(__name__)
# NOTE(review): the enclosing `def` line is not visible in this excerpt.
# Presumably this is the body of _shutdown_worker, which is registered with
# atexit elsewhere in this file -- confirm.
# Logs the shutdown, then calls stop() on the channel singleton only when it
# is not None.
44 _LOGGER.info(
'Shutting worker process down.')
45 if _worker_channel_singleton
is not None:
# NOTE(review): grpc.Channel documents close(), not stop() -- confirm the
# type stored in _worker_channel_singleton before relying on this call.
46 _worker_channel_singleton.stop()
# NOTE(review): the enclosing `def` line is not visible in this excerpt.
# Presumably this is the body of _initialize_worker, which the Pool call in
# this file passes as `initializer` -- confirm.
# Both singletons are declared global so that assignments here rebind the
# module-level names.
50 global _worker_channel_singleton
51 global _worker_stub_singleton
52 _LOGGER.info(
'Initializing worker process.')
# NOTE(review): the statement that assigns _worker_channel_singleton is not
# visible in this excerpt.
# Build the PrimeChecker stub from the channel singleton.
54 _worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub(
55 _worker_channel_singleton)
# Register _shutdown_worker to run at interpreter exit.
56 atexit.register(_shutdown_worker)
# NOTE(review): the enclosing `def` line is not visible in this excerpt.
# Presumably this is the body of _run_worker_query, with
# `primality_candidate` as its parameter -- confirm.
# Logs the candidate, then returns the result of the stub's check() call on
# a PrimeCandidate message whose `candidate` field is that value.
60 _LOGGER.info(
'Checking primality of %s.', primality_candidate)
61 return _worker_stub_singleton.check(
62 prime_pb2.PrimeCandidate(candidate=primality_candidate))
# NOTE(review): the enclosing `def` line is not visible in this excerpt;
# `server_address` is presumably a parameter of that function -- confirm.
# Create a pool of _PROCESS_COUNT processes, passing _initialize_worker as
# the initializer with (server_address,) as its arguments.
66 worker_pool = multiprocessing.Pool(processes=_PROCESS_COUNT,
67 initializer=_initialize_worker,
68 initargs=(server_address,))
# Candidates run from 2 up to, but not including, _MAXIMUM_CANDIDATE.
69 check_range =
range(2, _MAXIMUM_CANDIDATE)
# Run _run_worker_query on every candidate through the pool.
70 primality = worker_pool.map(_run_worker_query, check_range)
# Pair each candidate positionally with the isPrime attribute of its result.
71 primes = zip(check_range,
map(operator.attrgetter(
'isPrime'), primality))
# NOTE(review): no close()/join()/terminate() on worker_pool and no use of
# `primes` are visible in this excerpt -- confirm both against the full file.
# NOTE(review): the enclosing `def` line is not visible in this excerpt, and
# the format( call below is cut off, so the value substituted for {} is not
# shown here.
# Description text for the command-line parser.
76 msg =
'Determine the primality of the first {} integers.'.
format(
# The parser takes one positional argument, the server address.
78 parser = argparse.ArgumentParser(description=msg)
79 parser.add_argument(
'server_address',
80 help=
'The address of the server (e.g. localhost:50051)')
# Parse the command line; what is done with `args` is not visible here.
81 args = parser.parse_args()
# Script entry point: send _LOGGER output to stdout, prefix each message with
# the emitting process's PID, and enable INFO-level logging.
# NOTE(review): any statements after the logging setup are not visible in
# this excerpt.
86 if __name__ ==
'__main__':
87 handler = logging.StreamHandler(sys.stdout)
# %(process)d expands to the process ID of the process that logs the record.
88 formatter = logging.Formatter(
'[PID %(process)d] %(message)s')
89 handler.setFormatter(formatter)
90 _LOGGER.addHandler(handler)
91 _LOGGER.setLevel(logging.INFO)