20 from src.proto.grpc.testing
import messages_pb2
21 from src.proto.grpc.testing
import test_pb2_grpc
27 _NUM_STREAM_RESPONSES = 5
28 _NUM_STREAM_REQUESTS = 5
29 _RESPONSE_PAYLOAD_SIZE = 7
36 return await continuation(client_call_details, request_iterator)
38 def assert_in_final_state(self, test: unittest.TestCase):
43 aio.StreamStreamClientInterceptor):
52 def assert_in_final_state(self, test: unittest.TestCase):
53 test.assertEqual(_NUM_STREAM_REQUESTS,
55 test.assertEqual(_NUM_STREAM_RESPONSES,
64 async
def tearDown(self):
67 async
def test_intercepts(self):
69 for interceptor_class
in (
70 _StreamStreamInterceptorEmpty,
71 _StreamStreamInterceptorWithRequestAndResponseIterator):
73 with self.subTest(name=interceptor_class):
74 interceptor = interceptor_class()
75 channel = aio.insecure_channel(self._server_target,
76 interceptors=[interceptor])
77 stub = test_pb2_grpc.TestServiceStub(channel)
81 request.response_parameters.append(
83 size=_RESPONSE_PAYLOAD_SIZE))
85 async
def request_iterator():
86 for _
in range(_NUM_STREAM_REQUESTS):
89 call = stub.FullDuplexCall(request_iterator())
91 await call.wait_for_connection()
94 async
for response
in call:
96 self.assertIsInstance(
97 response, messages_pb2.StreamingOutputCallResponse)
98 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
99 len(response.payload.body))
101 self.assertEqual(response_cnt, _NUM_STREAM_RESPONSES)
102 self.assertEqual(await call.code(), grpc.StatusCode.OK)
103 self.assertEqual(await call.initial_metadata(), aio.Metadata())
104 self.assertEqual(await call.trailing_metadata(), aio.Metadata())
105 self.assertEqual(await call.details(),
'')
106 self.assertEqual(await call.debug_error_string(),
'')
107 self.assertEqual(call.cancel(),
False)
108 self.assertEqual(call.cancelled(),
False)
109 self.assertEqual(call.done(),
True)
111 interceptor.assert_in_final_state(self)
113 await channel.close()
115 async
def test_intercepts_using_write_and_read(self):
116 for interceptor_class
in (
117 _StreamStreamInterceptorEmpty,
118 _StreamStreamInterceptorWithRequestAndResponseIterator):
120 with self.subTest(name=interceptor_class):
121 interceptor = interceptor_class()
122 channel = aio.insecure_channel(self._server_target,
123 interceptors=[interceptor])
124 stub = test_pb2_grpc.TestServiceStub(channel)
128 request.response_parameters.append(
130 size=_RESPONSE_PAYLOAD_SIZE))
132 call = stub.FullDuplexCall()
134 for _
in range(_NUM_STREAM_RESPONSES):
135 await call.write(request)
136 response = await call.read()
137 self.assertIsInstance(
138 response, messages_pb2.StreamingOutputCallResponse)
139 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
140 len(response.payload.body))
142 await call.done_writing()
144 self.assertEqual(await call.code(), grpc.StatusCode.OK)
145 self.assertEqual(await call.initial_metadata(), aio.Metadata())
146 self.assertEqual(await call.trailing_metadata(), aio.Metadata())
147 self.assertEqual(await call.details(),
'')
148 self.assertEqual(await call.debug_error_string(),
'')
149 self.assertEqual(call.cancel(),
False)
150 self.assertEqual(call.cancelled(),
False)
151 self.assertEqual(call.done(),
True)
153 interceptor.assert_in_final_state(self)
155 await channel.close()
157 async
def test_multiple_interceptors_request_iterator(self):
158 for interceptor_class
in (
159 _StreamStreamInterceptorEmpty,
160 _StreamStreamInterceptorWithRequestAndResponseIterator):
162 with self.subTest(name=interceptor_class):
164 interceptors = [interceptor_class(), interceptor_class()]
165 channel = aio.insecure_channel(self._server_target,
166 interceptors=interceptors)
167 stub = test_pb2_grpc.TestServiceStub(channel)
171 request.response_parameters.append(
173 size=_RESPONSE_PAYLOAD_SIZE))
175 call = stub.FullDuplexCall()
177 for _
in range(_NUM_STREAM_RESPONSES):
178 await call.write(request)
179 response = await call.read()
180 self.assertIsInstance(
181 response, messages_pb2.StreamingOutputCallResponse)
182 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
183 len(response.payload.body))
185 await call.done_writing()
187 self.assertEqual(await call.code(), grpc.StatusCode.OK)
188 self.assertEqual(await call.initial_metadata(), aio.Metadata())
189 self.assertEqual(await call.trailing_metadata(), aio.Metadata())
190 self.assertEqual(await call.details(),
'')
191 self.assertEqual(await call.debug_error_string(),
'')
192 self.assertEqual(call.cancel(),
False)
193 self.assertEqual(call.cancelled(),
False)
194 self.assertEqual(call.done(),
True)
196 for interceptor
in interceptors:
197 interceptor.assert_in_final_state(self)
199 await channel.close()
202 if __name__ ==
'__main__':
203 logging.basicConfig(level=logging.DEBUG)
204 unittest.main(verbosity=2)