examples/python/cancellation/server.py
Go to the documentation of this file.
1 # Copyright 2019 the gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """An example of cancelling requests in gRPC."""
15 
16 from __future__ import absolute_import
17 from __future__ import division
18 from __future__ import print_function
19 
20 import argparse
21 from concurrent import futures
22 import logging
23 import threading
24 
25 import grpc
26 import search
27 
28 from examples.python.cancellation import hash_name_pb2
29 from examples.python.cancellation import hash_name_pb2_grpc
30 
31 _LOGGER = logging.getLogger(__name__)
32 _SERVER_HOST = 'localhost'
33 
34 _DESCRIPTION = "A server for finding hashes similar to names."
35 
36 
class HashFinder(hash_name_pb2_grpc.HashFinderServicer):
    """Servicer that searches for hashes resembling a requested name.

    Both handlers create a ``threading.Event``, hand it to ``search.search``,
    and register a callback on the RPC context that sets the event, so the
    search can be told to stop instead of continuing to occupy the servicer
    thread.
    """

    def __init__(self, maximum_hashes):
        """Remember the hash budget that is forwarded to every search.

        Args:
          maximum_hashes: Passed to ``search.search`` as its
            ``maximum_hashes`` argument.
        """
        super(HashFinder, self).__init__()
        self._maximum_hashes = maximum_hashes

    def Find(self, request, context):
        """Handle a unary request by returning the last candidate found.

        Args:
          request: Message providing ``desired_name`` and
            ``ideal_hamming_distance``.
          context: The RPC context; used to register the stop callback and to
            cancel the RPC when the search exceeds its resource limit.

        Returns:
          The final candidate yielded by the search, or an empty
          ``HashNameResponse`` when the search produced no candidates.
        """
        stop_event = threading.Event()

        def on_rpc_done():
            _LOGGER.debug("Attempting to regain servicer thread.")
            stop_event.set()

        context.add_callback(on_rpc_done)
        candidates = []
        try:
            candidates = list(
                search.search(request.desired_name,
                              request.ideal_hamming_distance, stop_event,
                              self._maximum_hashes))
        except search.ResourceLimitExceededError:
            # The search gave up; cancel the RPC rather than propagate the
            # error out of the handler.
            _LOGGER.info("Cancelling RPC due to exhausted resources.")
            context.cancel()
        _LOGGER.debug("Servicer thread returning.")
        if not candidates:
            return hash_name_pb2.HashNameResponse()
        return candidates[-1]

    def FindRange(self, request, context):
        """Handle a streaming request by yielding candidates as they appear.

        Args:
          request: Message providing ``desired_name``,
            ``ideal_hamming_distance`` and ``interesting_hamming_distance``.
          context: The RPC context; used to register the stop callback and to
            cancel the RPC when the search exceeds its resource limit.

        Yields:
          Each candidate produced by ``search.search``.
        """
        stop_event = threading.Event()

        def on_rpc_done():
            _LOGGER.debug("Attempting to regain servicer thread.")
            stop_event.set()

        context.add_callback(on_rpc_done)
        secret_generator = search.search(
            request.desired_name,
            request.ideal_hamming_distance,
            stop_event,
            self._maximum_hashes,
            interesting_hamming_distance=request.interesting_hamming_distance)
        try:
            for candidate in secret_generator:
                yield candidate
        except search.ResourceLimitExceededError:
            # The search gave up mid-stream; cancel the RPC instead of letting
            # the exception escape the generator.
            _LOGGER.info("Cancelling RPC due to exhausted resources.")
            context.cancel()
        _LOGGER.debug("Regained servicer thread.")
86 
87 
def _running_server(port, maximum_hashes):
    """Create, bind and start the HashFinder gRPC server.

    Args:
      port: Port to bind on ``_SERVER_HOST``.
      maximum_hashes: Forwarded to the ``HashFinder`` servicer.

    Returns:
      The started server object returned by ``grpc.server``.
    """
    # We use only a single servicer thread here to demonstrate that, if managed
    # carefully, cancelled RPCs need not continue occupying servicer threads.
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=1),
                         maximum_concurrent_rpcs=1)
    hash_name_pb2_grpc.add_HashFinderServicer_to_server(
        HashFinder(maximum_hashes), server)
    address = '{}:{}'.format(_SERVER_HOST, port)
    # add_insecure_port returns the port that was actually bound, which can
    # differ from the requested one (e.g. when port is 0), so report that.
    actual_port = server.add_insecure_port(address)
    server.start()
    print("Server listening at '{}'".format('{}:{}'.format(
        _SERVER_HOST, actual_port)))
    return server
101 
102 
def main():
    """Parse the command-line flags, start the server and wait on it."""
    arg_parser = argparse.ArgumentParser(description=_DESCRIPTION)
    # (flag, default, help text); every flag is an optional integer.
    flag_table = (
        ('--port', 50051, 'The port on which the server will listen.'),
        ('--maximum-hashes', 1000000,
         'The maximum number of hashes to search before cancelling.'),
    )
    for flag, default_value, help_text in flag_table:
        arg_parser.add_argument(flag,
                                type=int,
                                default=default_value,
                                nargs='?',
                                help=help_text)
    options = arg_parser.parse_args()
    running = _running_server(options.port, options.maximum_hashes)
    running.wait_for_termination()
119 
120 
if __name__ == "__main__":
    # Configure root logging before anything logs, then run the server.
    logging.basicConfig()
    main()
http2_test_server.format
format
Definition: http2_test_server.py:118
server.HashFinder.Find
def Find(self, request, context)
Definition: examples/python/cancellation/server.py:43
server.HashFinder._maximum_hashes
_maximum_hashes
Definition: examples/python/cancellation/server.py:41
server.HashFinder.FindRange
def FindRange(self, request, context)
Definition: examples/python/cancellation/server.py:65
server.HashFinder.__init__
def __init__(self, maximum_hashes)
Definition: examples/python/cancellation/server.py:39
search.search
def search(target, ideal_distance, stop_event, maximum_hashes, interesting_hamming_distance=None)
Definition: search.py:99
search.ResourceLimitExceededError
Definition: search.py:69
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
server._running_server
def _running_server(port, maximum_hashes)
Definition: examples/python/cancellation/server.py:88
main
Definition: main.py:1
server.HashFinder
Definition: examples/python/cancellation/server.py:37
server.main
def main()
Definition: examples/python/cancellation/server.py:103


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:16