_read_some_but_not_all_responses_test.py
Go to the documentation of this file.
1 # Copyright 2016 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Test a corner-case at the level of the Cython API."""
15 
16 import threading
17 import unittest
18 
19 from grpc._cython import cygrpc
20 
21 from tests.unit._cython import test_utilities
22 
# Flags value passed to every cygrpc operation in this test (no flags set).
_EMPTY_FLAGS = 0
# Metadata value passed wherever the test sends no metadata entries.
_EMPTY_METADATA = ()
25 
26 
class _ServerDriver(object):
    """Polls a completion queue on a background thread until shutdown.

    Every event returned by ``completion_queue.poll()`` is recorded in
    arrival order. Polling stops after an event whose ``tag`` is (by
    identity) ``shutdown_tag`` has been recorded.
    """

    def __init__(self, completion_queue, shutdown_tag):
        """Store the queue to poll and the tag that ends polling.

        Args:
          completion_queue: Object exposing a ``poll()`` method that returns
            events carrying a ``tag`` attribute.
          shutdown_tag: Tag object, compared with ``is``, whose event stops
            the polling thread.
        """
        # Guards _events and _saw_shutdown_tag; waiters block on it.
        self._condition = threading.Condition()
        self._completion_queue = completion_queue
        self._shutdown_tag = shutdown_tag
        self._events = []
        self._saw_shutdown_tag = False

    def start(self):
        """Start the polling thread and return immediately."""

        def in_thread():
            while True:
                # Poll outside the lock so waiters are never blocked on it.
                event = self._completion_queue.poll()
                with self._condition:
                    self._events.append(event)
                    if event.tag is self._shutdown_tag:
                        self._saw_shutdown_tag = True
                    # Wake every waiter: first_event() and events() wait on
                    # different predicates, so a single notify() could wake
                    # only the one whose predicate is still false and leave
                    # the other blocked.
                    self._condition.notify_all()
                    if self._saw_shutdown_tag:
                        break

        thread = threading.Thread(target=in_thread)
        thread.start()

    def done(self):
        """Return True once the shutdown-tagged event has been recorded."""
        with self._condition:
            return self._saw_shutdown_tag

    def first_event(self):
        """Block until at least one event is recorded; return the first."""
        with self._condition:
            while not self._events:
                self._condition.wait()
            return self._events[0]

    def events(self):
        """Block until shutdown was seen; return all events as a tuple."""
        with self._condition:
            while not self._saw_shutdown_tag:
                self._condition.wait()
            return tuple(self._events)
66 
67 
class _QueueDriver(object):
    """Polls a completion queue on a background thread until all tags arrive.

    Each polled event is recorded and its ``tag`` is removed from the ``due``
    set; the polling thread returns once ``due`` is empty.
    """

    def __init__(self, condition, completion_queue, due):
        """Store the collaborators supplied by the caller.

        Args:
          condition: Condition object guarding this driver's state; it is
            supplied by the caller, who may also hold it while starting
            batches.
          completion_queue: Object exposing a ``poll()`` method that returns
            events carrying a ``tag`` attribute.
          due: Mutable set of tags still expected. It is mutated in place,
            and ``remove`` raises ``KeyError`` for a tag that is not in it.
        """
        self._condition = condition
        self._completion_queue = completion_queue
        self._due = due
        self._events = []
        self._returned = False

    def start(self):
        """Start the polling thread and return immediately."""

        def in_thread():
            while True:
                # Poll outside the lock so waiters are never blocked on it.
                event = self._completion_queue.poll()
                with self._condition:
                    self._events.append(event)
                    self._due.remove(event.tag)
                    self._condition.notify_all()
                    if not self._due:
                        # Nothing left to wait for: flag completion while the
                        # lock is still held so woken waiters observe it.
                        self._returned = True
                        return

        thread = threading.Thread(target=in_thread)
        thread.start()

    def done(self):
        """Return True once every due tag has been received."""
        with self._condition:
            return self._returned

    def event_with_tag(self, tag):
        """Block until an event whose tag is ``tag`` (by identity) arrives.

        Returns:
          The first recorded event carrying that tag.
        """
        with self._condition:
            while True:
                for event in self._events:
                    if event.tag is tag:
                        return event
                self._condition.wait()

    def events(self):
        """Block until all due tags arrived; return all events as a tuple."""
        with self._condition:
            while not self._returned:
                self._condition.wait()
            return tuple(self._events)
