14 """The Python implementation of the TestServicer."""
20 from src.proto.grpc.testing
import empty_pb2
21 from src.proto.grpc.testing
import messages_pb2
22 from src.proto.grpc.testing
import test_pb2_grpc
24 _INITIAL_METADATA_KEY =
"x-grpc-test-echo-initial"
25 _TRAILING_METADATA_KEY =
"x-grpc-test-echo-trailing-bin"
26 _US_IN_A_SECOND = 1000 * 1000
30 """Copies metadata from request to response if it is present."""
31 invocation_metadata = dict(servicer_context.invocation_metadata())
32 if _INITIAL_METADATA_KEY
in invocation_metadata:
33 initial_metadatum = (_INITIAL_METADATA_KEY,
34 invocation_metadata[_INITIAL_METADATA_KEY])
35 servicer_context.send_initial_metadata((initial_metadatum,))
36 if _TRAILING_METADATA_KEY
in invocation_metadata:
37 trailing_metadatum = (_TRAILING_METADATA_KEY,
38 invocation_metadata[_TRAILING_METADATA_KEY])
39 servicer_context.set_trailing_metadata((trailing_metadatum,))
43 """Sets the response context code and details if the request asks for them"""
44 if request.HasField(
'response_status'):
45 servicer_context.set_code(request.response_status.code)
46 servicer_context.set_details(request.response_status.message)
53 return empty_pb2.Empty()
60 body=b
'\x00' * request.response_size))
64 for response_parameters
in request.response_parameters:
65 if response_parameters.interval_us != 0:
66 time.sleep(response_parameters.interval_us / _US_IN_A_SECOND)
70 response_parameters.size))
74 for request
in request_iterator:
75 if request.payload
is not None and request.payload.body:
76 aggregate_size +=
len(request.payload.body)
78 aggregated_payload_size=aggregate_size)
82 for request
in request_iterator:
84 for response_parameters
in request.response_parameters:
85 if response_parameters.interval_us != 0:
86 time.sleep(response_parameters.interval_us /
91 response_parameters.size))