_exit_scenarios.py
Go to the documentation of this file.
1 # Copyright 2016 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Defines a number of module-scope gRPC scenarios to test clean exit."""
15 
16 import argparse
17 import logging
18 import threading
19 import time
20 
21 import grpc
22 
23 from tests.unit.framework.common import test_constants
24 
25 WAIT_TIME = 1000
26 
27 REQUEST = b'request'
28 
29 UNSTARTED_SERVER = 'unstarted_server'
30 RUNNING_SERVER = 'running_server'
31 POLL_CONNECTIVITY_NO_SERVER = 'poll_connectivity_no_server'
32 POLL_CONNECTIVITY = 'poll_connectivity'
33 IN_FLIGHT_UNARY_UNARY_CALL = 'in_flight_unary_unary_call'
34 IN_FLIGHT_UNARY_STREAM_CALL = 'in_flight_unary_stream_call'
35 IN_FLIGHT_STREAM_UNARY_CALL = 'in_flight_stream_unary_call'
36 IN_FLIGHT_STREAM_STREAM_CALL = 'in_flight_stream_stream_call'
37 IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL = 'in_flight_partial_unary_stream_call'
38 IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL = 'in_flight_partial_stream_unary_call'
39 IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL = 'in_flight_partial_stream_stream_call'
40 
41 UNARY_UNARY = b'/test/UnaryUnary'
42 UNARY_STREAM = b'/test/UnaryStream'
43 STREAM_UNARY = b'/test/StreamUnary'
44 STREAM_STREAM = b'/test/StreamStream'
45 PARTIAL_UNARY_STREAM = b'/test/PartialUnaryStream'
46 PARTIAL_STREAM_UNARY = b'/test/PartialStreamUnary'
47 PARTIAL_STREAM_STREAM = b'/test/PartialStreamStream'
48 
49 TEST_TO_METHOD = {
50  IN_FLIGHT_UNARY_UNARY_CALL: UNARY_UNARY,
51  IN_FLIGHT_UNARY_STREAM_CALL: UNARY_STREAM,
52  IN_FLIGHT_STREAM_UNARY_CALL: STREAM_UNARY,
53  IN_FLIGHT_STREAM_STREAM_CALL: STREAM_STREAM,
54  IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL: PARTIAL_UNARY_STREAM,
55  IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL: PARTIAL_STREAM_UNARY,
56  IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL: PARTIAL_STREAM_STREAM,
57 }
58 
59 
def hang_unary_unary(request, servicer_context):
    """Unary-unary behaviour that sleeps for WAIT_TIME seconds.

    Both arguments are ignored. Nothing is returned before the sleep ends,
    so the RPC stays in flight for that long.
    """
    time.sleep(WAIT_TIME)
62 
63 
def hang_unary_stream(request, servicer_context):
    """Unary-stream behaviour that sleeps for WAIT_TIME seconds.

    Both arguments are ignored and no response is produced before the
    sleep ends.
    """
    time.sleep(WAIT_TIME)
66 
67 
def hang_partial_unary_stream(request, servicer_context):
    """Echo `request` for half a stream, then sleep for WAIT_TIME seconds.

    Yields `request` back `test_constants.STREAM_LENGTH // 2` times before
    sleeping. `servicer_context` is not used.
    """
    remaining = test_constants.STREAM_LENGTH // 2
    while remaining > 0:
        yield request
        remaining -= 1
    time.sleep(WAIT_TIME)
72 
73 
def hang_stream_unary(request_iterator, servicer_context):
    """Stream-unary behaviour that sleeps for WAIT_TIME seconds.

    The request iterator is never read and `servicer_context` is unused.
    """
    time.sleep(WAIT_TIME)
