_beta_features_test.py
Go to the documentation of this file.
1 # Copyright 2015 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests Face interface compliance of the gRPC Python Beta API."""
15 
16 import threading
17 import unittest
18 
19 from grpc.beta import implementations
20 from grpc.beta import interfaces
21 from grpc.framework.common import cardinality
22 from grpc.framework.interfaces.face import utilities
23 
24 from tests.unit import resources
25 from tests.unit.beta import test_utilities
26 from tests.unit.framework.common import test_constants
27 
# Host name passed to test_utilities.not_really_secure_channel as the
# server host override when the tests build their channels.
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'

# Key/value pair that _metadata_plugin attaches to calls and that the tests
# look for in the metadata recorded by the servicer.
_PER_RPC_CREDENTIALS_METADATA_KEY = b'my-call-credentials-metadata-key'
_PER_RPC_CREDENTIALS_METADATA_VALUE = b'my-call-credentials-metadata-value'

# Group and method names used to key the method implementations and the
# cardinalities dictionaries.
_GROUP = 'group'
_UNARY_UNARY = 'unary-unary'
_UNARY_STREAM = 'unary-stream'
_STREAM_UNARY = 'stream-unary'
_STREAM_STREAM = 'stream-stream'

# Payloads sent by the tests and returned by the servicer.
_REQUEST = b'abc'
_RESPONSE = b'123'
41 
42 
class _Servicer(object):
    """Records what each RPC handler observed so tests can assert on it.

    Every handler stores the peer and the invocation metadata reported by
    its context, calls disable_next_response_compression() on the protocol
    context, and finally marks the servicer as serviced, waking any thread
    blocked in block_until_serviced(). All state is guarded by one condition.
    """

    def __init__(self):
        self._condition = threading.Condition()
        self._peer = None
        self._serviced = False
        # Initialised up front so reading them before any RPC has been
        # handled yields None instead of raising AttributeError.
        self._request = None
        self._invocation_metadata = None

    def _record_and_finish(self, context):
        """Store peer and metadata from context and signal completion.

        The caller must already hold self._condition (notify_all() needs it).
        """
        self._peer = context.protocol_context().peer()
        self._invocation_metadata = context.invocation_metadata()
        context.protocol_context().disable_next_response_compression()
        self._serviced = True
        self._condition.notify_all()

    def unary_unary(self, request, context):
        """Handle a unary-unary RPC; returns the canned _RESPONSE."""
        with self._condition:
            self._request = request
            self._record_and_finish(context)
        return _RESPONSE

    def unary_stream(self, request, context):
        """Handle a unary-stream RPC; the response stream is empty."""
        with self._condition:
            self._request = request
            self._record_and_finish(context)
        return
        # The unreachable yield is what makes this function a generator.
        yield  # pylint: disable=unreachable

    def stream_unary(self, request_iterator, context):
        """Drain the request stream, then return the canned _RESPONSE.

        Only the last request received is retained in self._request.
        """
        for request in request_iterator:
            # Lock per item rather than around the loop: advancing the
            # request iterator may block, and the lock must stay available.
            with self._condition:
                self._request = request
        with self._condition:
            self._record_and_finish(context)
        return _RESPONSE

    def stream_stream(self, request_iterator, context):
        """Yield one _RESPONSE per request, then record completion."""
        for request in request_iterator:
            with self._condition:
                self._peer = context.protocol_context().peer()
                context.protocol_context().disable_next_response_compression()
            # Yield with the lock released so a suspended generator never
            # blocks peer() / block_until_serviced() on other threads.
            yield _RESPONSE
        with self._condition:
            self._invocation_metadata = context.invocation_metadata()
            self._serviced = True
            self._condition.notify_all()

    def peer(self):
        """Return the most recently recorded peer, or None if none yet."""
        with self._condition:
            return self._peer

    def block_until_serviced(self):
        """Block the calling thread until a handler has marked completion."""
        with self._condition:
            while not self._serviced:
                self._condition.wait()
