_client_application.py
Go to the documentation of this file.
1 # Copyright 2017 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """An example gRPC Python-using client-side application."""
15 
16 import collections
17 import enum
18 import threading
19 import time
20 
21 import grpc
22 
23 from tests.testing import _application_common
24 from tests.testing.proto import requests_pb2
25 from tests.testing.proto import services_pb2
26 from tests.testing.proto import services_pb2_grpc
27 from tests.unit.framework.common import test_constants
28 
29 
30 @enum.unique
31 class Scenario(enum.Enum):
32  UNARY_UNARY = 'unary unary'
33  UNARY_STREAM = 'unary stream'
34  STREAM_UNARY = 'stream unary'
35  STREAM_STREAM = 'stream stream'
36  CONCURRENT_STREAM_UNARY = 'concurrent stream unary'
37  CONCURRENT_STREAM_STREAM = 'concurrent stream stream'
38  CANCEL_UNARY_UNARY = 'cancel unary unary'
39  CANCEL_UNARY_STREAM = 'cancel unary stream'
40  INFINITE_REQUEST_STREAM = 'infinite request stream'
41 
42 
43 class Outcome(collections.namedtuple('Outcome', ('kind', 'code', 'details'))):
44  """Outcome of a client application scenario.
45 
46  Attributes:
47  kind: A Kind value describing the overall kind of scenario execution.
48  code: A grpc.StatusCode value. Only valid if kind is Kind.RPC_ERROR.
49  details: A status details string. Only valid if kind is Kind.RPC_ERROR.
50  """
51 
52  @enum.unique
53  class Kind(enum.Enum):
54  SATISFACTORY = 'satisfactory'
55  UNSATISFACTORY = 'unsatisfactory'
56  RPC_ERROR = 'rpc error'
57 
58 
59 _SATISFACTORY_OUTCOME = Outcome(Outcome.Kind.SATISFACTORY, None, None)
60 _UNSATISFACTORY_OUTCOME = Outcome(Outcome.Kind.UNSATISFACTORY, None, None)
61 
62 
63 class _Pipe(object):
64 
65  def __init__(self):
66  self._condition = threading.Condition()
67  self._values = []
68  self._open = True
69 
70  def __iter__(self):
71  return self
72 
73  def _next(self):
74  with self._condition:
75  while True:
76  if self._values:
77  return self._values.pop(0)
78  elif not self._open:
79  raise StopIteration()
80  else:
81  self._condition.wait()
82 
83  def __next__(self): # (Python 3 Iterator Protocol)
84  return self._next()
85 
86  def next(self): # (Python 2 Iterator Protocol)
87  return self._next()
88 
89  def add(self, value):
90  with self._condition:
91  self._values.append(value)
92  self._condition.notify_all()
93 
94  def close(self):
95  with self._condition:
96  self._open = False
97  self._condition.notify_all()
98 
99 
101  response = stub.UnUn(_application_common.UNARY_UNARY_REQUEST)
102  if _application_common.UNARY_UNARY_RESPONSE == response:
103  return _SATISFACTORY_OUTCOME
104  else:
105  return _UNSATISFACTORY_OUTCOME
106 
107 
109  response_iterator = stub.UnStre(_application_common.UNARY_STREAM_REQUEST)
110  try:
111  next(response_iterator)
112  except StopIteration:
113  return _SATISFACTORY_OUTCOME
114  else:
115  return _UNSATISFACTORY_OUTCOME
116 
117 
119  response, call = stub.StreUn.with_call(
120  iter((_application_common.STREAM_UNARY_REQUEST,) * 3))
121  if (_application_common.STREAM_UNARY_RESPONSE == response and
122  call.code() is grpc.StatusCode.OK):
123  return _SATISFACTORY_OUTCOME
124  else:
125  return _UNSATISFACTORY_OUTCOME
126 
127 
129  request_pipe = _Pipe()
130  response_iterator = stub.StreStre(iter(request_pipe))
131  request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
132  first_responses = next(response_iterator), next(response_iterator)
133  request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
134  second_responses = next(response_iterator), next(response_iterator)
135  request_pipe.close()
136  try:
137  next(response_iterator)
138  except StopIteration:
139  unexpected_extra_response = False
140  else:
141  unexpected_extra_response = True
142  if (first_responses == _application_common.TWO_STREAM_STREAM_RESPONSES and
143  second_responses == _application_common.TWO_STREAM_STREAM_RESPONSES
144  and not unexpected_extra_response):
145  return _SATISFACTORY_OUTCOME
146  else:
147  return _UNSATISFACTORY_OUTCOME
148 
149 
151  future_calls = tuple(
152  stub.StreUn.future(iter((_application_common.STREAM_UNARY_REQUEST,) *
153  3))
154  for _ in range(test_constants.THREAD_CONCURRENCY))
155  for future_call in future_calls:
156  if future_call.code() is grpc.StatusCode.OK:
157  response = future_call.result()
158  if _application_common.STREAM_UNARY_RESPONSE != response:
159  return _UNSATISFACTORY_OUTCOME
160  else:
161  return _UNSATISFACTORY_OUTCOME
162  else:
163  return _SATISFACTORY_OUTCOME
164 
165 
167  condition = threading.Condition()
168  outcomes = [None] * test_constants.RPC_CONCURRENCY
169 
170  def run_stream_stream(index):
171  outcome = _run_stream_stream(stub)
172  with condition:
173  outcomes[index] = outcome
174  condition.notify()
175 
176  for index in range(test_constants.RPC_CONCURRENCY):
177  thread = threading.Thread(target=run_stream_stream, args=(index,))
178  thread.start()
179  with condition:
180  while True:
181  if all(outcomes):
182  for outcome in outcomes:
183  if outcome.kind is not Outcome.Kind.SATISFACTORY:
184  return _UNSATISFACTORY_OUTCOME
185  else:
186  return _SATISFACTORY_OUTCOME
187  else:
188  condition.wait()
189 
190 
192  response_future_call = stub.UnUn.future(
193  _application_common.UNARY_UNARY_REQUEST)
194  initial_metadata = response_future_call.initial_metadata()
195  cancelled = response_future_call.cancel()
196  if initial_metadata is not None and cancelled:
197  return _SATISFACTORY_OUTCOME
198  else:
199  return _UNSATISFACTORY_OUTCOME
200 
201 
203 
205  while True:
206  yield _application_common.STREAM_UNARY_REQUEST
207 
208  response_future_call = stub.StreUn.future(
210  timeout=_application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
211  if response_future_call.code() is grpc.StatusCode.DEADLINE_EXCEEDED:
212  return _SATISFACTORY_OUTCOME
213  else:
214  return _UNSATISFACTORY_OUTCOME
215 
216 
217 _IMPLEMENTATIONS = {
218  Scenario.UNARY_UNARY: _run_unary_unary,
219  Scenario.UNARY_STREAM: _run_unary_stream,
220  Scenario.STREAM_UNARY: _run_stream_unary,
221  Scenario.STREAM_STREAM: _run_stream_stream,
222  Scenario.CONCURRENT_STREAM_UNARY: _run_concurrent_stream_unary,
223  Scenario.CONCURRENT_STREAM_STREAM: _run_concurrent_stream_stream,
224  Scenario.CANCEL_UNARY_UNARY: _run_cancel_unary_unary,
225  Scenario.INFINITE_REQUEST_STREAM: _run_infinite_request_stream,
226 }
227 
228 
229 def run(scenario, channel):
230  stub = services_pb2_grpc.FirstServiceStub(channel)
231  try:
232  return _IMPLEMENTATIONS[scenario](stub)
233  except grpc.RpcError as rpc_error:
234  return Outcome(Outcome.Kind.RPC_ERROR, rpc_error.code(),
235  rpc_error.details())
test_group_name.all
all
Definition: test_group_name.py:241
tests.testing._client_application._run_unary_stream
def _run_unary_stream(stub)
Definition: _client_application.py:108
tests.testing._client_application.Outcome.Kind
Definition: _client_application.py:53
tests.testing._client_application._Pipe._condition
_condition
Definition: _client_application.py:66
tests.unit._exit_scenarios.infinite_request_iterator
def infinite_request_iterator()
Definition: _exit_scenarios.py:159
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests.testing
Definition: src/python/grpcio_tests/tests/testing/__init__.py:1
tests.testing._client_application._Pipe.__iter__
def __iter__(self)
Definition: _client_application.py:70
tests.testing._client_application._Pipe.add
def add(self, value)
Definition: _client_application.py:89
grpc.RpcError
Definition: src/python/grpcio/grpc/__init__.py:302
tests.testing._client_application.Scenario
Definition: _client_application.py:31
tests.testing._client_application._run_cancel_unary_unary
def _run_cancel_unary_unary(stub)
Definition: _client_application.py:191
tests.testing._client_application._Pipe._open
_open
Definition: _client_application.py:68
tests.testing._client_application._Pipe.__init__
def __init__(self)
Definition: _client_application.py:65
tests.testing._client_application._Pipe.next
def next(self)
Definition: _client_application.py:86
tests.testing._client_application._run_infinite_request_stream
def _run_infinite_request_stream(stub)
Definition: _client_application.py:202
tests.testing._client_application._Pipe._next
def _next(self)
Definition: _client_application.py:73
tests.testing._client_application.Outcome
Definition: _client_application.py:43
tests.testing._client_application._run_stream_unary
def _run_stream_unary(stub)
Definition: _client_application.py:118
tests.testing._client_application._run_concurrent_stream_unary
def _run_concurrent_stream_unary(stub)
Definition: _client_application.py:150
tests.testing.proto
Definition: src/python/grpcio_tests/tests/testing/proto/__init__.py:1
tests.testing._client_application._Pipe.close
def close(self)
Definition: _client_application.py:94
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
tests.testing._client_application._Pipe._values
_values
Definition: _client_application.py:67
tests.testing._client_application._run_concurrent_stream_stream
def _run_concurrent_stream_stream(stub)
Definition: _client_application.py:166
wait
static void wait(notification *n)
Definition: alts_tsi_handshaker_test.cc:114
tests.testing._client_application._Pipe
Definition: _client_application.py:63
tests.testing._client_application.run
def run(scenario, channel)
Definition: _client_application.py:229
tests.testing._client_application._Pipe.__next__
def __next__(self)
Definition: _client_application.py:83
iter
Definition: test_winkernel.cpp:47
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
tests.testing._client_application._run_stream_stream
def _run_stream_stream(stub)
Definition: _client_application.py:128
tests.testing._client_application._run_unary_unary
def _run_unary_unary(stub)
Definition: _client_application.py:100


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27