30 from grpc_tools
import protoc
# String tags for two layout variants; where they are consumed is not visible
# in this part of the file.
_RELATIVE_PROTO_PATH = 'relative_proto_path'
_RELATIVE_PYTHON_OUT = 'relative_python_out'
38 _PROTO_FILES_PATH_COMPONENTS = (
40 'beta_grpc_plugin_test',
45 'beta_grpc_plugin_test',
48 'test_requests.proto',
51 'beta_grpc_plugin_test',
53 'test_responses.proto',
56 'beta_grpc_plugin_test',
# Dotted module paths, all under the beta_grpc_plugin_test package, for the
# payload, requests, responses and service *_pb2 modules.
_PAYLOAD_PB2 = 'beta_grpc_plugin_test.payload.test_payload_pb2'
_REQUESTS_PB2 = 'beta_grpc_plugin_test.requests.r.test_requests_pb2'
_RESPONSES_PB2 = 'beta_grpc_plugin_test.responses.test_responses_pb2'
_SERVICE_PB2 = 'beta_grpc_plugin_test.service.test_service_pb2'
# Attribute names that this file resolves on the service_pb2 module with
# getattr() (servicer class, stub class, and the two factory functions).
SERVICER_IDENTIFIER = 'BetaTestServiceServicer'
STUB_IDENTIFIER = 'BetaTestServiceStub'
SERVER_FACTORY_IDENTIFIER = 'beta_create_TestService_server'
STUB_FACTORY_IDENTIFIER = 'beta_create_TestService_stub'
74 @contextlib.contextmanager
76 old_system_path = sys.path[:]
77 sys.path = sys.path[0:1] + path_insertion + sys.path[1:]
79 sys.path = old_system_path
84 for path_components
in path_components_sequence:
86 for path_component
in path_components:
87 relative_path = path.join(thus_far, path_component)
88 if relative_path
not in created:
89 os.makedirs(path.join(root, relative_path))
90 created.add(relative_path)
91 thus_far = path.join(thus_far, path_component)
95 imports_substituted = raw_proto_content.replace(
96 b
'import "tests/protoc_plugin/protos/',
97 b
'import "beta_grpc_plugin_test/')
98 package_statement_substituted = imports_substituted.replace(
99 b
'package grpc_protoc_plugin;', b
'package beta_grpc_protoc_plugin;')
100 return package_statement_substituted
104 for subdirectory, _, _
in os.walk(directory):
105 init_file_name = path.join(subdirectory,
'__init__.py')
106 with open(init_file_name,
'wb')
as init_file:
119 @contextlib.contextmanager
128 @contextlib.contextmanager
145 response.payload.payload_type = self.
_payload_pb2.COMPRESSABLE
146 response.payload.payload_compressable =
'a' * request.response_size
151 for parameter
in request.response_parameters:
153 response.payload.payload_type = self.
_payload_pb2.COMPRESSABLE
154 response.payload.payload_compressable =
'a' * parameter.size
160 aggregated_payload_size = 0
161 for request
in request_iter:
162 aggregated_payload_size +=
len(request.payload.payload_compressable)
163 response.aggregated_payload_size = aggregated_payload_size
168 for request
in request_iter:
169 for parameter
in request.response_parameters:
171 response.payload.payload_type = self.
_payload_pb2.COMPRESSABLE
172 response.payload.payload_compressable =
'a' * parameter.size
178 for request
in request_iter:
179 for parameter
in request.response_parameters:
181 response.payload.payload_type = self.
_payload_pb2.COMPRESSABLE
182 response.payload.payload_compressable =
'a' * parameter.size
184 responses.append(response)
185 for response
in responses:
189 @contextlib.contextmanager
191 """Provides a servicer backend and a stub.
The servicer is just the implementation of the actual servicer passed to the
face player of the Python RPC implementation; the two are detached.
197 A (servicer_methods, stub) pair where servicer_methods is the back-end of
198 the service bound to the stub and stub is the stub on which to invoke
def UnaryCall(self, request, context):
    """Forward the call to ``servicer_methods.UnaryCall`` and return its result.

    ``servicer_methods`` is a free variable bound in the enclosing scope
    (its definition is not visible in this part of the file).
    """
    return servicer_methods.UnaryCall(request, context)
def StreamingOutputCall(self, request, context):
    """Forward the call to ``servicer_methods.StreamingOutputCall``.

    Returns whatever the backend returns; ``servicer_methods`` is a free
    variable bound in the enclosing scope (not visible in this part of
    the file).
    """
    return servicer_methods.StreamingOutputCall(request, context)
def StreamingInputCall(self, request_iter, context):
    """Forward the call to ``servicer_methods.StreamingInputCall``.

    Returns whatever the backend returns; ``servicer_methods`` is a free
    variable bound in the enclosing scope (not visible in this part of
    the file).
    """
    return servicer_methods.StreamingInputCall(request_iter, context)
def FullDuplexCall(self, request_iter, context):
    """Forward the call to ``servicer_methods.FullDuplexCall``.

    Returns whatever the backend returns; ``servicer_methods`` is a free
    variable bound in the enclosing scope (not visible in this part of
    the file).
    """
    return servicer_methods.FullDuplexCall(request_iter, context)
def HalfDuplexCall(self, request_iter, context):
    """Forward the call to ``servicer_methods.HalfDuplexCall``.

    Returns whatever the backend returns; ``servicer_methods`` is a free
    variable bound in the enclosing scope (not visible in this part of
    the file).
    """
    return servicer_methods.HalfDuplexCall(request_iter, context)
220 servicer = Servicer()
221 server = getattr(service_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
222 port = server.add_insecure_port(
'[::]:0')
224 channel = implementations.insecure_channel(
'localhost', port)
225 stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel)
226 yield servicer_methods, stub
230 @contextlib.contextmanager
232 """Provides a servicer backend that fails to implement methods and its stub.
The servicer is just the implementation of the actual servicer passed to the
face player of the Python RPC implementation; the two are detached.
237 service_pb2: The service_pb2 module generated by this test.
239 A (servicer_methods, stub) pair where servicer_methods is the back-end of
240 the service bound to the stub and stub is the stub on which to invoke
247 servicer = Servicer()
248 server = getattr(service_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
249 port = server.add_insecure_port(
'[::]:0')
251 channel = implementations.insecure_channel(
'localhost', port)
252 stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel)
259 request = requests_pb2.StreamingInputCallRequest()
260 request.payload.payload_type = payload_pb2.COMPRESSABLE
261 request.payload.payload_compressable =
'a'
266 request = requests_pb2.StreamingOutputCallRequest()
268 request.response_parameters.add(size=sizes[0], interval_us=0)
269 request.response_parameters.add(size=sizes[1], interval_us=0)
270 request.response_parameters.add(size=sizes[2], interval_us=0)
275 request = requests_pb2.StreamingOutputCallRequest()
276 request.response_parameters.add(size=1, interval_us=0)
278 request = requests_pb2.StreamingOutputCallRequest()
279 request.response_parameters.add(size=2, interval_us=0)
280 request.response_parameters.add(size=3, interval_us=0)
285 """Test case for the gRPC Python protoc-plugin.
While reading these tests, remember that the futures API
(`stub.method.future()`) only gives futures for the *response-unary*
methods and does not exist for response-streaming methods.
300 directories_path_components = {
301 proto_file_path_components[:-1]
302 for proto_file_path_components
in _PROTO_FILES_PATH_COMPONENTS
306 for proto_file_path_components
in _PROTO_FILES_PATH_COMPONENTS:
307 raw_proto_content = pkgutil.get_data(
308 'tests.protoc_plugin.protos',
309 path.join(*proto_file_path_components[1:]))
312 *proto_file_path_components)
313 with open(proto_file_name,
'wb')
as proto_file:
314 proto_file.write(massaged_proto_content)
327 protoc_exit_code = protoc.main(args)
328 self.assertEqual(0, protoc_exit_code)
342 self.assertIsNotNone(
344 self.assertIsNotNone(getattr(self.
_service_pb2, STUB_IDENTIFIER,
None))
345 self.assertIsNotNone(
346 getattr(self.
_service_pb2, SERVER_FACTORY_IDENTIFIER,
None))
347 self.assertIsNotNone(
348 getattr(self.
_service_pb2, STUB_FACTORY_IDENTIFIER,
None))
363 stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
364 except face.AbortionError
as error:
365 self.assertEqual(interfaces.StatusCode.UNIMPLEMENTED,
374 response = stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
375 expected_response = methods.UnaryCall(request,
'not a real context!')
376 self.assertEqual(expected_response, response)
385 with methods.pause():
386 response_future = stub.UnaryCall.future(
387 request, test_constants.LONG_TIMEOUT)
388 response = response_future.result()
389 expected_response = methods.UnaryCall(request,
'not a real RpcContext!')
390 self.assertEqual(expected_response, response)
398 with methods.pause():
399 response_future = stub.UnaryCall.future(
400 request, test_constants.SHORT_TIMEOUT)
401 with self.assertRaises(face.ExpirationError):
402 response_future.result()
410 with methods.pause():
411 response_future = stub.UnaryCall.future(request, 1)
412 response_future.cancel()
413 self.assertTrue(response_future.cancelled())
422 response_future = stub.UnaryCall.future(
423 request, test_constants.LONG_TIMEOUT)
424 self.assertIsNotNone(response_future.exception())
432 responses = stub.StreamingOutputCall(request,
433 test_constants.LONG_TIMEOUT)
434 expected_responses = methods.StreamingOutputCall(
435 request,
'not a real RpcContext!')
436 for expected_response, response
in moves.zip_longest(
437 expected_responses, responses):
438 self.assertEqual(expected_response, response)
446 with methods.pause():
447 responses = stub.StreamingOutputCall(
448 request, test_constants.SHORT_TIMEOUT)
449 with self.assertRaises(face.ExpirationError):
458 responses = stub.StreamingOutputCall(request,
459 test_constants.LONG_TIMEOUT)
462 with self.assertRaises(face.CancellationError):
472 responses = stub.StreamingOutputCall(request, 1)
473 self.assertIsNotNone(responses)
474 with self.assertRaises(face.RemoteError):
482 response = stub.StreamingInputCall(
485 test_constants.LONG_TIMEOUT)
486 expected_response = methods.StreamingInputCall(
489 'not a real RpcContext!')
490 self.assertEqual(expected_response, response)
497 with methods.pause():
498 response_future = stub.StreamingInputCall.future(
501 test_constants.LONG_TIMEOUT)
502 response = response_future.result()
503 expected_response = methods.StreamingInputCall(
506 'not a real RpcContext!')
507 self.assertEqual(expected_response, response)
514 with methods.pause():
515 response_future = stub.StreamingInputCall.future(
518 test_constants.SHORT_TIMEOUT)
519 with self.assertRaises(face.ExpirationError):
520 response_future.result()
521 self.assertIsInstance(response_future.exception(),
522 face.ExpirationError)
529 with methods.pause():
530 response_future = stub.StreamingInputCall.future(
533 test_constants.LONG_TIMEOUT)
534 response_future.cancel()
535 self.assertTrue(response_future.cancelled())
536 with self.assertRaises(future.CancelledError):
537 response_future.result()
545 response_future = stub.StreamingInputCall.future(
548 test_constants.LONG_TIMEOUT)
549 self.assertIsNotNone(response_future.exception())
556 responses = stub.FullDuplexCall(
558 test_constants.LONG_TIMEOUT)
559 expected_responses = methods.FullDuplexCall(
561 'not a real RpcContext!')
562 for expected_response, response
in moves.zip_longest(
563 expected_responses, responses):
564 self.assertEqual(expected_response, response)
572 with methods.pause():
573 responses = stub.FullDuplexCall(request_iterator,
574 test_constants.SHORT_TIMEOUT)
575 with self.assertRaises(face.ExpirationError):
584 responses = stub.FullDuplexCall(request_iterator,
585 test_constants.LONG_TIMEOUT)
588 with self.assertRaises(face.CancellationError):
598 responses = stub.FullDuplexCall(request_iterator,
599 test_constants.LONG_TIMEOUT)
600 self.assertIsNotNone(responses)
601 with self.assertRaises(face.RemoteError):
610 def half_duplex_request_iterator():
612 request.response_parameters.add(size=1, interval_us=0)
615 request.response_parameters.add(size=2, interval_us=0)
616 request.response_parameters.add(size=3, interval_us=0)
619 responses = stub.HalfDuplexCall(half_duplex_request_iterator(),
620 test_constants.LONG_TIMEOUT)
621 expected_responses = methods.HalfDuplexCall(
622 half_duplex_request_iterator(),
'not a real RpcContext!')
623 for check
in moves.zip_longest(expected_responses, responses):
624 expected_response, response = check
625 self.assertEqual(expected_response, response)
630 condition = threading.Condition()
633 @contextlib.contextmanager
641 condition.notify_all()
643 def half_duplex_request_iterator():
645 request.response_parameters.add(size=1, interval_us=0)
654 responses = stub.HalfDuplexCall(half_duplex_request_iterator(),
655 test_constants.SHORT_TIMEOUT)
657 with self.assertRaises(face.ExpirationError):
# Run the module's tests with verbose output when executed as a script.
if __name__ == '__main__':
    unittest.main(verbosity=2)