101 
102 
class _BlockingIterator(object):
    """Iterator that hands out upstream items only after allow() is called.

    next() blocks until an item has been released by allow(), which lets a
    test decide exactly when each element becomes available to the consumer.
    """

    def __init__(self, upstream):
        self._condition = threading.Condition()
        self._upstream = upstream
        # Items released by allow() that have not been consumed yet.
        self._allowed = []
        # Tracked separately from the queue so that running out of upstream
        # items never discards items that are still waiting to be consumed.
        self._exhausted = False

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def next(self):
        """Return the next released item, blocking until one is available.

        Raises:
          StopIteration: once upstream is exhausted and every released item
            has been handed out.
        """
        with self._condition:
            while not self._allowed:
                if self._exhausted:
                    raise StopIteration()
                self._condition.wait()
            return self._allowed.pop(0)

    def allow(self):
        """Release one more upstream item, or record upstream exhaustion.

        Calling this again after upstream is exhausted is a no-op.
        """
        with self._condition:
            if self._exhausted:
                return
            try:
                self._allowed.append(next(self._upstream))
            except StopIteration:
                self._exhausted = True
            self._condition.notify_all()
133 
134 
def _metadata_plugin(context, callback):
    """Report the per-RPC credentials metadata pair through callback.

    Args:
      context: Unused by this plugin.
      callback: Called once with the metadata list and None as the second
        argument.
    """
    per_rpc_metadata = [
        (_PER_RPC_CREDENTIALS_METADATA_KEY, _PER_RPC_CREDENTIALS_METADATA_VALUE),
    ]
    callback(per_rpc_metadata, None)
139 
140 
class BetaFeaturesTest(unittest.TestCase):
    """Checks that call options reach the servicer for each RPC cardinality.

    Every test invokes one method of a dynamic stub with call credentials
    attached, then asserts that the servicer recorded a peer and the
    metadata pair supplied by _metadata_plugin.
    """

    def setUp(self):
        """Start a secure server backed by a fresh servicer; build a stub."""
        self._servicer = _Servicer()
        method_implementations = {
            (_GROUP, _UNARY_UNARY):
                utilities.unary_unary_inline(self._servicer.unary_unary),
            (_GROUP, _UNARY_STREAM):
                utilities.unary_stream_inline(self._servicer.unary_stream),
            (_GROUP, _STREAM_UNARY):
                utilities.stream_unary_inline(self._servicer.stream_unary),
            (_GROUP, _STREAM_STREAM):
                utilities.stream_stream_inline(self._servicer.stream_stream),
        }

        cardinalities = {
            _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY,
            _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM,
            _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY,
            _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM,
        }

        server_options = implementations.server_options(
            thread_pool_size=test_constants.POOL_SIZE)
        self._server = implementations.server(method_implementations,
                                              options=server_options)
        server_credentials = implementations.ssl_server_credentials([
            (
                resources.private_key(),
                resources.certificate_chain(),
            ),
        ])
        port = self._server.add_secure_port('[::]:0', server_credentials)
        self._server.start()
        self._channel_credentials = implementations.ssl_channel_credentials(
            resources.test_root_certificates())
        self._call_credentials = implementations.metadata_call_credentials(
            _metadata_plugin)
        channel = test_utilities.not_really_secure_channel(
            'localhost', port, self._channel_credentials, _SERVER_HOST_OVERRIDE)
        stub_options = implementations.stub_options(
            thread_pool_size=test_constants.POOL_SIZE)
        self._dynamic_stub = implementations.dynamic_stub(channel,
                                                          _GROUP,
                                                          cardinalities,
                                                          options=stub_options)

    def tearDown(self):
        """Drop the stub and wait for the server to stop."""
        self._dynamic_stub = None
        self._server.stop(test_constants.SHORT_TIMEOUT).wait()

    def _assert_call_credentials_received(self):
        """Assert the servicer recorded the metadata from _metadata_plugin."""
        invocation_metadata = [
            (metadatum.key, metadatum.value)
            for metadatum in self._servicer._invocation_metadata
        ]
        self.assertIn((_PER_RPC_CREDENTIALS_METADATA_KEY,
                       _PER_RPC_CREDENTIALS_METADATA_VALUE),
                      invocation_metadata)

    def test_unary_unary(self):
        call_options = interfaces.grpc_call_options(
            disable_compression=True, credentials=self._call_credentials)
        response = getattr(self._dynamic_stub,
                           _UNARY_UNARY)(_REQUEST,
                                         test_constants.LONG_TIMEOUT,
                                         protocol_options=call_options)
        self.assertEqual(_RESPONSE, response)
        self.assertIsNotNone(self._servicer.peer())
        self._assert_call_credentials_received()

    def test_unary_stream(self):
        call_options = interfaces.grpc_call_options(
            disable_compression=True, credentials=self._call_credentials)
        # The responses are never consumed here.
        # NOTE(review): the binding is kept on purpose, presumably so the
        # call object stays referenced while the servicer runs - confirm
        # before removing it.
        response_iterator = getattr(self._dynamic_stub, _UNARY_STREAM)(
            _REQUEST,
            test_constants.LONG_TIMEOUT,
            protocol_options=call_options)
        self._servicer.block_until_serviced()
        self.assertIsNotNone(self._servicer.peer())
        self._assert_call_credentials_received()

    def test_stream_unary(self):
        call_options = interfaces.grpc_call_options(
            credentials=self._call_credentials)
        request_iterator = _BlockingIterator(iter((_REQUEST,)))
        response_future = getattr(self._dynamic_stub, _STREAM_UNARY).future(
            request_iterator,
            test_constants.LONG_TIMEOUT,
            protocol_options=call_options)
        # First allow() releases the single request; the second one makes
        # the request iterator report the end of the stream.
        response_future.protocol_context().disable_next_request_compression()
        request_iterator.allow()
        response_future.protocol_context().disable_next_request_compression()
        request_iterator.allow()
        self._servicer.block_until_serviced()
        self.assertIsNotNone(self._servicer.peer())
        self.assertEqual(_RESPONSE, response_future.result())
        self._assert_call_credentials_received()

    def test_stream_stream(self):
        call_options = interfaces.grpc_call_options(
            credentials=self._call_credentials)
        request_iterator = _BlockingIterator(iter((_REQUEST,)))
        response_iterator = getattr(self._dynamic_stub, _STREAM_STREAM)(
            request_iterator,
            test_constants.SHORT_TIMEOUT,
            protocol_options=call_options)
        response_iterator.protocol_context().disable_next_request_compression()
        request_iterator.allow()
        response = next(response_iterator)
        response_iterator.protocol_context().disable_next_request_compression()
        # Second allow() ends the request stream so the servicer can finish.
        request_iterator.allow()
        self._servicer.block_until_serviced()
        self.assertIsNotNone(self._servicer.peer())
        self.assertEqual(_RESPONSE, response)
        self._assert_call_credentials_received()