110 
111 
class ReadSomeButNotAllResponsesTest(unittest.TestCase):
    """Cython-level test of a client that stops reading responses early."""

    def testReadSomeButNotAllResponses(self):
        """Server sends two messages; client reads one, then cancels.

        The server queues initial metadata, two messages and a final status.
        The client receives initial metadata and a single message, cancels
        the call, and then channel and server are shut down.
        """
        # --- Server and channel setup -----------------------------------
        server_completion_queue = cygrpc.CompletionQueue()
        server = cygrpc.Server([(
            b'grpc.so_reuseport',
            0,
        )], False)
        server.register_completion_queue(server_completion_queue)
        port = server.add_http2_port(b'[::]:0')
        server.start()
        channel = cygrpc.Channel('localhost:{}'.format(port).encode(), set(),
                                 None)

        server_shutdown_tag = 'server_shutdown_tag'
        server_driver = _ServerDriver(server_completion_queue,
                                      server_shutdown_tag)
        server_driver.start()

        # --- Per-call server completion queue and its driver ------------
        server_call_condition = threading.Condition()
        server_send_initial_metadata_tag = 'server_send_initial_metadata_tag'
        server_send_first_message_tag = 'server_send_first_message_tag'
        server_send_second_message_tag = 'server_send_second_message_tag'
        server_complete_rpc_tag = 'server_complete_rpc_tag'
        server_call_due = set((
            server_send_initial_metadata_tag,
            server_send_first_message_tag,
            server_send_second_message_tag,
            server_complete_rpc_tag,
        ))
        server_call_completion_queue = cygrpc.CompletionQueue()
        server_call_driver = _QueueDriver(server_call_condition,
                                          server_call_completion_queue,
                                          server_call_due)
        server_call_driver.start()

        server_rpc_tag = 'server_rpc_tag'
        request_call_result = server.request_call(server_call_completion_queue,
                                                  server_completion_queue,
                                                  server_rpc_tag)

        # --- Client starts the call -------------------------------------
        client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
        client_complete_rpc_tag = 'client_complete_rpc_tag'
        client_call = channel.segregated_call(
            _EMPTY_FLAGS, b'/twinkies', None, None, _EMPTY_METADATA, None, (
                (
                    [
                        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
                    ],
                    client_receive_initial_metadata_tag,
                ),
                (
                    [
                        cygrpc.SendInitialMetadataOperation(
                            _EMPTY_METADATA, _EMPTY_FLAGS),
                        cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
                    ],
                    client_complete_rpc_tag,
                ),
            ))
        client_receive_initial_metadata_event_future = test_utilities.SimpleFuture(
            client_call.next_event)

        server_rpc_event = server_driver.first_event()

        # --- Server sends initial metadata and the first message --------
        with server_call_condition:
            server_send_initial_metadata_start_batch_result = (
                server_rpc_event.call.start_server_batch([
                    cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
                                                        _EMPTY_FLAGS),
                ], server_send_initial_metadata_tag))
            server_send_first_message_start_batch_result = (
                server_rpc_event.call.start_server_batch([
                    cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS),
                ], server_send_first_message_tag))
        # The lookups below block until the matching batch has completed.
        server_send_initial_metadata_event = server_call_driver.event_with_tag(
            server_send_initial_metadata_tag)
        server_send_first_message_event = server_call_driver.event_with_tag(
            server_send_first_message_tag)

        # --- Server sends the second message and the final status -------
        with server_call_condition:
            server_send_second_message_start_batch_result = (
                server_rpc_event.call.start_server_batch([
                    cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS),
                ], server_send_second_message_tag))
            server_complete_rpc_start_batch_result = (
                server_rpc_event.call.start_server_batch([
                    cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
                    cygrpc.SendStatusFromServerOperation(
                        (), cygrpc.StatusCode.ok, b'test details',
                        _EMPTY_FLAGS),
                ], server_complete_rpc_tag))
        server_send_second_message_event = server_call_driver.event_with_tag(
            server_send_second_message_tag)
        server_complete_rpc_event = server_call_driver.event_with_tag(
            server_complete_rpc_tag)
        # Blocks until every tag in server_call_due has been received.
        server_call_driver.events()

        # --- Client reads only the first message, then cancels ----------
        client_receive_initial_metadata_event = (
            client_receive_initial_metadata_event_future.result())

        client_receive_first_message_tag = 'client_receive_first_message_tag'
        client_call.operate([
            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        ], client_receive_first_message_tag)
        client_receive_first_message_event = client_call.next_event()

        client_call_cancel_result = client_call.cancel(
            cygrpc.StatusCode.cancelled, 'Cancelled during test!')
        client_complete_rpc_event = client_call.next_event()

        # --- Teardown ---------------------------------------------------
        channel.close(cygrpc.StatusCode.unknown, 'Channel closed!')
        server.shutdown(server_completion_queue, server_shutdown_tag)
        server.cancel_all_calls()
        # Blocks until the server driver has seen the shutdown tag.
        server_driver.events()

        # --- Assertions -------------------------------------------------
        self.assertEqual(cygrpc.CallError.ok, request_call_result)
        self.assertEqual(cygrpc.CallError.ok,
                         server_send_initial_metadata_start_batch_result)
        self.assertIs(server_rpc_tag, server_rpc_event.tag)
        self.assertEqual(cygrpc.CompletionType.operation_complete,
                         server_rpc_event.completion_type)
        self.assertIsInstance(server_rpc_event.call, cygrpc.Call)
