"""An example gRPC Python-using client-side application."""
# Scenario identifiers, one per client scenario.
#
# The dispatch table elsewhere in this file refers to these names as
# attributes of `Scenario` (e.g. `Scenario.UNARY_UNARY`), so they are members
# of that class; its header line is not part of this excerpt.
# NOTE(review): re-indent under the `Scenario` header when merging -- the
# column-0 layout here only mirrors the excerpt.
UNARY_UNARY = 'unary unary'
UNARY_STREAM = 'unary stream'
STREAM_UNARY = 'stream unary'
STREAM_STREAM = 'stream stream'
CONCURRENT_STREAM_UNARY = 'concurrent stream unary'
CONCURRENT_STREAM_STREAM = 'concurrent stream stream'
CANCEL_UNARY_UNARY = 'cancel unary unary'
# NOTE(review): no implementation is registered for this scenario in the
# visible dispatch table.
CANCEL_UNARY_STREAM = 'cancel unary stream'
INFINITE_REQUEST_STREAM = 'infinite request stream'
class Outcome(collections.namedtuple('Outcome', ('kind', 'code', 'details'))):
    """Outcome of a client application scenario.

    Attributes:
      kind: A Kind value describing the overall kind of scenario execution.
      code: A grpc.StatusCode value. Only valid if kind is Kind.RPC_ERROR.
      details: A status details string. Only valid if kind is Kind.RPC_ERROR.
    """

    # NOTE(review): the declaration line of `Kind` is missing from the excerpt
    # this class was restored from. An `enum.Enum` is assumed because callers
    # compare members by identity
    # (`outcome.kind is not Outcome.Kind.SATISFACTORY`); confirm the base
    # class and that `enum` is imported at the top of the file.
    class Kind(enum.Enum):
        """Overall classification of a scenario run."""

        SATISFACTORY = 'satisfactory'
        UNSATISFACTORY = 'unsatisfactory'
        RPC_ERROR = 'rpc error'
# Shared outcome values returned by the scenario runners. `code` and
# `details` are None because they are only valid for Kind.RPC_ERROR outcomes.
_SATISFACTORY_OUTCOME = Outcome(Outcome.Kind.SATISFACTORY, None, None)
_UNSATISFACTORY_OUTCOME = Outcome(Outcome.Kind.UNSATISFACTORY, None, None)
# NOTE(review): the `def` line was absent from the excerpt; the name comes
# from the dispatch-table entry for Scenario.UNARY_UNARY and the single
# `stub` parameter from how the table's functions are invoked.
def _run_unary_unary(stub):
    """Run the unary-unary scenario.

    Args:
      stub: The service stub whose `UnUn` method is invoked.

    Returns:
      _SATISFACTORY_OUTCOME if the response equals
      _application_common.UNARY_UNARY_RESPONSE, otherwise
      _UNSATISFACTORY_OUTCOME.
    """
    response = stub.UnUn(_application_common.UNARY_UNARY_REQUEST)
    if _application_common.UNARY_UNARY_RESPONSE == response:
        return _SATISFACTORY_OUTCOME
    return _UNSATISFACTORY_OUTCOME
# NOTE(review): the `def`, `try:` and `else:` lines were absent from the
# excerpt; the name comes from the dispatch-table entry for
# Scenario.UNARY_STREAM.
def _run_unary_stream(stub):
    """Run the unary-stream scenario.

    The scenario expects an empty response stream: the first attempt to read
    from the response iterator must raise StopIteration.

    Args:
      stub: The service stub whose `UnStre` method is invoked.

    Returns:
      _SATISFACTORY_OUTCOME if the response iterator is exhausted
      immediately, _UNSATISFACTORY_OUTCOME if it yields a response.
    """
    response_iterator = stub.UnStre(_application_common.UNARY_STREAM_REQUEST)
    try:
        next(response_iterator)
    except StopIteration:
        return _SATISFACTORY_OUTCOME
    else:
        # A response arrived where none was expected.
        return _UNSATISFACTORY_OUTCOME
# NOTE(review): the `def` and `else:` lines were absent from the excerpt; the
# name comes from the dispatch-table entry for Scenario.STREAM_UNARY.
def _run_stream_unary(stub):
    """Run the stream-unary scenario.

    Sends three copies of _application_common.STREAM_UNARY_REQUEST through
    `stub.StreUn.with_call` and checks both the response and the call status.

    Args:
      stub: The service stub whose `StreUn` method is invoked.

    Returns:
      _SATISFACTORY_OUTCOME if the response equals
      _application_common.STREAM_UNARY_RESPONSE and the call's code is
      grpc.StatusCode.OK, otherwise _UNSATISFACTORY_OUTCOME.
    """
    response, call = stub.StreUn.with_call(
        iter((_application_common.STREAM_UNARY_REQUEST,) * 3))
    if (_application_common.STREAM_UNARY_RESPONSE == response and
            call.code() is grpc.StatusCode.OK):
        return _SATISFACTORY_OUTCOME
    return _UNSATISFACTORY_OUTCOME
