client_unary_unary_interceptor_test.py
Go to the documentation of this file.
1 # Copyright 2019 The gRPC Authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import asyncio
15 import logging
16 import unittest
17 
18 import grpc
19 from grpc.experimental import aio
20 
21 from src.proto.grpc.testing import messages_pb2
22 from src.proto.grpc.testing import test_pb2_grpc
23 from tests_aio.unit import _common
24 from tests_aio.unit import _constants
25 from tests_aio.unit._test_base import AioTestBase
26 from tests_aio.unit._test_server import _INITIAL_METADATA_KEY
27 from tests_aio.unit._test_server import _TRAILING_METADATA_KEY
28 from tests_aio.unit._test_server import start_test_server
29 
30 _LOCAL_CANCEL_DETAILS_EXPECTATION = 'Locally cancelled by application!'
31 _INITIAL_METADATA_TO_INJECT = aio.Metadata(
32  (_INITIAL_METADATA_KEY, 'extra info'),
33  (_TRAILING_METADATA_KEY, b'\x13\x37'),
34 )
35 _TIMEOUT_CHECK_IF_CALLBACK_WAS_CALLED = 1.0
36 
37 
39 
40  async def setUp(self):
41  self._server_target, self._server = await start_test_server()
42 
43  async def tearDown(self):
44  await self._server.stop(None)
45 
46  def test_invalid_interceptor(self):
47 
48  class InvalidInterceptor:
49  """Just an invalid Interceptor"""
50 
51  with self.assertRaises(ValueError):
52  aio.insecure_channel("", interceptors=[InvalidInterceptor()])
53 
54  async def test_executed_right_order(self):
55 
56  interceptors_executed = []
57 
58  class Interceptor(aio.UnaryUnaryClientInterceptor):
59  """Interceptor used for testing if the interceptor is being called"""
60 
61  async def intercept_unary_unary(self, continuation,
62  client_call_details, request):
63  interceptors_executed.append(self)
64  call = await continuation(client_call_details, request)
65  return call
66 
67  interceptors = [Interceptor() for i in range(2)]
68 
69  async with aio.insecure_channel(self._server_target,
70  interceptors=interceptors) as channel:
71  multicallable = channel.unary_unary(
72  '/grpc.testing.TestService/UnaryCall',
73  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
74  response_deserializer=messages_pb2.SimpleResponse.FromString)
75  call = multicallable(messages_pb2.SimpleRequest())
76  response = await call
77 
78  # Check that all interceptors were executed, and were executed
79  # in the right order.
80  self.assertSequenceEqual(interceptors_executed, interceptors)
81 
82  self.assertIsInstance(response, messages_pb2.SimpleResponse)
83 
84  @unittest.expectedFailure
85  # TODO(https://github.com/grpc/grpc/issues/20144) Once metadata support is
86  # implemented in the client-side, this test must be implemented.
87  def test_modify_metadata(self):
88  raise NotImplementedError()
89 
90  @unittest.expectedFailure
91  # TODO(https://github.com/grpc/grpc/issues/20532) Once credentials support is
92  # implemented in the client-side, this test must be implemented.
93  def test_modify_credentials(self):
94  raise NotImplementedError()
95 
96  async def test_status_code_Ok(self):
97 
98  class StatusCodeOkInterceptor(aio.UnaryUnaryClientInterceptor):
99  """Interceptor used for observing status code Ok returned by the RPC"""
100 
101  def __init__(self):
103 
104  async def intercept_unary_unary(self, continuation,
105  client_call_details, request):
106  call = await continuation(client_call_details, request)
107  code = await call.code()
108  if code == grpc.StatusCode.OK:
109  self.status_code_Ok_observed = True
110 
111  return call
112 
113  interceptor = StatusCodeOkInterceptor()
114 
115  async with aio.insecure_channel(self._server_target,
116  interceptors=[interceptor]) as channel:
117 
118  # when no error StatusCode.OK must be observed
119  multicallable = channel.unary_unary(
120  '/grpc.testing.TestService/UnaryCall',
121  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
122  response_deserializer=messages_pb2.SimpleResponse.FromString)
123 
124  await multicallable(messages_pb2.SimpleRequest())
125 
126  self.assertTrue(interceptor.status_code_Ok_observed)
127 
128  async def test_add_timeout(self):
129 
130  class TimeoutInterceptor(aio.UnaryUnaryClientInterceptor):
131  """Interceptor used for adding a timeout to the RPC"""
132 
133  async def intercept_unary_unary(self, continuation,
134  client_call_details, request):
135  new_client_call_details = aio.ClientCallDetails(
136  method=client_call_details.method,
137  timeout=_constants.UNARY_CALL_WITH_SLEEP_VALUE / 2,
138  metadata=client_call_details.metadata,
139  credentials=client_call_details.credentials,
140  wait_for_ready=client_call_details.wait_for_ready)
141  return await continuation(new_client_call_details, request)
142 
143  interceptor = TimeoutInterceptor()
144 
145  async with aio.insecure_channel(self._server_target,
146  interceptors=[interceptor]) as channel:
147 
148  multicallable = channel.unary_unary(
149  '/grpc.testing.TestService/UnaryCallWithSleep',
150  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
151  response_deserializer=messages_pb2.SimpleResponse.FromString)
152 
153  call = multicallable(messages_pb2.SimpleRequest())
154 
155  with self.assertRaises(aio.AioRpcError) as exception_context:
156  await call
157 
158  self.assertEqual(exception_context.exception.code(),
159  grpc.StatusCode.DEADLINE_EXCEEDED)
160 
161  self.assertTrue(call.done())
162  self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, await
163  call.code())
164 
165  async def test_retry(self):
166 
167  class RetryInterceptor(aio.UnaryUnaryClientInterceptor):
168  """Simulates a Retry Interceptor which ends up by making
169  two RPC calls."""
170 
171  def __init__(self):
172  self.calls = []
173 
174  async def intercept_unary_unary(self, continuation,
175  client_call_details, request):
176 
177  new_client_call_details = aio.ClientCallDetails(
178  method=client_call_details.method,
179  timeout=_constants.UNARY_CALL_WITH_SLEEP_VALUE / 2,
180  metadata=client_call_details.metadata,
181  credentials=client_call_details.credentials,
182  wait_for_ready=client_call_details.wait_for_ready)
183 
184  try:
185  call = await continuation(new_client_call_details, request)
186  await call
187  except grpc.RpcError:
188  pass
189 
190  self.calls.append(call)
191 
192  new_client_call_details = aio.ClientCallDetails(
193  method=client_call_details.method,
194  timeout=None,
195  metadata=client_call_details.metadata,
196  credentials=client_call_details.credentials,
197  wait_for_ready=client_call_details.wait_for_ready)
198 
199  call = await continuation(new_client_call_details, request)
200  self.calls.append(call)
201  return call
202 
203  interceptor = RetryInterceptor()
204 
205  async with aio.insecure_channel(self._server_target,
206  interceptors=[interceptor]) as channel:
207 
208  multicallable = channel.unary_unary(
209  '/grpc.testing.TestService/UnaryCallWithSleep',
210  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
211  response_deserializer=messages_pb2.SimpleResponse.FromString)
212 
213  call = multicallable(messages_pb2.SimpleRequest())
214 
215  await call
216 
217  self.assertEqual(grpc.StatusCode.OK, await call.code())
218 
219  # Check that two calls were made, first one finishing with
220  # a deadline and second one finishing ok..
221  self.assertEqual(len(interceptor.calls), 2)
222  self.assertEqual(await interceptor.calls[0].code(),
223  grpc.StatusCode.DEADLINE_EXCEEDED)
224  self.assertEqual(await interceptor.calls[1].code(),
225  grpc.StatusCode.OK)
226 
227  async def test_rpcresponse(self):
228 
229  class Interceptor(aio.UnaryUnaryClientInterceptor):
230  """Raw responses are seen as reegular calls"""
231 
232  async def intercept_unary_unary(self, continuation,
233  client_call_details, request):
234  call = await continuation(client_call_details, request)
235  response = await call
236  return call
237 
238  class ResponseInterceptor(aio.UnaryUnaryClientInterceptor):
239  """Return a raw response"""
240  response = messages_pb2.SimpleResponse()
241 
242  async def intercept_unary_unary(self, continuation,
243  client_call_details, request):
244  return ResponseInterceptor.response
245 
246  interceptor, interceptor_response = Interceptor(), ResponseInterceptor()
247 
248  async with aio.insecure_channel(
249  self._server_target,
250  interceptors=[interceptor, interceptor_response]) as channel:
251 
252  multicallable = channel.unary_unary(
253  '/grpc.testing.TestService/UnaryCall',
254  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
255  response_deserializer=messages_pb2.SimpleResponse.FromString)
256 
257  call = multicallable(messages_pb2.SimpleRequest())
258  response = await call
259 
260  # Check that the response returned is the one returned by the
261  # interceptor
262  self.assertEqual(id(response), id(ResponseInterceptor.response))
263 
264  # Check all of the UnaryUnaryCallResponse attributes
265  self.assertTrue(call.done())
266  self.assertFalse(call.cancel())
267  self.assertFalse(call.cancelled())
268  self.assertEqual(await call.code(), grpc.StatusCode.OK)
269  self.assertEqual(await call.details(), '')
270  self.assertEqual(await call.initial_metadata(), None)
271  self.assertEqual(await call.trailing_metadata(), None)
272  self.assertEqual(await call.debug_error_string(), None)
273 
274 
276 
277  async def setUp(self):
278  self._server_target, self._server = await start_test_server()
279 
280  async def tearDown(self):
281  await self._server.stop(None)
282 
283  async def test_call_ok(self):
284 
285  class Interceptor(aio.UnaryUnaryClientInterceptor):
286 
287  async def intercept_unary_unary(self, continuation,
288  client_call_details, request):
289  call = await continuation(client_call_details, request)
290  return call
291 
292  async with aio.insecure_channel(self._server_target,
293  interceptors=[Interceptor()
294  ]) as channel:
295 
296  multicallable = channel.unary_unary(
297  '/grpc.testing.TestService/UnaryCall',
298  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
299  response_deserializer=messages_pb2.SimpleResponse.FromString)
300  call = multicallable(messages_pb2.SimpleRequest())
301  response = await call
302 
303  self.assertTrue(call.done())
304  self.assertFalse(call.cancelled())
305  self.assertEqual(type(response), messages_pb2.SimpleResponse)
306  self.assertEqual(await call.code(), grpc.StatusCode.OK)
307  self.assertEqual(await call.details(), '')
308  self.assertEqual(await call.initial_metadata(), aio.Metadata())
309  self.assertEqual(await call.trailing_metadata(), aio.Metadata())
310 
311  async def test_call_ok_awaited(self):
312 
313  class Interceptor(aio.UnaryUnaryClientInterceptor):
314 
315  async def intercept_unary_unary(self, continuation,
316  client_call_details, request):
317  call = await continuation(client_call_details, request)
318  await call
319  return call
320 
321  async with aio.insecure_channel(self._server_target,
322  interceptors=[Interceptor()
323  ]) as channel:
324 
325  multicallable = channel.unary_unary(
326  '/grpc.testing.TestService/UnaryCall',
327  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
328  response_deserializer=messages_pb2.SimpleResponse.FromString)
329  call = multicallable(messages_pb2.SimpleRequest())
330  response = await call
331 
332  self.assertTrue(call.done())
333  self.assertFalse(call.cancelled())
334  self.assertEqual(type(response), messages_pb2.SimpleResponse)
335  self.assertEqual(await call.code(), grpc.StatusCode.OK)
336  self.assertEqual(await call.details(), '')
337  self.assertEqual(await call.initial_metadata(), aio.Metadata())
338  self.assertEqual(await call.trailing_metadata(), aio.Metadata())
339 
340  async def test_call_rpc_error(self):
341 
342  class Interceptor(aio.UnaryUnaryClientInterceptor):
343 
344  async def intercept_unary_unary(self, continuation,
345  client_call_details, request):
346  call = await continuation(client_call_details, request)
347  return call
348 
349  async with aio.insecure_channel(self._server_target,
350  interceptors=[Interceptor()
351  ]) as channel:
352 
353  multicallable = channel.unary_unary(
354  '/grpc.testing.TestService/UnaryCallWithSleep',
355  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
356  response_deserializer=messages_pb2.SimpleResponse.FromString)
357 
358  call = multicallable(
360  timeout=_constants.UNARY_CALL_WITH_SLEEP_VALUE / 2)
361 
362  with self.assertRaises(aio.AioRpcError) as exception_context:
363  await call
364 
365  self.assertTrue(call.done())
366  self.assertFalse(call.cancelled())
367  self.assertEqual(await call.code(),
368  grpc.StatusCode.DEADLINE_EXCEEDED)
369  self.assertEqual(await call.details(), 'Deadline Exceeded')
370  self.assertEqual(await call.initial_metadata(), aio.Metadata())
371  self.assertEqual(await call.trailing_metadata(), aio.Metadata())
372 
373  async def test_call_rpc_error_awaited(self):
374 
375  class Interceptor(aio.UnaryUnaryClientInterceptor):
376 
377  async def intercept_unary_unary(self, continuation,
378  client_call_details, request):
379  call = await continuation(client_call_details, request)
380  await call
381  return call
382 
383  async with aio.insecure_channel(self._server_target,
384  interceptors=[Interceptor()
385  ]) as channel:
386 
387  multicallable = channel.unary_unary(
388  '/grpc.testing.TestService/UnaryCallWithSleep',
389  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
390  response_deserializer=messages_pb2.SimpleResponse.FromString)
391 
392  call = multicallable(
394  timeout=_constants.UNARY_CALL_WITH_SLEEP_VALUE / 2)
395 
396  with self.assertRaises(aio.AioRpcError) as exception_context:
397  await call
398 
399  self.assertTrue(call.done())
400  self.assertFalse(call.cancelled())
401  self.assertEqual(await call.code(),
402  grpc.StatusCode.DEADLINE_EXCEEDED)
403  self.assertEqual(await call.details(), 'Deadline Exceeded')
404  self.assertEqual(await call.initial_metadata(), aio.Metadata())
405  self.assertEqual(await call.trailing_metadata(), aio.Metadata())
406 
407  async def test_cancel_before_rpc(self):
408 
409  interceptor_reached = asyncio.Event()
410  wait_for_ever = self.loop.create_future()
411 
412  class Interceptor(aio.UnaryUnaryClientInterceptor):
413 
414  async def intercept_unary_unary(self, continuation,
415  client_call_details, request):
416  interceptor_reached.set()
417  await wait_for_ever
418 
419  async with aio.insecure_channel(self._server_target,
420  interceptors=[Interceptor()
421  ]) as channel:
422 
423  multicallable = channel.unary_unary(
424  '/grpc.testing.TestService/UnaryCall',
425  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
426  response_deserializer=messages_pb2.SimpleResponse.FromString)
427  call = multicallable(messages_pb2.SimpleRequest())
428 
429  self.assertFalse(call.cancelled())
430  self.assertFalse(call.done())
431 
432  await interceptor_reached.wait()
433  self.assertTrue(call.cancel())
434 
435  with self.assertRaises(asyncio.CancelledError):
436  await call
437 
438  self.assertTrue(call.cancelled())
439  self.assertTrue(call.done())
440  self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED)
441  self.assertEqual(await call.details(),
442  _LOCAL_CANCEL_DETAILS_EXPECTATION)
443  self.assertEqual(await call.initial_metadata(), None)
444  self.assertEqual(await call.trailing_metadata(), None)
445 
446  async def test_cancel_after_rpc(self):
447 
448  interceptor_reached = asyncio.Event()
449  wait_for_ever = self.loop.create_future()
450 
451  class Interceptor(aio.UnaryUnaryClientInterceptor):
452 
453  async def intercept_unary_unary(self, continuation,
454  client_call_details, request):
455  call = await continuation(client_call_details, request)
456  await call
457  interceptor_reached.set()
458  await wait_for_ever
459 
460  async with aio.insecure_channel(self._server_target,
461  interceptors=[Interceptor()
462  ]) as channel:
463 
464  multicallable = channel.unary_unary(
465  '/grpc.testing.TestService/UnaryCall',
466  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
467  response_deserializer=messages_pb2.SimpleResponse.FromString)
468  call = multicallable(messages_pb2.SimpleRequest())
469 
470  self.assertFalse(call.cancelled())
471  self.assertFalse(call.done())
472 
473  await interceptor_reached.wait()
474  self.assertTrue(call.cancel())
475 
476  with self.assertRaises(asyncio.CancelledError):
477  await call
478 
479  self.assertTrue(call.cancelled())
480  self.assertTrue(call.done())
481  self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED)
482  self.assertEqual(await call.details(),
483  _LOCAL_CANCEL_DETAILS_EXPECTATION)
484  self.assertEqual(await call.initial_metadata(), None)
485  self.assertEqual(await call.trailing_metadata(), None)
486 
487  async def test_cancel_inside_interceptor_after_rpc_awaiting(self):
488 
489  class Interceptor(aio.UnaryUnaryClientInterceptor):
490 
491  async def intercept_unary_unary(self, continuation,
492  client_call_details, request):
493  call = await continuation(client_call_details, request)
494  call.cancel()
495  await call
496  return call
497 
498  async with aio.insecure_channel(self._server_target,
499  interceptors=[Interceptor()
500  ]) as channel:
501 
502  multicallable = channel.unary_unary(
503  '/grpc.testing.TestService/UnaryCall',
504  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
505  response_deserializer=messages_pb2.SimpleResponse.FromString)
506  call = multicallable(messages_pb2.SimpleRequest())
507 
508  with self.assertRaises(asyncio.CancelledError):
509  await call
510 
511  self.assertTrue(call.cancelled())
512  self.assertTrue(call.done())
513  self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED)
514  self.assertEqual(await call.details(),
515  _LOCAL_CANCEL_DETAILS_EXPECTATION)
516  self.assertEqual(await call.initial_metadata(), None)
517  self.assertEqual(await call.trailing_metadata(), None)
518 
519  async def test_cancel_inside_interceptor_after_rpc_not_awaiting(self):
520 
521  class Interceptor(aio.UnaryUnaryClientInterceptor):
522 
523  async def intercept_unary_unary(self, continuation,
524  client_call_details, request):
525  call = await continuation(client_call_details, request)
526  call.cancel()
527  return call
528 
529  async with aio.insecure_channel(self._server_target,
530  interceptors=[Interceptor()
531  ]) as channel:
532 
533  multicallable = channel.unary_unary(
534  '/grpc.testing.TestService/UnaryCall',
535  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
536  response_deserializer=messages_pb2.SimpleResponse.FromString)
537  call = multicallable(messages_pb2.SimpleRequest())
538 
539  with self.assertRaises(asyncio.CancelledError):
540  await call
541 
542  self.assertTrue(call.cancelled())
543  self.assertTrue(call.done())
544  self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED)
545  self.assertEqual(await call.details(),
546  _LOCAL_CANCEL_DETAILS_EXPECTATION)
547  self.assertEqual(await call.initial_metadata(), aio.Metadata())
548  self.assertEqual(
549  await call.trailing_metadata(), aio.Metadata(),
550  "When the raw response is None, empty metadata is returned")
551 
552  async def test_initial_metadata_modification(self):
553 
554  class Interceptor(aio.UnaryUnaryClientInterceptor):
555 
556  async def intercept_unary_unary(self, continuation,
557  client_call_details, request):
558  new_metadata = aio.Metadata(*client_call_details.metadata,
559  *_INITIAL_METADATA_TO_INJECT)
560  new_details = aio.ClientCallDetails(
561  method=client_call_details.method,
562  timeout=client_call_details.timeout,
563  metadata=new_metadata,
564  credentials=client_call_details.credentials,
565  wait_for_ready=client_call_details.wait_for_ready,
566  )
567  return await continuation(new_details, request)
568 
569  async with aio.insecure_channel(self._server_target,
570  interceptors=[Interceptor()
571  ]) as channel:
572  stub = test_pb2_grpc.TestServiceStub(channel)
573  call = stub.UnaryCall(messages_pb2.SimpleRequest())
574 
575  # Expected to see the echoed initial metadata
576  self.assertTrue(
577  _common.seen_metadatum(
578  expected_key=_INITIAL_METADATA_KEY,
579  expected_value=_INITIAL_METADATA_TO_INJECT[
580  _INITIAL_METADATA_KEY],
581  actual=await call.initial_metadata(),
582  ))
583  # Expected to see the echoed trailing metadata
584  self.assertTrue(
585  _common.seen_metadatum(
586  expected_key=_TRAILING_METADATA_KEY,
587  expected_value=_INITIAL_METADATA_TO_INJECT[
588  _TRAILING_METADATA_KEY],
589  actual=await call.trailing_metadata(),
590  ))
591  self.assertEqual(await call.code(), grpc.StatusCode.OK)
592 
593  async def test_add_done_callback_before_finishes(self):
594  called = asyncio.Event()
595  interceptor_can_continue = asyncio.Event()
596 
597  def callback(call):
598  called.set()
599 
600  class Interceptor(aio.UnaryUnaryClientInterceptor):
601 
602  async def intercept_unary_unary(self, continuation,
603  client_call_details, request):
604 
605  await interceptor_can_continue.wait()
606  call = await continuation(client_call_details, request)
607  return call
608 
609  async with aio.insecure_channel(self._server_target,
610  interceptors=[Interceptor()
611  ]) as channel:
612 
613  multicallable = channel.unary_unary(
614  '/grpc.testing.TestService/UnaryCall',
615  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
616  response_deserializer=messages_pb2.SimpleResponse.FromString)
617  call = multicallable(messages_pb2.SimpleRequest())
618  call.add_done_callback(callback)
619  interceptor_can_continue.set()
620  await call
621 
622  try:
623  await asyncio.wait_for(
624  called.wait(),
625  timeout=_TIMEOUT_CHECK_IF_CALLBACK_WAS_CALLED)
626  except:
627  self.fail("Callback was not called")
628 
629  async def test_add_done_callback_after_finishes(self):
630  called = asyncio.Event()
631 
632  def callback(call):
633  called.set()
634 
635  class Interceptor(aio.UnaryUnaryClientInterceptor):
636 
637  async def intercept_unary_unary(self, continuation,
638  client_call_details, request):
639 
640  call = await continuation(client_call_details, request)
641  return call
642 
643  async with aio.insecure_channel(self._server_target,
644  interceptors=[Interceptor()
645  ]) as channel:
646 
647  multicallable = channel.unary_unary(
648  '/grpc.testing.TestService/UnaryCall',
649  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
650  response_deserializer=messages_pb2.SimpleResponse.FromString)
651  call = multicallable(messages_pb2.SimpleRequest())
652 
653  await call
654 
655  call.add_done_callback(callback)
656 
657  try:
658  await asyncio.wait_for(
659  called.wait(),
660  timeout=_TIMEOUT_CHECK_IF_CALLBACK_WAS_CALLED)
661  except:
662  self.fail("Callback was not called")
663 
664  async def test_add_done_callback_after_finishes_before_await(self):
665  called = asyncio.Event()
666 
667  def callback(call):
668  called.set()
669 
670  class Interceptor(aio.UnaryUnaryClientInterceptor):
671 
672  async def intercept_unary_unary(self, continuation,
673  client_call_details, request):
674 
675  call = await continuation(client_call_details, request)
676  return call
677 
678  async with aio.insecure_channel(self._server_target,
679  interceptors=[Interceptor()
680  ]) as channel:
681 
682  multicallable = channel.unary_unary(
683  '/grpc.testing.TestService/UnaryCall',
684  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
685  response_deserializer=messages_pb2.SimpleResponse.FromString)
686  call = multicallable(messages_pb2.SimpleRequest())
687 
688  call.add_done_callback(callback)
689 
690  await call
691 
692  try:
693  await asyncio.wait_for(
694  called.wait(),
695  timeout=_TIMEOUT_CHECK_IF_CALLBACK_WAS_CALLED)
696  except:
697  self.fail("Callback was not called")
698 
699 
700 if __name__ == '__main__':
701  logging.basicConfig(level=logging.DEBUG)
702  unittest.main(verbosity=2)
messages_pb2.SimpleRequest
SimpleRequest
Definition: messages_pb2.py:597
tests_aio.unit
Definition: src/python/grpcio_tests/tests_aio/unit/__init__.py:1
tests_aio.unit._test_base.AioTestBase.loop
def loop(self)
Definition: _test_base.py:55
tests_aio.unit._test_server
Definition: tests_aio/unit/_test_server.py:1
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests_aio.unit.client_unary_unary_interceptor_test.TestInterceptedUnaryUnaryCall._server
_server
Definition: client_unary_unary_interceptor_test.py:278
tests_aio.unit.client_unary_unary_interceptor_test.TestInterceptedUnaryUnaryCall.setUp
def setUp(self)
Definition: client_unary_unary_interceptor_test.py:277
tests_aio.unit._test_server.start_test_server
def start_test_server(port=0, secure=False, server_credentials=None, interceptors=None)
Definition: tests_aio/unit/_test_server.py:128
tests_aio.unit._test_base
Definition: _test_base.py:1
grpc.RpcError
Definition: src/python/grpcio/grpc/__init__.py:302
tests_aio.unit.client_unary_unary_interceptor_test.TestUnaryUnaryClientInterceptor._server
_server
Definition: client_unary_unary_interceptor_test.py:41
tests_aio.unit.client_unary_unary_interceptor_test.TestUnaryUnaryClientInterceptor.calls
calls
Definition: client_unary_unary_interceptor_test.py:172
grpc::experimental
Definition: include/grpcpp/channel.h:46
tests_aio.unit.client_unary_unary_interceptor_test.TestUnaryUnaryClientInterceptor.setUp
def setUp(self)
Definition: client_unary_unary_interceptor_test.py:40
tests_aio.unit.client_unary_unary_interceptor_test.TestUnaryUnaryClientInterceptor
Definition: client_unary_unary_interceptor_test.py:38
callback
static void callback(void *arg, int status, int timeouts, struct hostent *host)
Definition: acountry.c:224
test_retry
static void test_retry(grpc_end2end_test_config config)
Definition: retry.cc:97
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
tests_aio.unit.client_unary_unary_interceptor_test.TestInterceptedUnaryUnaryCall
Definition: client_unary_unary_interceptor_test.py:275
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
code
Definition: bloaty/third_party/zlib/contrib/infback9/inftree9.h:24
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
messages_pb2.SimpleResponse
SimpleResponse
Definition: messages_pb2.py:604
google::protobuf.internal.python_message.__init__
__init__
Definition: bloaty/third_party/protobuf/python/google/protobuf/internal/python_message.py:557
id
uint32_t id
Definition: flow_control_fuzzer.cc:70
tests_aio.unit._test_base.AioTestBase
Definition: _test_base.py:49
tests_aio.unit.client_unary_unary_interceptor_test.TestUnaryUnaryClientInterceptor.status_code_Ok_observed
status_code_Ok_observed
Definition: client_unary_unary_interceptor_test.py:102


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:55