239 
# Run the tests in this module with verbose output when executed directly.
if __name__ == '__main__':
    unittest.main(verbosity=2)
tests.unit._cython._read_some_but_not_all_responses_test._ServerDriver._condition
_condition
Definition: _read_some_but_not_all_responses_test.py:30
http2_test_server.format
format
Definition: http2_test_server.py:118
tests.unit._cython
Definition: src/python/grpcio_tests/tests/unit/_cython/__init__.py:1
tests.unit._cython._read_some_but_not_all_responses_test._ServerDriver.start
def start(self)
Definition: _read_some_but_not_all_responses_test.py:36
tests.unit._cython._read_some_but_not_all_responses_test._QueueDriver.__init__
def __init__(self, condition, completion_queue, due)
Definition: _read_some_but_not_all_responses_test.py:70
grpc._common.encode
def encode(s)
Definition: grpc/_common.py:68
tests.unit._cython._read_some_but_not_all_responses_test._ServerDriver
Definition: _read_some_but_not_all_responses_test.py:27
tests.unit._cython._read_some_but_not_all_responses_test._QueueDriver.start
def start(self)
Definition: _read_some_but_not_all_responses_test.py:77
tests.unit._cython._read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest.testReadSomeButNotAllResponses
def testReadSomeButNotAllResponses(self)
Definition: _read_some_but_not_all_responses_test.py:114
tests.unit._cython._read_some_but_not_all_responses_test._QueueDriver
Definition: _read_some_but_not_all_responses_test.py:68
tests.unit._cython._read_some_but_not_all_responses_test._QueueDriver._due
_due
Definition: _read_some_but_not_all_responses_test.py:73
tests.unit._cython._read_some_but_not_all_responses_test._ServerDriver.events
def events(self)
Definition: _read_some_but_not_all_responses_test.py:61
tests.unit._cython._read_some_but_not_all_responses_test._QueueDriver._events
_events
Definition: _read_some_but_not_all_responses_test.py:74
tests.unit._cython._read_some_but_not_all_responses_test._QueueDriver.events
def events(self)
Definition: _read_some_but_not_all_responses_test.py:105
tests.unit._cython._read_some_but_not_all_responses_test._ServerDriver.done
def done(self)
Definition: _read_some_but_not_all_responses_test.py:51
tests.unit._cython._read_some_but_not_all_responses_test._QueueDriver.event_with_tag
def event_with_tag(self, tag)
Definition: _read_some_but_not_all_responses_test.py:97
tests.unit._cython._read_some_but_not_all_responses_test._ServerDriver.__init__
def __init__(self, completion_queue, shutdown_tag)
Definition: _read_some_but_not_all_responses_test.py:29
tests.unit._cython._read_some_but_not_all_responses_test._ServerDriver._saw_shutdown_tag
_saw_shutdown_tag
Definition: _read_some_but_not_all_responses_test.py:34
tests.unit._cython._read_some_but_not_all_responses_test._ServerDriver._completion_queue
_completion_queue
Definition: _read_some_but_not_all_responses_test.py:31
tests.unit._cython._read_some_but_not_all_responses_test._QueueDriver.done
def done(self)
Definition: _read_some_but_not_all_responses_test.py:93
tests.unit._cython._read_some_but_not_all_responses_test._ServerDriver._shutdown_tag
_shutdown_tag
Definition: _read_some_but_not_all_responses_test.py:32
tests.unit._cython.test_utilities.SimpleFuture
Definition: _cython/test_utilities.py:20
tests.unit._cython._read_some_but_not_all_responses_test._QueueDriver._condition
_condition
Definition: _read_some_but_not_all_responses_test.py:71
tests.unit._cython._read_some_but_not_all_responses_test._QueueDriver._completion_queue
_completion_queue
Definition: _read_some_but_not_all_responses_test.py:72
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
wait
static void wait(notification *n)
Definition: alts_tsi_handshaker_test.cc:114
tests.unit._cython._read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest
Definition: _read_some_but_not_all_responses_test.py:112
tests.unit._cython._read_some_but_not_all_responses_test._ServerDriver.first_event
def first_event(self)
Definition: _read_some_but_not_all_responses_test.py:55
grpc._cython
Definition: src/python/grpcio/grpc/_cython/__init__.py:1
tests.unit._cython._read_some_but_not_all_responses_test._ServerDriver._events
_events
Definition: _read_some_but_not_all_responses_test.py:33
tests.unit._cython._read_some_but_not_all_responses_test._QueueDriver._returned
_returned
Definition: _read_some_but_not_all_responses_test.py:75


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:38