129 request_pipe =
_Pipe()
130 response_iterator = stub.StreStre(
iter(request_pipe))
131 request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
132 first_responses =
next(response_iterator),
next(response_iterator)
133 request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
134 second_responses =
next(response_iterator),
next(response_iterator)
137 next(response_iterator)
138 except StopIteration:
139 unexpected_extra_response =
False
141 unexpected_extra_response =
True
142 if (first_responses == _application_common.TWO_STREAM_STREAM_RESPONSES
and
143 second_responses == _application_common.TWO_STREAM_STREAM_RESPONSES
144 and not unexpected_extra_response):
145 return _SATISFACTORY_OUTCOME
147 return _UNSATISFACTORY_OUTCOME
151 future_calls = tuple(
152 stub.StreUn.future(
iter((_application_common.STREAM_UNARY_REQUEST,) *
154 for _
in range(test_constants.THREAD_CONCURRENCY))
155 for future_call
in future_calls:
156 if future_call.code()
is grpc.StatusCode.OK:
157 response = future_call.result()
158 if _application_common.STREAM_UNARY_RESPONSE != response:
159 return _UNSATISFACTORY_OUTCOME
161 return _UNSATISFACTORY_OUTCOME
163 return _SATISFACTORY_OUTCOME
167 condition = threading.Condition()
168 outcomes = [
None] * test_constants.RPC_CONCURRENCY
170 def run_stream_stream(index):
173 outcomes[index] = outcome
176 for index
in range(test_constants.RPC_CONCURRENCY):
177 thread = threading.Thread(target=run_stream_stream, args=(index,))
182 for outcome
in outcomes:
183 if outcome.kind
is not Outcome.Kind.SATISFACTORY:
184 return _UNSATISFACTORY_OUTCOME
186 return _SATISFACTORY_OUTCOME
# NOTE(review): the `def` and `else:` lines were absent from the excerpt; the
# name comes from the dispatch-table entry for Scenario.CANCEL_UNARY_UNARY.
def _run_cancel_unary_unary(stub):
    """Run the cancel-unary-unary scenario.

    Starts `stub.UnUn` as a future, reads the call's initial metadata, then
    cancels the call.

    Args:
      stub: The service stub whose `UnUn` method is invoked.

    Returns:
      _SATISFACTORY_OUTCOME if initial metadata was obtained (is not None)
      and `cancel()` returned a true value, otherwise
      _UNSATISFACTORY_OUTCOME.
    """
    response_future_call = stub.UnUn.future(
        _application_common.UNARY_UNARY_REQUEST)
    # Metadata is read before cancelling; the order is part of the scenario.
    initial_metadata = response_future_call.initial_metadata()
    cancelled = response_future_call.cancel()
    if initial_metadata is not None and cancelled:
        return _SATISFACTORY_OUTCOME
    return _UNSATISFACTORY_OUTCOME
206 yield _application_common.STREAM_UNARY_REQUEST
208 response_future_call = stub.StreUn.future(
210 timeout=_application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
211 if response_future_call.code()
is grpc.StatusCode.DEADLINE_EXCEEDED:
212 return _SATISFACTORY_OUTCOME
214 return _UNSATISFACTORY_OUTCOME
# Dispatch table: maps each Scenario to the function that executes it. Every
# function is called with the service stub as its only argument.
# NOTE(review): the opening and closing lines of this literal were absent from
# the excerpt; the name is taken from its use in `run`. The excerpt shows no
# entry for Scenario.CANCEL_UNARY_STREAM -- confirm whether that is intended.
_IMPLEMENTATIONS = {
    Scenario.UNARY_UNARY: _run_unary_unary,
    Scenario.UNARY_STREAM: _run_unary_stream,
    Scenario.STREAM_UNARY: _run_stream_unary,
    Scenario.STREAM_STREAM: _run_stream_stream,
    Scenario.CONCURRENT_STREAM_UNARY: _run_concurrent_stream_unary,
    Scenario.CONCURRENT_STREAM_STREAM: _run_concurrent_stream_stream,
    Scenario.CANCEL_UNARY_UNARY: _run_cancel_unary_unary,
    Scenario.INFINITE_REQUEST_STREAM: _run_infinite_request_stream,
}
229 def run(scenario, channel):
230 stub = services_pb2_grpc.FirstServiceStub(channel)
232 return _IMPLEMENTATIONS[scenario](stub)
234 return Outcome(Outcome.Kind.RPC_ERROR, rpc_error.code(),