17 import distutils.spawn
31 import tests.protoc_plugin.protos.payload.test_payload_pb2
as payload_pb2
32 import tests.protoc_plugin.protos.requests.r.test_requests_pb2
as request_pb2
33 import tests.protoc_plugin.protos.responses.test_responses_pb2
as response_pb2
34 import tests.protoc_plugin.protos.service.test_service_pb2_grpc
as service_pb2_grpc
39 STUB_IDENTIFIER =
'TestServiceStub'
40 SERVICER_IDENTIFIER =
'TestServiceServicer'
41 ADD_SERVICER_TO_SERVER_IDENTIFIER =
'add_TestServiceServicer_to_server'
51 @contextlib.contextmanager
60 @contextlib.contextmanager
76 response = response_pb2.SimpleResponse()
77 response.payload.payload_type = payload_pb2.COMPRESSABLE
78 response.payload.payload_compressable =
'a' * request.response_size
83 for parameter
in request.response_parameters:
84 response = response_pb2.StreamingOutputCallResponse()
85 response.payload.payload_type = payload_pb2.COMPRESSABLE
86 response.payload.payload_compressable =
'a' * parameter.size
91 response = response_pb2.StreamingInputCallResponse()
92 aggregated_payload_size = 0
93 for request
in request_iter:
94 aggregated_payload_size +=
len(request.payload.payload_compressable)
95 response.aggregated_payload_size = aggregated_payload_size
100 for request
in request_iter:
101 for parameter
in request.response_parameters:
102 response = response_pb2.StreamingOutputCallResponse()
103 response.payload.payload_type = payload_pb2.COMPRESSABLE
104 response.payload.payload_compressable =
'a' * parameter.size
110 for request
in request_iter:
111 for parameter
in request.response_parameters:
112 response = response_pb2.StreamingOutputCallResponse()
113 response.payload.payload_type = payload_pb2.COMPRESSABLE
114 response.payload.payload_compressable =
'a' * parameter.size
116 responses.append(response)
117 for response
in responses:
122 collections.namedtuple(
'_Service', (
127 """A live and running service.
130 servicer_methods: The _ServicerMethods servicing RPCs.
131 server: The grpc.Server servicing RPCs.
132 stub: A stub on which to invoke RPCs.
137 """Provides a servicer backend and a stub.
140 A _Service with which to test RPCs.
146 def UnaryCall(self, request, context):
147 return servicer_methods.UnaryCall(request, context)
149 def StreamingOutputCall(self, request, context):
150 return servicer_methods.StreamingOutputCall(request, context)
152 def StreamingInputCall(self, request_iterator, context):
153 return servicer_methods.StreamingInputCall(request_iterator,
156 def FullDuplexCall(self, request_iterator, context):
157 return servicer_methods.FullDuplexCall(request_iterator, context)
159 def HalfDuplexCall(self, request_iterator, context):
160 return servicer_methods.HalfDuplexCall(request_iterator, context)
162 server = test_common.test_server()
163 getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(),
165 port = server.add_insecure_port(
'[::]:0')
168 stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel)
169 return _Service(servicer_methods, server, stub)
173 """Provides a servicer backend that fails to implement methods and its stub.
176 A _Service with which to test RPCs. The returned _Service's
177 servicer_methods implements none of the methods required of it.
183 server = test_common.test_server()
184 getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(),
186 port = server.add_insecure_port(
'[::]:0')
189 stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel)
190 return _Service(
None, server, stub)
195 request = request_pb2.StreamingInputCallRequest()
196 request.payload.payload_type = payload_pb2.COMPRESSABLE
197 request.payload.payload_compressable =
'a'
202 request = request_pb2.StreamingOutputCallRequest()
204 request.response_parameters.add(size=sizes[0], interval_us=0)
205 request.response_parameters.add(size=sizes[1], interval_us=0)
206 request.response_parameters.add(size=sizes[2], interval_us=0)
211 request = request_pb2.StreamingOutputCallRequest()
212 request.response_parameters.add(size=1, interval_us=0)
214 request = request_pb2.StreamingOutputCallRequest()
215 request.response_parameters.add(size=2, interval_us=0)
216 request.response_parameters.add(size=3, interval_us=0)
221 """Test case for the gRPC Python protoc-plugin.
223 While reading these tests, remember that the futures API
224 (`stub.method.future()`) only gives futures for the *response-unary*
225 methods and does not exist for response-streaming methods.
230 self.assertIsNotNone(getattr(service_pb2_grpc, STUB_IDENTIFIER,
None))
231 self.assertIsNotNone(
232 getattr(service_pb2_grpc, SERVICER_IDENTIFIER,
None))
233 self.assertIsNotNone(
234 getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER,
None))
238 self.assertIsNotNone(service.servicer_methods)
239 self.assertIsNotNone(service.server)
240 self.assertIsNotNone(service.stub)
241 service.server.stop(
None)
245 request = request_pb2.SimpleRequest(response_size=13)
247 service.stub.UnaryCall(request)
248 self.assertIs(exception_context.exception.code(),
249 grpc.StatusCode.UNIMPLEMENTED)
250 service.server.stop(
None)
254 request = request_pb2.SimpleRequest(response_size=13)
255 response = service.stub.UnaryCall(request)
256 expected_response = service.servicer_methods.UnaryCall(
257 request,
'not a real context!')
258 self.assertEqual(expected_response, response)
259 service.server.stop(
None)
263 request = request_pb2.SimpleRequest(response_size=13)
265 with service.servicer_methods.pause():
266 response_future = service.stub.UnaryCall.future(request)
267 response = response_future.result()
268 expected_response = service.servicer_methods.UnaryCall(
269 request,
'not a real RpcContext!')
270 self.assertEqual(expected_response, response)
271 service.server.stop(
None)
275 request = request_pb2.SimpleRequest(response_size=13)
276 with service.servicer_methods.pause():
277 response_future = service.stub.UnaryCall.future(
278 request, timeout=test_constants.SHORT_TIMEOUT)
280 response_future.result()
281 self.assertIs(exception_context.exception.code(),
282 grpc.StatusCode.DEADLINE_EXCEEDED)
283 self.assertIs(response_future.code(), grpc.StatusCode.DEADLINE_EXCEEDED)
284 service.server.stop(
None)
288 request = request_pb2.SimpleRequest(response_size=13)
289 with service.servicer_methods.pause():
290 response_future = service.stub.UnaryCall.future(request)
291 response_future.cancel()
292 self.assertTrue(response_future.cancelled())
293 self.assertIs(response_future.code(), grpc.StatusCode.CANCELLED)
294 service.server.stop(
None)
298 request = request_pb2.SimpleRequest(response_size=13)
299 with service.servicer_methods.fail():
300 response_future = service.stub.UnaryCall.future(request)
301 self.assertIsNotNone(response_future.exception())
302 self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN)
303 service.server.stop(
None)
308 responses = service.stub.StreamingOutputCall(request)
309 expected_responses = service.servicer_methods.StreamingOutputCall(
310 request,
'not a real RpcContext!')
311 for expected_response, response
in moves.zip_longest(
312 expected_responses, responses):
313 self.assertEqual(expected_response, response)
314 service.server.stop(
None)
319 with service.servicer_methods.pause():
320 responses = service.stub.StreamingOutputCall(
321 request, timeout=test_constants.SHORT_TIMEOUT)
324 self.assertIs(exception_context.exception.code(),
325 grpc.StatusCode.DEADLINE_EXCEEDED)
326 service.server.stop(
None)
331 responses = service.stub.StreamingOutputCall(request)
336 self.assertIs(responses.code(), grpc.StatusCode.CANCELLED)
337 service.server.stop(
None)
342 with service.servicer_methods.fail():
343 responses = service.stub.StreamingOutputCall(request)
344 self.assertIsNotNone(responses)
347 self.assertIs(exception_context.exception.code(),
348 grpc.StatusCode.UNKNOWN)
349 service.server.stop(
None)
353 response = service.stub.StreamingInputCall(
355 expected_response = service.servicer_methods.StreamingInputCall(
357 self.assertEqual(expected_response, response)
358 service.server.stop(
None)
362 with service.servicer_methods.pause():
363 response_future = service.stub.StreamingInputCall.future(
365 response = response_future.result()
366 expected_response = service.servicer_methods.StreamingInputCall(
368 self.assertEqual(expected_response, response)
369 service.server.stop(
None)
373 with service.servicer_methods.pause():
374 response_future = service.stub.StreamingInputCall.future(
376 timeout=test_constants.SHORT_TIMEOUT)
378 response_future.result()
379 self.assertIsInstance(response_future.exception(),
grpc.RpcError)
380 self.assertIs(response_future.exception().
code(),
381 grpc.StatusCode.DEADLINE_EXCEEDED)
382 self.assertIs(exception_context.exception.code(),
383 grpc.StatusCode.DEADLINE_EXCEEDED)
384 service.server.stop(
None)
388 with service.servicer_methods.pause():
389 response_future = service.stub.StreamingInputCall.future(
391 response_future.cancel()
392 self.assertTrue(response_future.cancelled())
394 response_future.result()
395 service.server.stop(
None)
399 with service.servicer_methods.fail():
400 response_future = service.stub.StreamingInputCall.future(
402 self.assertIsNotNone(response_future.exception())
403 self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN)
404 service.server.stop(
None)
409 expected_responses = service.servicer_methods.FullDuplexCall(
411 for expected_response, response
in moves.zip_longest(
412 expected_responses, responses):
413 self.assertEqual(expected_response, response)
414 service.server.stop(
None)
419 with service.servicer_methods.pause():
420 responses = service.stub.FullDuplexCall(
421 request_iterator, timeout=test_constants.SHORT_TIMEOUT)
424 self.assertIs(exception_context.exception.code(),
425 grpc.StatusCode.DEADLINE_EXCEEDED)
426 service.server.stop(
None)
431 responses = service.stub.FullDuplexCall(request_iterator)
436 self.assertIs(exception_context.exception.code(),
437 grpc.StatusCode.CANCELLED)
438 service.server.stop(
None)
443 with service.servicer_methods.fail():
444 responses = service.stub.FullDuplexCall(request_iterator)
447 self.assertIs(exception_context.exception.code(),
448 grpc.StatusCode.UNKNOWN)
449 service.server.stop(
None)
454 def half_duplex_request_iterator():
455 request = request_pb2.StreamingOutputCallRequest()
456 request.response_parameters.add(size=1, interval_us=0)
458 request = request_pb2.StreamingOutputCallRequest()
459 request.response_parameters.add(size=2, interval_us=0)
460 request.response_parameters.add(size=3, interval_us=0)
463 responses = service.stub.HalfDuplexCall(half_duplex_request_iterator())
464 expected_responses = service.servicer_methods.HalfDuplexCall(
465 half_duplex_request_iterator(),
'not a real RpcContext!')
466 for expected_response, response
in moves.zip_longest(
467 expected_responses, responses):
468 self.assertEqual(expected_response, response)
469 service.server.stop(
None)
472 condition = threading.Condition()
475 @contextlib.contextmanager
483 condition.notify_all()
485 def half_duplex_request_iterator():
486 request = request_pb2.StreamingOutputCallRequest()
487 request.response_parameters.add(size=1, interval_us=0)
495 responses = service.stub.HalfDuplexCall(
496 half_duplex_request_iterator(),
497 timeout=test_constants.SHORT_TIMEOUT)
501 self.assertIs(exception_context.exception.code(),
502 grpc.StatusCode.DEADLINE_EXCEEDED)
503 service.server.stop(
None)
506 @unittest.skipIf(sys.version_info[0] < 3
or sys.version_info[1] < 6,
507 "Unsupported on Python 2.")
511 class Servicer(service_pb2_grpc.TestServiceServicer):
514 return SimpleStubsPluginTest.servicer_methods.UnaryCall(
518 return SimpleStubsPluginTest.servicer_methods.StreamingOutputCall(
522 return SimpleStubsPluginTest.servicer_methods.StreamingInputCall(
523 request_iterator, context)
526 return SimpleStubsPluginTest.servicer_methods.FullDuplexCall(
527 request_iterator, context)
530 return SimpleStubsPluginTest.servicer_methods.HalfDuplexCall(
531 request_iterator, context)
534 super(SimpleStubsPluginTest, self).
setUp()
536 service_pb2_grpc.add_TestServiceServicer_to_server(
544 super(SimpleStubsPluginTest, self).
tearDown()
547 request = request_pb2.SimpleRequest(response_size=13)
548 response = service_pb2_grpc.TestService.UnaryCall(
555 request,
'not a real context!')
556 self.assertEqual(expected_response, response)
559 request = request_pb2.SimpleRequest(response_size=13)
560 response = service_pb2_grpc.TestService.UnaryCall(request,
565 request,
'not a real context!')
566 self.assertEqual(expected_response, response)
571 request,
'not a real RpcContext!')
572 responses = service_pb2_grpc.TestService.StreamingOutputCall(
578 for expected_response, response
in moves.zip_longest(
579 expected_responses, responses):
580 self.assertEqual(expected_response, response)
583 response = service_pb2_grpc.TestService.StreamingInputCall(
591 self.assertEqual(expected_response, response)
594 responses = service_pb2_grpc.TestService.FullDuplexCall(
602 for expected_response, response
in moves.zip_longest(
603 expected_responses, responses):
604 self.assertEqual(expected_response, response)
608 def half_duplex_request_iterator():
609 request = request_pb2.StreamingOutputCallRequest()
610 request.response_parameters.add(size=1, interval_us=0)
612 request = request_pb2.StreamingOutputCallRequest()
613 request.response_parameters.add(size=2, interval_us=0)
614 request.response_parameters.add(size=3, interval_us=0)
617 responses = service_pb2_grpc.TestService.HalfDuplexCall(
618 half_duplex_request_iterator(),
624 half_duplex_request_iterator(),
'not a real RpcContext!')
625 for expected_response, response
in moves.zip_longest(
626 expected_responses, responses):
627 self.assertEqual(expected_response, response)
631 """Test case for running `python -m grpc_tools.protoc`.
635 if sys.executable
is None:
636 raise unittest.SkipTest(
637 "Running on a interpreter that cannot be invoked from the CLI.")
638 proto_dir_path = os.path.join(
"src",
"proto")
639 test_proto_path = os.path.join(proto_dir_path,
"grpc",
"testing",
641 streams = tuple(tempfile.TemporaryFile()
for _
in range(2))
642 work_dir = tempfile.mkdtemp()
644 invocation = (sys.executable,
"-m",
"grpc_tools.protoc",
645 "--proto_path", proto_dir_path,
"--python_out",
646 work_dir,
"--grpc_python_out", work_dir,
648 proc = subprocess.Popen(invocation,
653 for stream
in streams:
655 self.assertEqual(0,
len(stream.read()))
656 self.assertEqual(0, proc.returncode)
658 shutil.rmtree(work_dir)
661 if __name__ ==
'__main__':
662 unittest.main(verbosity=2)