76 
77 
def hang_partial_stream_unary(request_iterator, servicer_context):
    """Consume half a stream of requests, then sleep for WAIT_TIME seconds.

    Pulls `test_constants.STREAM_LENGTH // 2` items from `request_iterator`
    and discards them before sleeping. `servicer_context` is not used.
    """
    remaining = test_constants.STREAM_LENGTH // 2
    while remaining > 0:
        next(request_iterator)
        remaining -= 1
    time.sleep(WAIT_TIME)
82 
83 
def hang_stream_stream(request_iterator, servicer_context):
    """Stream-stream behaviour that sleeps for WAIT_TIME seconds.

    The request iterator is never read, no response is produced before the
    sleep ends, and `servicer_context` is unused.
    """
    time.sleep(WAIT_TIME)
86 
87 
def hang_partial_stream_stream(request_iterator, servicer_context):
    """Echo half a stream of requests, then sleep for WAIT_TIME seconds.

    Yields the next item of `request_iterator` back to the caller
    `test_constants.STREAM_LENGTH // 2` times before sleeping.
    `servicer_context` is not used.
    """
    remaining = test_constants.STREAM_LENGTH // 2
    while remaining > 0:
        yield next(request_iterator)  #pylint: disable=stop-iteration-return
        remaining -= 1
    time.sleep(WAIT_TIME)
92 
93 
95 
class MethodHandler(grpc.RpcMethodHandler):
    """Method handler exposing exactly one hanging behaviour.

    Exactly one of `unary_unary`, `unary_stream`, `stream_unary` and
    `stream_stream` is set to one of the module's `hang_*` functions; the
    other three stay `None`.
    """

    def __init__(self, request_streaming, response_streaming, partial_hang):
        """Pick the hanging behaviour that matches the RPC arity.

        Args:
          request_streaming: True when the RPC takes a stream of requests.
          response_streaming: True when the RPC returns a stream of responses.
          partial_hang: True to select the `hang_partial_*` variant. Ignored
            for unary-unary, which has no partial variant.
        """
        self.request_streaming = request_streaming
        self.response_streaming = response_streaming
        # No serializer or deserializer is installed for this handler.
        self.request_deserializer = None
        self.response_serializer = None
        self.unary_unary = None
        self.unary_stream = None
        self.stream_unary = None
        self.stream_stream = None
        if self.request_streaming and self.response_streaming:
            self.stream_stream = (hang_partial_stream_stream
                                  if partial_hang else hang_stream_stream)
        elif self.request_streaming:
            self.stream_unary = (hang_partial_stream_unary
                                 if partial_hang else hang_stream_unary)
        elif self.response_streaming:
            self.unary_stream = (hang_partial_unary_stream
                                 if partial_hang else hang_unary_stream)
        else:
            self.unary_unary = hang_unary_unary
122 
123 
125 
class GenericHandler(grpc.GenericRpcHandler):
    """Generic handler that maps each test method path to a MethodHandler."""

    # Method path -> (request_streaming, response_streaming, partial_hang).
    _HANDLER_ARGS = {
        UNARY_UNARY: (False, False, False),
        UNARY_STREAM: (False, True, False),
        STREAM_UNARY: (True, False, False),
        STREAM_STREAM: (True, True, False),
        PARTIAL_UNARY_STREAM: (False, True, True),
        PARTIAL_STREAM_UNARY: (True, False, True),
        PARTIAL_STREAM_STREAM: (True, True, True),
    }

    def service(self, handler_call_details):
        """Return a fresh MethodHandler for a known method, else None.

        Args:
          handler_call_details: object whose `method` attribute names the
            RPC being invoked.

        Returns:
          A new `MethodHandler` configured for the method, or `None` when
          the method is not one of the test paths.
        """
        method = handler_call_details.method
        # The path constants are bytes. A text method name would never
        # compare equal to them, so normalise to bytes before the lookup.
        # NOTE(review): gRPC Python is understood to pass `method` as `str`
        # on Python 3 -- confirm against the grpc version in use.
        if isinstance(method, str):
            method = method.encode('utf-8')
        handler_args = self._HANDLER_ARGS.get(method)
        if handler_args is None:
            return None
        return MethodHandler(*handler_args)
