14 """Test of RPCs made against gRPC Python's application-layer API."""
16 from concurrent
import futures
29 stream_stream_non_blocking_multi_callable
31 unary_stream_non_blocking_multi_callable
57 self.assertEqual(grpc.StatusCode.UNIMPLEMENTED,
58 exception_context.exception.code())
67 metadata=((
'test',
'SuccessfulUnaryRequestBlockingUnaryResponse'),))
69 self.assertEqual(expected_response, response)
76 response, call = multi_callable.with_call(
79 'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),))
81 self.assertEqual(expected_response, response)
82 self.assertIs(grpc.StatusCode.OK, call.code())
83 self.assertEqual(
'', call.debug_error_string())
90 response_future = multi_callable.future(
92 metadata=((
'test',
'SuccessfulUnaryRequestFutureUnaryResponse'),))
93 response = response_future.result()
96 self.assertIsInstance(response_future,
grpc.Call)
97 self.assertEqual(expected_response, response)
98 self.assertIsNone(response_future.exception())
99 self.assertIsNone(response_future.traceback())
102 request = b
'\x37\x58'
103 expected_responses = tuple(
109 metadata=((
'test',
'SuccessfulUnaryRequestStreamResponse'),))
110 responses = tuple(response_iterator)
112 self.assertSequenceEqual(expected_responses, responses)
116 b
'\x07\x08' for _
in range(test_constants.STREAM_LENGTH))
118 iter(requests),
None)
119 request_iterator =
iter(requests)
125 'SuccessfulStreamRequestBlockingUnaryResponse'),))
127 self.assertEqual(expected_response, response)
131 b
'\x07\x08' for _
in range(test_constants.STREAM_LENGTH))
133 iter(requests),
None)
134 request_iterator =
iter(requests)
137 response, call = multi_callable.with_call(
141 'SuccessfulStreamRequestBlockingUnaryResponseWithCall'),))
143 self.assertEqual(expected_response, response)
144 self.assertIs(grpc.StatusCode.OK, call.code())
148 b
'\x07\x08' for _
in range(test_constants.STREAM_LENGTH))
150 iter(requests),
None)
151 request_iterator =
iter(requests)
154 response_future = multi_callable.future(
156 metadata=((
'test',
'SuccessfulStreamRequestFutureUnaryResponse'),))
157 response = response_future.result()
159 self.assertEqual(expected_response, response)
160 self.assertIsNone(response_future.exception())
161 self.assertIsNone(response_future.traceback())
165 b
'\x77\x58' for _
in range(test_constants.STREAM_LENGTH))
167 expected_responses = tuple(
169 request_iterator =
iter(requests)
174 metadata=((
'test',
'SuccessfulStreamRequestStreamResponse'),))
175 responses = tuple(response_iterator)
177 self.assertSequenceEqual(expected_responses, responses)
180 first_request = b
'\x07\x08'
181 second_request = b
'\x0809'
185 second_request,
None)
190 'SequentialInvocations'),))
193 'SequentialInvocations'),))
195 self.assertEqual(expected_first_response, first_response)
196 self.assertEqual(expected_second_response, second_response)
199 pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
201 b
'\x07\x08' for _
in range(test_constants.STREAM_LENGTH))
203 iter(requests),
None)
204 expected_responses = [expected_response
205 ] * test_constants.THREAD_CONCURRENCY
206 response_futures = [
None] * test_constants.THREAD_CONCURRENCY
209 for index
in range(test_constants.THREAD_CONCURRENCY):
210 request_iterator =
iter(requests)
211 response_future = pool.submit(
214 metadata=((
'test',
'ConcurrentBlockingInvocations'),))
215 response_futures[index] = response_future
217 response_future.result()
for response_future
in response_futures)
219 pool.shutdown(wait=
True)
220 self.assertSequenceEqual(expected_responses, responses)
224 b
'\x07\x08' for _
in range(test_constants.STREAM_LENGTH))
226 iter(requests),
None)
227 expected_responses = [expected_response
228 ] * test_constants.THREAD_CONCURRENCY
229 response_futures = [
None] * test_constants.THREAD_CONCURRENCY
232 for index
in range(test_constants.THREAD_CONCURRENCY):
233 request_iterator =
iter(requests)
234 response_future = multi_callable.future(
236 metadata=((
'test',
'ConcurrentFutureInvocations'),))
237 response_futures[index] = response_future
239 response_future.result()
for response_future
in response_futures)
241 self.assertSequenceEqual(expected_responses, responses)
244 pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
245 request = b
'\x67\x68'
247 response_futures = [
None] * test_constants.THREAD_CONCURRENCY
248 lock = threading.Lock()
249 test_is_running_cell = [
True]
251 def wrap_future(future):
255 return future.result()
258 if test_is_running_cell[0]:
265 for index
in range(test_constants.THREAD_CONCURRENCY):
266 inner_response_future = multi_callable.future(
270 'WaitingForSomeButNotAllConcurrentFutureInvocations'),))
271 outer_response_future = pool.submit(
272 wrap_future(inner_response_future))
273 response_futures[index] = outer_response_future
275 some_completed_response_futures_iterator = itertools.islice(
276 futures.as_completed(response_futures),
277 test_constants.THREAD_CONCURRENCY // 2)
278 for response_future
in some_completed_response_futures_iterator:
279 self.assertEqual(expected_response, response_future.result())
281 test_is_running_cell[0] =
False
316 request = b
'\x07\x17'
320 response_future = multi_callable.future(
322 metadata=((
'test',
'CancelledUnaryRequestUnaryResponse'),))
323 response_future.cancel()
325 self.assertIs(grpc.StatusCode.CANCELLED, response_future.code())
326 self.assertTrue(response_future.cancelled())
328 response_future.result()
330 response_future.exception()
332 response_future.traceback()
344 b
'\x07\x08' for _
in range(test_constants.STREAM_LENGTH))
345 request_iterator =
iter(requests)
349 response_future = multi_callable.future(
351 metadata=((
'test',
'CancelledStreamRequestUnaryResponse'),))
353 response_future.cancel()
355 self.assertIs(grpc.StatusCode.CANCELLED, response_future.code())
356 self.assertTrue(response_future.cancelled())
358 response_future.result()
360 response_future.exception()
362 response_future.traceback()
363 self.assertIsNotNone(response_future.initial_metadata())
364 self.assertIsNotNone(response_future.details())
365 self.assertIsNotNone(response_future.trailing_metadata())
376 request = b
'\x07\x17'
381 multi_callable.with_call(
383 timeout=TIMEOUT_SHORT,
385 'ExpiredUnaryRequestBlockingUnaryResponse'),))
387 self.assertIsInstance(exception_context.exception,
grpc.Call)
388 self.assertIsNotNone(exception_context.exception.initial_metadata())
389 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
390 exception_context.exception.code())
391 self.assertIsNotNone(exception_context.exception.details())
392 self.assertIsNotNone(exception_context.exception.trailing_metadata())
395 request = b
'\x07\x17'
400 response_future = multi_callable.future(
402 timeout=TIMEOUT_SHORT,
403 metadata=((
'test',
'ExpiredUnaryRequestFutureUnaryResponse'),))
404 response_future.add_done_callback(callback)
405 value_passed_to_callback = callback.value()
407 self.assertIs(response_future, value_passed_to_callback)
408 self.assertIsNotNone(response_future.initial_metadata())
409 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, response_future.code())
410 self.assertIsNotNone(response_future.details())
411 self.assertIsNotNone(response_future.trailing_metadata())
413 response_future.result()
414 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
415 exception_context.exception.code())
416 self.assertIsInstance(response_future.exception(),
grpc.RpcError)
417 self.assertIsNotNone(response_future.traceback())
418 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
419 response_future.exception().
code())
430 if __name__ ==
'__main__':
431 logging.basicConfig()
432 unittest.main(verbosity=2)