_client_test.py
Go to the documentation of this file.
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14 
from concurrent import futures
import time
import unittest

import grpc
from grpc.framework.foundation import logging_pool
import grpc_testing

from tests.testing import _application_common
from tests.testing import _application_testing_common
from tests.testing import _client_application
from tests.testing.proto import requests_pb2
from tests.testing.proto import services_pb2
from tests.unit.framework.common import test_constants
29 
30 
# TODO(https://github.com/protocolbuffers/protobuf/issues/3452): Drop this skip.
@unittest.skipIf(
    services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None,
    'Fix protobuf issue 3452!')
class ClientTest(unittest.TestCase):
    """Drives _client_application scenarios against grpc_testing channels.

    Each test submits ``_client_application.run`` to a single-worker pool and
    then uses the test thread to "play server": it takes the RPC(s) the
    application issued on the testing channel, answers them, and finally
    checks the outcome the application reported.
    """

    def setUp(self):
        """Create the client worker pool and the fake/real-time channels."""
        # In this test the client-side application under test executes in
        # a separate thread while we retain use of the test thread to "play
        # server".
        self._client_execution_thread_pool = logging_pool.pool(1)

        self._fake_time = grpc_testing.strict_fake_time(time.time())
        self._real_time = grpc_testing.strict_real_time()
        self._fake_time_channel = grpc_testing.channel(
            services_pb2.DESCRIPTOR.services_by_name.values(), self._fake_time)
        self._real_time_channel = grpc_testing.channel(
            services_pb2.DESCRIPTOR.services_by_name.values(), self._real_time)

    def tearDown(self):
        """Shut the client worker pool down, waiting for it to finish."""
        self._client_execution_thread_pool.shutdown(wait=True)

    def test_successful_unary_unary(self):
        """UNARY_UNARY scenario answered with OK is reported SATISFACTORY."""
        application_future = self._client_execution_thread_pool.submit(
            _client_application.run, _client_application.Scenario.UNARY_UNARY,
            self._real_time_channel)
        invocation_metadata, request, rpc = (
            self._real_time_channel.take_unary_unary(
                _application_testing_common.FIRST_SERVICE_UNUN))
        rpc.send_initial_metadata(())
        rpc.terminate(_application_common.UNARY_UNARY_RESPONSE, (),
                      grpc.StatusCode.OK, '')
        application_return_value = application_future.result()

        self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.SATISFACTORY)

    def test_successful_unary_stream(self):
        """UNARY_STREAM scenario terminated with OK is SATISFACTORY."""
        application_future = self._client_execution_thread_pool.submit(
            _client_application.run, _client_application.Scenario.UNARY_STREAM,
            self._fake_time_channel)
        invocation_metadata, request, rpc = (
            self._fake_time_channel.take_unary_stream(
                _application_testing_common.FIRST_SERVICE_UNSTRE))
        rpc.send_initial_metadata(())
        rpc.terminate((), grpc.StatusCode.OK, '')
        application_return_value = application_future.result()

        self.assertEqual(_application_common.UNARY_STREAM_REQUEST, request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.SATISFACTORY)

    def test_successful_stream_unary(self):
        """STREAM_UNARY scenario: three requests taken, OK response sent."""
        application_future = self._client_execution_thread_pool.submit(
            _client_application.run, _client_application.Scenario.STREAM_UNARY,
            self._real_time_channel)
        invocation_metadata, rpc = self._real_time_channel.take_stream_unary(
            _application_testing_common.FIRST_SERVICE_STREUN)
        rpc.send_initial_metadata(())
        first_request = rpc.take_request()
        second_request = rpc.take_request()
        third_request = rpc.take_request()
        rpc.requests_closed()
        rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (),
                      grpc.StatusCode.OK, '')
        application_return_value = application_future.result()

        self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
                         first_request)
        self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
                         second_request)
        self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
                         third_request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.SATISFACTORY)

    def test_successful_stream_stream(self):
        """STREAM_STREAM scenario: two responses per request, then OK."""
        application_future = self._client_execution_thread_pool.submit(
            _client_application.run, _client_application.Scenario.STREAM_STREAM,
            self._fake_time_channel)
        invocation_metadata, rpc = self._fake_time_channel.take_stream_stream(
            _application_testing_common.FIRST_SERVICE_STRESTRE)
        first_request = rpc.take_request()
        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
        second_request = rpc.take_request()
        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
        rpc.requests_closed()
        rpc.terminate((), grpc.StatusCode.OK, '')
        application_return_value = application_future.result()

        self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
                         first_request)
        self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
                         second_request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.SATISFACTORY)

    def test_concurrent_stream_stream(self):
        """CONCURRENT_STREAM_STREAM: RPC_CONCURRENCY RPCs served in lockstep."""
        application_future = self._client_execution_thread_pool.submit(
            _client_application.run,
            _client_application.Scenario.CONCURRENT_STREAM_STREAM,
            self._real_time_channel)
        rpcs = []
        for _ in range(test_constants.RPC_CONCURRENCY):
            invocation_metadata, rpc = (
                self._real_time_channel.take_stream_stream(
                    _application_testing_common.FIRST_SERVICE_STRESTRE))
            rpcs.append(rpc)
        # Every phase below is applied to all RPCs before the next phase
        # starts, so the RPCs advance together rather than one at a time.
        requests = {}
        for rpc in rpcs:
            requests[rpc] = [rpc.take_request()]
        for rpc in rpcs:
            rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
            rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
        for rpc in rpcs:
            requests[rpc].append(rpc.take_request())
        for rpc in rpcs:
            rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
            rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
        for rpc in rpcs:
            rpc.requests_closed()
        for rpc in rpcs:
            rpc.terminate((), grpc.StatusCode.OK, '')
        application_return_value = application_future.result()

        for requests_of_one_rpc in requests.values():
            for request in requests_of_one_rpc:
                self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
                                 request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.SATISFACTORY)

    def test_cancelled_unary_unary(self):
        """CANCEL_UNARY_UNARY scenario: the RPC's cancelled() is awaited."""
        application_future = self._client_execution_thread_pool.submit(
            _client_application.run,
            _client_application.Scenario.CANCEL_UNARY_UNARY,
            self._fake_time_channel)
        invocation_metadata, request, rpc = (
            self._fake_time_channel.take_unary_unary(
                _application_testing_common.FIRST_SERVICE_UNUN))
        rpc.send_initial_metadata(())
        rpc.cancelled()
        application_return_value = application_future.result()

        self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.SATISFACTORY)

    def test_status_stream_unary(self):
        """CONCURRENT_STREAM_UNARY: one non-OK RPC makes it UNSATISFACTORY."""
        application_future = self._client_execution_thread_pool.submit(
            _client_application.run,
            _client_application.Scenario.CONCURRENT_STREAM_UNARY,
            self._fake_time_channel)
        # take_stream_unary returns (invocation_metadata, rpc); only the rpc
        # is needed here.
        rpcs = tuple(
            self._fake_time_channel.take_stream_unary(
                _application_testing_common.FIRST_SERVICE_STREUN)[1]
            for _ in range(test_constants.THREAD_CONCURRENCY))
        for rpc in rpcs:
            rpc.take_request()
            rpc.take_request()
            rpc.take_request()
            rpc.requests_closed()
            rpc.send_initial_metadata(((
                'my_metadata_key',
                'My Metadata Value!',
            ),))
        # All but the last RPC succeed; the last is terminated with
        # RESOURCE_EXHAUSTED.
        for rpc in rpcs[:-1]:
            rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (),
                          grpc.StatusCode.OK, '')
        rpcs[-1].terminate(_application_common.STREAM_UNARY_RESPONSE, (),
                           grpc.StatusCode.RESOURCE_EXHAUSTED,
                           'nope; not able to handle all those RPCs!')
        application_return_value = application_future.result()

        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.UNSATISFACTORY)

    def test_status_stream_stream(self):
        """STREAM_STREAM ended with a non-OK status surfaces code and details."""
        code = grpc.StatusCode.DEADLINE_EXCEEDED
        details = 'test deadline exceeded!'

        application_future = self._client_execution_thread_pool.submit(
            _client_application.run, _client_application.Scenario.STREAM_STREAM,
            self._real_time_channel)
        invocation_metadata, rpc = self._real_time_channel.take_stream_stream(
            _application_testing_common.FIRST_SERVICE_STRESTRE)
        first_request = rpc.take_request()
        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
        second_request = rpc.take_request()
        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
        rpc.requests_closed()
        rpc.terminate((), code, details)
        application_return_value = application_future.result()

        self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
                         first_request)
        self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
                         second_request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.RPC_ERROR)
        self.assertIs(application_return_value.code, code)
        self.assertEqual(application_return_value.details, details)

    def test_misbehaving_server_unary_unary(self):
        """UNARY_UNARY answered with the erroneous response is UNSATISFACTORY."""
        application_future = self._client_execution_thread_pool.submit(
            _client_application.run, _client_application.Scenario.UNARY_UNARY,
            self._fake_time_channel)
        invocation_metadata, request, rpc = (
            self._fake_time_channel.take_unary_unary(
                _application_testing_common.FIRST_SERVICE_UNUN))
        rpc.send_initial_metadata(())
        rpc.terminate(_application_common.ERRONEOUS_UNARY_UNARY_RESPONSE, (),
                      grpc.StatusCode.OK, '')
        application_return_value = application_future.result()

        self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.UNSATISFACTORY)

    def test_misbehaving_server_stream_stream(self):
        """STREAM_STREAM with three responses per request is UNSATISFACTORY."""
        application_future = self._client_execution_thread_pool.submit(
            _client_application.run, _client_application.Scenario.STREAM_STREAM,
            self._real_time_channel)
        invocation_metadata, rpc = self._real_time_channel.take_stream_stream(
            _application_testing_common.FIRST_SERVICE_STRESTRE)
        first_request = rpc.take_request()
        # Three responses per request here, versus two in
        # test_successful_stream_stream.
        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
        second_request = rpc.take_request()
        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
        rpc.requests_closed()
        rpc.terminate((), grpc.StatusCode.OK, '')
        application_return_value = application_future.result()

        self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
                         first_request)
        self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
                         second_request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.UNSATISFACTORY)

    def test_infinite_request_stream_real_time(self):
        """INFINITE_REQUEST_STREAM ended with DEADLINE_EXCEEDED after a wait."""
        application_future = self._client_execution_thread_pool.submit(
            _client_application.run,
            _client_application.Scenario.INFINITE_REQUEST_STREAM,
            self._real_time_channel)
        invocation_metadata, rpc = self._real_time_channel.take_stream_unary(
            _application_testing_common.FIRST_SERVICE_STREUN)
        rpc.send_initial_metadata(())
        first_request = rpc.take_request()
        second_request = rpc.take_request()
        third_request = rpc.take_request()
        # Wait out the scenario's timeout on the real-time object before
        # terminating the RPC; requests_closed() is deliberately not called.
        self._real_time.sleep_for(
            _application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
        rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (),
                      grpc.StatusCode.DEADLINE_EXCEEDED, '')
        application_return_value = application_future.result()

        self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
                         first_request)
        self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
                         second_request)
        self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
                         third_request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.SATISFACTORY)
305 
306 
# Run this module's tests with verbose output when executed as a script.
if __name__ == '__main__':
    unittest.main(verbosity=2)
grpc_testing.channel
def channel(service_descriptors, time)
Definition: src/python/grpcio_testing/grpc_testing/__init__.py:666
tests.testing._client_test.ClientTest._fake_time
_fake_time
Definition: _client_test.py:43
grpc.framework.foundation
Definition: src/python/grpcio/grpc/framework/foundation/__init__.py:1
tests.testing._client_test.ClientTest.test_successful_stream_stream
def test_successful_stream_stream(self)
Definition: _client_test.py:108
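range
range
Definition: Python built-in. (The generated cross-reference pointed at capstone.range in third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6, but _client_test.py does not import capstone; its range calls are the built-in.)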
tests.testing._client_test.ClientTest.test_successful_unary_stream
def test_successful_unary_stream(self)
Definition: _client_test.py:69
tests.testing
Definition: src/python/grpcio_tests/tests/testing/__init__.py:1
tests.testing._client_test.ClientTest._client_execution_thread_pool
_client_execution_thread_pool
Definition: _client_test.py:41
tests.testing._client_test.ClientTest.test_concurrent_stream_stream
def test_concurrent_stream_stream(self)
Definition: _client_test.py:131
tests.testing._client_test.ClientTest.test_misbehaving_server_stream_stream
def test_misbehaving_server_stream_stream(self)
Definition: _client_test.py:255
tests.testing._client_test.ClientTest._real_time
_real_time
Definition: _client_test.py:44
grpc_testing.strict_fake_time
def strict_fake_time(now)
Definition: src/python/grpcio_testing/grpc_testing/__init__.py:647
tests.testing._client_test.ClientTest.test_status_stream_stream
def test_status_stream_stream(self)
Definition: _client_test.py:211
tests.testing._client_test.ClientTest._fake_time_channel
_fake_time_channel
Definition: _client_test.py:45
tests.testing._client_test.ClientTest
Definition: _client_test.py:35
tests.testing._client_test.ClientTest.test_infinite_request_stream_real_time
def test_infinite_request_stream_real_time(self)
Definition: _client_test.py:280
tests.testing._client_test.ClientTest.test_successful_unary_unary
def test_successful_unary_unary(self)
Definition: _client_test.py:53
tests.testing._client_test.ClientTest.test_status_stream_unary
def test_status_stream_unary(self)
Definition: _client_test.py:182
tests.testing._client_test.ClientTest.setUp
def setUp(self)
Definition: _client_test.py:37
tests.testing.proto
Definition: src/python/grpcio_tests/tests/testing/proto/__init__.py:1
tests.testing._client_test.ClientTest._real_time_channel
_real_time_channel
Definition: _client_test.py:47
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
tests.testing._client_test.ClientTest.test_successful_stream_unary
def test_successful_stream_unary(self)
Definition: _client_test.py:84
tests.testing._client_test.ClientTest.test_cancelled_unary_unary
def test_cancelled_unary_unary(self)
Definition: _client_test.py:166
tests.testing._client_test.ClientTest.tearDown
def tearDown(self)
Definition: _client_test.py:50
tests.testing._client_test.ClientTest.test_misbehaving_server_unary_unary
def test_misbehaving_server_unary_unary(self)
Definition: _client_test.py:239
grpc_testing.strict_real_time
def strict_real_time()
Definition: src/python/grpcio_testing/grpc_testing/__init__.py:632


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27