"""Tests that in-flight RPCs are CANCELLED when their channel is closed."""
31 _STREAM_URI =
'Meffod'
32 _UNARY_URI =
'MeffodMan'
37 request_streaming =
True
38 response_streaming =
True
39 request_deserializer =
None
40 response_serializer =
None
43 for request
in request_iterator:
49 request_streaming =
False
50 response_streaming =
False
51 request_deserializer =
None
52 response_serializer =
None
65 if handler_call_details.method == _STREAM_URI:
66 return _STREAMING_METHOD_HANDLER
68 return _UNARY_METHOD_HANDLER
120 max_workers=test_constants.THREAD_CONCURRENCY)
121 self.
_server.add_generic_rpc_handlers((_GENERIC_HANDLER,))
130 multi_callable = channel.stream_stream(_STREAM_URI)
131 request_iterator =
_Pipe(())
134 request_iterator.close()
136 self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
140 multi_callable = channel.stream_stream(_STREAM_URI)
141 request_iterator =
_Pipe((b
'abc',))
143 next(response_iterator)
145 request_iterator.close()
147 self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
151 self.
_port))
as channel:
152 multi_callable = channel.stream_stream(_STREAM_URI)
153 request_iterator =
_Pipe((b
'abc',))
155 next(response_iterator)
156 request_iterator.close()
158 self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
162 self.
_port))
as channel:
163 multi_callable = channel.stream_stream(_STREAM_URI)
164 request_iterators = tuple(
166 for _
in range(test_constants.THREAD_CONCURRENCY))
167 response_iterators = []
168 for request_iterator
in request_iterators:
170 next(response_iterator)
171 response_iterators.append(response_iterator)
172 for request_iterator
in request_iterators:
173 request_iterator.close()
175 for response_iterator
in response_iterators:
176 self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
180 multi_callable = channel.stream_stream(_STREAM_URI)
181 request_iterator =
_Pipe((b
'abc',))
183 next(response_iterator)
185 end = start + _MORE_TIME
187 def sleep_some_time_then_close():
188 time.sleep(_SOME_TIME)
191 for _
in range(test_constants.THREAD_CONCURRENCY):
192 close_thread = threading.Thread(target=sleep_some_time_then_close)
195 request_iterator.add(b
'def')
197 if end < time.time():
199 request_iterator.close()
201 self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
205 self.
_port))
as channel:
206 stream_multi_callable = channel.stream_stream(_STREAM_URI)
207 endless_iterator = itertools.repeat(b
'abc')
208 stream_response_iterator = stream_multi_callable(endless_iterator)
209 future = channel.unary_unary(_UNARY_URI).
future(b
'abc')
def on_done_callback(future):
    """Raise unconditionally when invoked as a future's done-callback.

    Registered with ``future.add_done_callback``. As the exception message
    states, a callback that raises is not supposed to cause a deadlock.

    Args:
        future: The future that completed; not inspected.

    Raises:
        Exception: Always.
    """
    raise Exception("This should not cause a deadlock.")
214 future.add_done_callback(on_done_callback)
if __name__ == '__main__':
    # Configure default logging, then run this module's tests verbosely.
    logging.basicConfig()
    unittest.main(verbosity=2)