143 
144 
# Traditional executors will not exit until all their
# current jobs complete. Because we submit jobs that will
# never finish, we don't want to block exit on these jobs.
class DaemonPool(object):
    """Minimal pool that runs every submitted job on its own daemon thread."""

    def submit(self, fn, *args, **kwargs):
        """Start `fn(*args, **kwargs)` on a new daemon thread.

        Returns None; no future is handed back to the caller.
        """
        worker = threading.Thread(target=fn, args=args, kwargs=kwargs)
        # Daemon threads do not keep the interpreter alive at exit.
        worker.daemon = True
        worker.start()

    def shutdown(self, wait=True):
        """Do nothing; `wait` is accepted but ignored."""
        pass
157 
158 
def infinite_request_iterator():
    """Yield REQUEST forever; the generator never finishes on its own."""
    while True:
        yield REQUEST
162 
163 
if __name__ == '__main__':
    # Runs the scenario named on the command line. Everything created here
    # (server, channel, futures, iterators) stays bound at module scope, in
    # line with the module docstring's "module-scope gRPC scenarios".
    logging.basicConfig()
    parser = argparse.ArgumentParser()
    parser.add_argument('scenario', type=str)
    parser.add_argument(
        '--wait_for_interrupt', dest='wait_for_interrupt', action='store_true')
    args = parser.parse_args()

    if args.scenario == UNSTARTED_SERVER:
        # A server object that is created but never started.
        server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
        if args.wait_for_interrupt:
            time.sleep(WAIT_TIME)
    elif args.scenario == RUNNING_SERVER:
        # A started server with no clients.
        server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.start()
        if args.wait_for_interrupt:
            time.sleep(WAIT_TIME)
    elif args.scenario == POLL_CONNECTIVITY_NO_SERVER:
        # A channel subscribed to connectivity updates; this script starts
        # no server for it.
        channel = grpc.insecure_channel('localhost:12345')

        def connectivity_callback(connectivity):
            pass

        channel.subscribe(connectivity_callback, try_to_connect=True)
        if args.wait_for_interrupt:
            time.sleep(WAIT_TIME)
    elif args.scenario == POLL_CONNECTIVITY:
        # A channel subscribed to connectivity updates against a live server.
        server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.start()
        channel = grpc.insecure_channel('localhost:%d' % port)

        def connectivity_callback(connectivity):
            pass

        channel.subscribe(connectivity_callback, try_to_connect=True)
        if args.wait_for_interrupt:
            time.sleep(WAIT_TIME)

    else:
        # Any other scenario name is looked up in TEST_TO_METHOD and run as
        # an RPC against a server whose handlers sleep.
        handler = GenericHandler()
        server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.add_generic_rpc_handlers((handler,))
        server.start()
        channel = grpc.insecure_channel('localhost:%d' % port)

        method = TEST_TO_METHOD[args.scenario]

        if args.scenario == IN_FLIGHT_UNARY_UNARY_CALL:
            multi_callable = channel.unary_unary(method)
            # One call via future(), then a second via with_call().
            future = multi_callable.future(REQUEST)
            result, call = multi_callable.with_call(REQUEST)
        elif args.scenario in (IN_FLIGHT_UNARY_STREAM_CALL,
                               IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL):
            multi_callable = channel.unary_stream(method)
            response_iterator = multi_callable(REQUEST)
            for response in response_iterator:
                pass
        elif args.scenario in (IN_FLIGHT_STREAM_UNARY_CALL,
                               IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL):
            multi_callable = channel.stream_unary(method)
            future = multi_callable.future(infinite_request_iterator())
            result, call = multi_callable.with_call(
                iter([REQUEST] * test_constants.STREAM_LENGTH))
        elif args.scenario in (IN_FLIGHT_STREAM_STREAM_CALL,
                               IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL):
            multi_callable = channel.stream_stream(method)
            response_iterator = multi_callable(infinite_request_iterator())
            for response in response_iterator:
                pass
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
tests.unit._exit_scenarios.connectivity_callback
connectivity_callback
Definition: _exit_scenarios.py:189
tests.unit._exit_scenarios.MethodHandler.unary_stream
unary_stream
Definition: _exit_scenarios.py:102
tests.unit._exit_scenarios.MethodHandler.stream_stream
stream_stream
Definition: _exit_scenarios.py:104
tests.unit._exit_scenarios.infinite_request_iterator
def infinite_request_iterator()
Definition: _exit_scenarios.py:159
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests.unit._exit_scenarios.MethodHandler.stream_unary
stream_unary
Definition: _exit_scenarios.py:103
tests.unit._exit_scenarios.hang_unary_unary
def hang_unary_unary(request, servicer_context)
Definition: _exit_scenarios.py:60
tests.unit._exit_scenarios.hang_partial_stream_unary
def hang_partial_stream_unary(request_iterator, servicer_context)
Definition: _exit_scenarios.py:78
tests.unit._exit_scenarios.MethodHandler.unary_unary
unary_unary
Definition: _exit_scenarios.py:101
tests.unit._exit_scenarios.MethodHandler.request_deserializer
request_deserializer
Definition: _exit_scenarios.py:99
tests.unit._exit_scenarios.multi_callable
multi_callable
Definition: _exit_scenarios.py:216
grpc.GenericRpcHandler
Definition: src/python/grpcio/grpc/__init__.py:1333
tests.unit._exit_scenarios.GenericHandler
Definition: _exit_scenarios.py:124
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
tests.unit._exit_scenarios.hang_stream_stream
def hang_stream_stream(request_iterator, servicer_context)
Definition: _exit_scenarios.py:84
tests.unit._exit_scenarios.MethodHandler.__init__
def __init__(self, request_streaming, response_streaming, partial_hang)
Definition: _exit_scenarios.py:96
tests.unit._exit_scenarios.DaemonPool.submit
def submit(self, fn, *args, **kwargs)
Definition: _exit_scenarios.py:150
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
tests.unit._exit_scenarios.DaemonPool.shutdown
def shutdown(self, wait=True)
Definition: _exit_scenarios.py:155
tests.unit._exit_scenarios.DaemonPool
Definition: _exit_scenarios.py:148
tests.unit._exit_scenarios.hang_partial_stream_stream
def hang_partial_stream_stream(request_iterator, servicer_context)
Definition: _exit_scenarios.py:88
tests.unit._exit_scenarios.MethodHandler.response_serializer
response_serializer
Definition: _exit_scenarios.py:100
tests.unit._exit_scenarios.MethodHandler
Definition: _exit_scenarios.py:94
tests.unit._exit_scenarios.hang_unary_stream
def hang_unary_stream(request, servicer_context)
Definition: _exit_scenarios.py:64
tests.unit._exit_scenarios.GenericHandler.service
def service(self, handler_call_details)
Definition: _exit_scenarios.py:126
iter
Definition: test_winkernel.cpp:47
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
tests.unit._exit_scenarios.hang_stream_unary
def hang_stream_unary(request_iterator, servicer_context)
Definition: _exit_scenarios.py:74
tests.unit._exit_scenarios.MethodHandler.response_streaming
response_streaming
Definition: _exit_scenarios.py:98
tests.unit._exit_scenarios.hang_partial_unary_stream
def hang_partial_unary_stream(request, servicer_context)
Definition: _exit_scenarios.py:68
grpc.RpcMethodHandler
Definition: src/python/grpcio/grpc/__init__.py:1288
tests.unit._exit_scenarios.MethodHandler.request_streaming
request_streaming
Definition: _exit_scenarios.py:97


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:38