21 from src.proto.grpc.testing
import empty_pb2
22 from src.proto.grpc.testing
import messages_pb2
23 from src.proto.grpc.testing
import test_pb2_grpc
27 _INITIAL_METADATA_KEY =
"x-grpc-test-echo-initial"
28 _TRAILING_METADATA_KEY =
"x-grpc-test-echo-trailing-bin"
32 """Copies metadata from request to response if it is present."""
33 invocation_metadata = dict(servicer_context.invocation_metadata())
34 if _INITIAL_METADATA_KEY
in invocation_metadata:
35 initial_metadatum = (_INITIAL_METADATA_KEY,
36 invocation_metadata[_INITIAL_METADATA_KEY])
37 await servicer_context.send_initial_metadata((initial_metadatum,))
38 if _TRAILING_METADATA_KEY
in invocation_metadata:
39 trailing_metadatum = (_TRAILING_METADATA_KEY,
40 invocation_metadata[_TRAILING_METADATA_KEY])
41 servicer_context.set_trailing_metadata((trailing_metadatum,))
46 """Echos the RPC status if demanded by the request."""
47 if request.HasField(
'response_status'):
48 await servicer_context.abort(request.response_status.code,
49 request.response_status.message)
59 body=b
'\x00' * request.response_size))
61 async
def EmptyCall(self, request, context):
62 return empty_pb2.Empty()
64 async
def StreamingOutputCall(
65 self, request: messages_pb2.StreamingOutputCallRequest,
67 for response_parameters
in request.response_parameters:
68 if response_parameters.interval_us != 0:
70 datetime.timedelta(microseconds=response_parameters.
71 interval_us).total_seconds())
72 if response_parameters.size != 0:
76 response_parameters.size))
83 async
def UnaryCallWithSleep(self, unused_request, unused_context):
84 await asyncio.sleep(_constants.UNARY_CALL_WITH_SLEEP_VALUE)
87 async
def StreamingInputCall(self, request_async_iterator, unused_context):
89 async
for request
in request_async_iterator:
90 if request.payload
is not None and request.payload.body:
91 aggregate_size +=
len(request.payload.body)
93 aggregated_payload_size=aggregate_size)
95 async
def FullDuplexCall(self, request_async_iterator, context):
97 async
for request
in request_async_iterator:
99 for response_parameters
in request.response_parameters:
100 if response_parameters.interval_us != 0:
102 datetime.timedelta(microseconds=response_parameters.
103 interval_us).total_seconds())
104 if response_parameters.size != 0:
108 response_parameters.size))
116 rpc_method_handlers = {
117 'UnaryCallWithSleep':
119 servicer.UnaryCallWithSleep,
120 request_deserializer=messages_pb2.SimpleRequest.FromString,
121 response_serializer=messages_pb2.SimpleResponse.
130 server_credentials=None,
132 server = aio.server(options=((
'grpc.so_reuseport', 0),),
133 interceptors=interceptors)
135 test_pb2_grpc.add_TestServiceServicer_to_server(servicer, server)
140 if server_credentials
is None:
142 (resources.private_key(), resources.certificate_chain())
144 port = server.add_secure_port(
'[::]:%d' % port, server_credentials)
146 port = server.add_insecure_port(
'[::]:%d' % port)
151 return 'localhost:%d' % port, server