272 
273 
class ContextManagementAndLifecycleTest(unittest.TestCase):
    """Exercises stub context management and repeated server start/stop."""

    def setUp(self):
        """Prepare the servicer, method tables, options and credentials.

        No server is started here; each test creates its own.
        """
        self._servicer = _Servicer()
        self._method_implementations = {
            (_GROUP, _UNARY_UNARY):
                utilities.unary_unary_inline(self._servicer.unary_unary),
            (_GROUP, _UNARY_STREAM):
                utilities.unary_stream_inline(self._servicer.unary_stream),
            (_GROUP, _STREAM_UNARY):
                utilities.stream_unary_inline(self._servicer.stream_unary),
            (_GROUP, _STREAM_STREAM):
                utilities.stream_stream_inline(self._servicer.stream_stream),
        }

        self._cardinalities = {
            _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY,
            _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM,
            _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY,
            _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM,
        }

        self._server_options = implementations.server_options(
            thread_pool_size=test_constants.POOL_SIZE)
        self._server_credentials = implementations.ssl_server_credentials([
            (
                resources.private_key(),
                resources.certificate_chain(),
            ),
        ])
        self._channel_credentials = implementations.ssl_channel_credentials(
            resources.test_root_certificates())
        self._stub_options = implementations.stub_options(
            thread_pool_size=test_constants.POOL_SIZE)

    def test_stub_context(self):
        """Enter and exit the stub's context repeatedly, with and without RPCs."""
        server = implementations.server(self._method_implementations,
                                        options=self._server_options)
        port = server.add_secure_port('[::]:0', self._server_credentials)
        server.start()
        # try/finally so a failing assertion does not leave the server
        # running for the rest of the test run.
        try:
            channel = test_utilities.not_really_secure_channel(
                'localhost', port, self._channel_credentials,
                _SERVER_HOST_OVERRIDE)
            dynamic_stub = implementations.dynamic_stub(
                channel,
                _GROUP,
                self._cardinalities,
                options=self._stub_options)
            for _ in range(100):
                with dynamic_stub:
                    pass
            for _ in range(10):
                with dynamic_stub:
                    call_options = interfaces.grpc_call_options(
                        disable_compression=True)
                    response = getattr(dynamic_stub, _UNARY_UNARY)(
                        _REQUEST,
                        test_constants.LONG_TIMEOUT,
                        protocol_options=call_options)
                    self.assertEqual(_RESPONSE, response)
                    self.assertIsNotNone(self._servicer.peer())
        finally:
            server.stop(test_constants.SHORT_TIMEOUT).wait()

    def test_server_lifecycle(self):
        """Start and stop many servers, explicitly and as context managers."""
        for _ in range(100):
            server = implementations.server(self._method_implementations,
                                            options=self._server_options)
            server.add_secure_port('[::]:0', self._server_credentials)
            server.start()
            server.stop(test_constants.SHORT_TIMEOUT).wait()
        for _ in range(100):
            server = implementations.server(self._method_implementations,
                                            options=self._server_options)
            server.add_secure_port('[::]:0', self._server_credentials)
            server.add_insecure_port('[::]:0')
            # NOTE(review): the listing lost its indentation; the first
            # stop() is taken to be inside the `with` and the second after
            # it - confirm against the original file.
            with server:
                server.stop(test_constants.SHORT_TIMEOUT)
            server.stop(test_constants.SHORT_TIMEOUT)
352 
353 
# Run every test case in this module, with verbose output, when the file is
# executed directly as a script.
if __name__ == '__main__':
    unittest.main(verbosity=2)
tests.unit.beta._beta_features_test.BetaFeaturesTest._servicer
_servicer
Definition: _beta_features_test.py:144
tests.unit.beta._beta_features_test.BetaFeaturesTest.tearDown
def tearDown(self)
Definition: _beta_features_test.py:188
tests.unit.beta._beta_features_test.BetaFeaturesTest
Definition: _beta_features_test.py:141
tests.unit.beta._beta_features_test._Servicer.unary_stream
def unary_stream(self, request, context)
Definition: _beta_features_test.py:60
grpc.framework.interfaces.face
Definition: src/python/grpcio/grpc/framework/interfaces/face/__init__.py:1
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
grpc.beta
Definition: src/python/grpcio/grpc/beta/__init__.py:1
tests.unit.beta._beta_features_test._Servicer.block_until_serviced
def block_until_serviced(self)
Definition: _beta_features_test.py:97
tests.unit.beta._beta_features_test._Servicer._request
_request
Definition: _beta_features_test.py:52
tests.unit.beta._beta_features_test._Servicer.stream_unary
def stream_unary(self, request_iterator, context)
Definition: _beta_features_test.py:71
tests.unit.beta._beta_features_test.ContextManagementAndLifecycleTest._servicer
_servicer
Definition: _beta_features_test.py:277
tests.unit.beta._beta_features_test._BlockingIterator.next
def next(self)
Definition: _beta_features_test.py:116
tests.unit.beta._beta_features_test.BetaFeaturesTest._server
_server
Definition: _beta_features_test.py:165
tests.unit.beta._beta_features_test._Servicer._peer
_peer
Definition: _beta_features_test.py:47
tests.unit.beta._beta_features_test._BlockingIterator._allowed
_allowed
Definition: _beta_features_test.py:108
tests.unit.beta._beta_features_test.BetaFeaturesTest.setUp
def setUp(self)
Definition: _beta_features_test.py:143
tests.unit._exit_scenarios.future
future
Definition: _exit_scenarios.py:217
tests.unit.beta._beta_features_test.ContextManagementAndLifecycleTest.test_stub_context
def test_stub_context(self)
Definition: _beta_features_test.py:309
grpc.framework.common
Definition: src/python/grpcio/grpc/framework/common/__init__.py:1
tests.unit.beta._beta_features_test._Servicer.stream_stream
def stream_stream(self, request_iterator, context)
Definition: _beta_features_test.py:82
tests.unit.beta._beta_features_test.ContextManagementAndLifecycleTest._server_options
_server_options
Definition: _beta_features_test.py:296
start
static uint64_t start
Definition: benchmark-pound.c:74
tests.unit.beta._beta_features_test.ContextManagementAndLifecycleTest._channel_credentials
_channel_credentials
Definition: _beta_features_test.py:304
tests.unit.beta._beta_features_test._Servicer._condition
_condition
Definition: _beta_features_test.py:46
tests.unit.beta._beta_features_test.ContextManagementAndLifecycleTest.setUp
def setUp(self)
Definition: _beta_features_test.py:276
tests.unit.beta
Definition: src/python/grpcio_tests/tests/unit/beta/__init__.py:1
tests.unit.beta._beta_features_test._BlockingIterator.__iter__
def __iter__(self)
Definition: _beta_features_test.py:110
tests.unit.beta._beta_features_test.BetaFeaturesTest.test_stream_unary
def test_stream_unary(self)
Definition: _beta_features_test.py:226
tests.unit.beta._beta_features_test._Servicer._serviced
_serviced
Definition: _beta_features_test.py:48
tests.unit.beta._beta_features_test._BlockingIterator.allow
def allow(self)
Definition: _beta_features_test.py:126
tests.unit.beta._beta_features_test.BetaFeaturesTest.test_stream_stream
def test_stream_stream(self)
Definition: _beta_features_test.py:249
callback
static void callback(void *arg, int status, int timeouts, struct hostent *host)
Definition: acountry.c:224
tests.unit.beta._beta_features_test.ContextManagementAndLifecycleTest._server_credentials
_server_credentials
Definition: _beta_features_test.py:298
tests.unit.beta._beta_features_test._metadata_plugin
def _metadata_plugin(context, callback)
Definition: _beta_features_test.py:135
tests.unit.beta._beta_features_test._BlockingIterator.__next__
def __next__(self)
Definition: _beta_features_test.py:113
tests.unit.beta._beta_features_test.BetaFeaturesTest.test_unary_stream
def test_unary_stream(self)
Definition: _beta_features_test.py:209
tests.unit.beta._beta_features_test._Servicer.peer
def peer(self)
Definition: _beta_features_test.py:93
tests.unit.beta._beta_features_test.ContextManagementAndLifecycleTest._stub_options
_stub_options
Definition: _beta_features_test.py:306
tests.unit.beta._beta_features_test._BlockingIterator._condition
_condition
Definition: _beta_features_test.py:106
tests.unit.beta._beta_features_test.ContextManagementAndLifecycleTest._method_implementations
_method_implementations
Definition: _beta_features_test.py:278
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
tests.unit.beta._beta_features_test.BetaFeaturesTest._dynamic_stub
_dynamic_stub
Definition: _beta_features_test.py:183
tests.unit.beta._beta_features_test.BetaFeaturesTest._call_credentials
_call_credentials
Definition: _beta_features_test.py:177
tests.unit.beta._beta_features_test.ContextManagementAndLifecycleTest.test_server_lifecycle
def test_server_lifecycle(self)
Definition: _beta_features_test.py:337
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
tests.unit.beta._beta_features_test._Servicer._invocation_metadata
_invocation_metadata
Definition: _beta_features_test.py:54
wait
static void wait(notification *n)
Definition: alts_tsi_handshaker_test.cc:114
tests.unit.beta._beta_features_test.ContextManagementAndLifecycleTest._cardinalities
_cardinalities
Definition: _beta_features_test.py:289
tests.unit.beta._beta_features_test._BlockingIterator.__init__
def __init__(self, upstream)
Definition: _beta_features_test.py:105
tests.unit.beta._beta_features_test._Servicer
Definition: _beta_features_test.py:43
tests.unit.beta._beta_features_test._Servicer.unary_unary
def unary_unary(self, request, context)
Definition: _beta_features_test.py:50
tests.unit.beta._beta_features_test._BlockingIterator._upstream
_upstream
Definition: _beta_features_test.py:107
tests.unit.beta._beta_features_test.ContextManagementAndLifecycleTest
Definition: _beta_features_test.py:274
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
tests.unit.beta._beta_features_test._BlockingIterator
Definition: _beta_features_test.py:103
iter
Definition: test_winkernel.cpp:47
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
tests.unit.beta._beta_features_test._Servicer.__init__
def __init__(self)
Definition: _beta_features_test.py:45
tests.unit.beta._beta_features_test.BetaFeaturesTest._channel_credentials
_channel_credentials
Definition: _beta_features_test.py:175
tests.unit.beta._beta_features_test.BetaFeaturesTest.test_unary_unary
def test_unary_unary(self)
Definition: _beta_features_test.py:192


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:26