cygrpc_test.py
Go to the documentation of this file.
1 # Copyright 2015 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import platform
16 import threading
17 import time
18 import unittest
19 
20 from grpc._cython import cygrpc
21 
22 from tests.unit import resources
23 from tests.unit import test_common
24 from tests.unit._cython import test_utilities
25 
26 _SSL_HOST_OVERRIDE = b'foo.test.google.fr'
27 _CALL_CREDENTIALS_METADATA_KEY = 'call-creds-key'
28 _CALL_CREDENTIALS_METADATA_VALUE = 'call-creds-value'
29 _EMPTY_FLAGS = 0
30 
31 
32 def _metadata_plugin(context, callback):
33  callback(((
34  _CALL_CREDENTIALS_METADATA_KEY,
35  _CALL_CREDENTIALS_METADATA_VALUE,
36  ),), cygrpc.StatusCode.ok, b'')
37 
38 
39 class TypeSmokeTest(unittest.TestCase):
40 
42  completion_queue = cygrpc.CompletionQueue()
43  del completion_queue
44 
45  def testServerUpDown(self):
46  server = cygrpc.Server(set([(
47  b'grpc.so_reuseport',
48  0,
49  )]), False)
50  del server
51 
52  def testChannelUpDown(self):
53  channel = cygrpc.Channel(b'[::]:0', None, None)
54  channel.close(cygrpc.StatusCode.cancelled, 'Test method anyway!')
55 
57  cygrpc.MetadataPluginCallCredentials(_metadata_plugin,
58  b'test plugin name!')
59 
61  server = cygrpc.Server([(
62  b'grpc.so_reuseport',
63  0,
64  )], False)
65  completion_queue = cygrpc.CompletionQueue()
66  server.register_completion_queue(completion_queue)
67  port = server.add_http2_port(b'[::]:0')
68  self.assertIsInstance(port, int)
69  server.start()
70  del server
71 
73  completion_queue = cygrpc.CompletionQueue()
74  server = cygrpc.Server([
75  (
76  b'grpc.so_reuseport',
77  0,
78  ),
79  ], False)
80  server.add_http2_port(b'[::]:0')
81  server.register_completion_queue(completion_queue)
82  server.start()
83  shutdown_tag = object()
84  server.shutdown(completion_queue, shutdown_tag)
85  event = completion_queue.poll()
86  self.assertEqual(cygrpc.CompletionType.operation_complete,
87  event.completion_type)
88  self.assertIs(shutdown_tag, event.tag)
89  del server
90  del completion_queue
91 
92 
93 class ServerClientMixin(object):
94 
95  def setUpMixin(self, server_credentials, client_credentials, host_override):
96  self.server_completion_queue = cygrpc.CompletionQueue()
97  self.server = cygrpc.Server([(
98  b'grpc.so_reuseport',
99  0,
100  )], False)
101  self.server.register_completion_queue(self.server_completion_queue)
102  if server_credentials:
103  self.port = self.server.add_http2_port(b'[::]:0',
104  server_credentials)
105  else:
106  self.port = self.server.add_http2_port(b'[::]:0')
107  self.server.start()
108  self.client_completion_queue = cygrpc.CompletionQueue()
109  if client_credentials:
110  client_channel_arguments = ((
111  cygrpc.ChannelArgKey.ssl_target_name_override,
112  host_override,
113  ),)
114  self.client_channel = cygrpc.Channel(
115  'localhost:{}'.format(self.port).encode(),
116  client_channel_arguments, client_credentials)
117  else:
118  self.client_channel = cygrpc.Channel(
119  'localhost:{}'.format(self.port).encode(), set(), None)
120  if host_override:
121  self.host_argument = None # default host
122  self.expected_host = host_override
123  else:
124  # arbitrary host name necessitating no further identification
125  self.host_argument = b'hostess'
126  self.expected_host = self.host_argument
127 
128  def tearDownMixin(self):
129  self.client_channel.close(cygrpc.StatusCode.ok, 'test being torn down!')
130  del self.client_channel
131  del self.server
132  del self.client_completion_queue
133  del self.server_completion_queue
134 
135  def _perform_queue_operations(self, operations, call, queue, deadline,
136  description):
137  """Perform the operations with given call, queue, and deadline.
138 
139  Invocation errors are reported with as an exception with `description`
140  in the message. Performs the operations asynchronously, returning a
141  future.
142  """
143 
144  def performer():
145  tag = object()
146  try:
147  call_result = call.start_client_batch(operations, tag)
148  self.assertEqual(cygrpc.CallError.ok, call_result)
149  event = queue.poll(deadline=deadline)
150  self.assertEqual(cygrpc.CompletionType.operation_complete,
151  event.completion_type)
152  self.assertTrue(event.success)
153  self.assertIs(tag, event.tag)
154  except Exception as error:
155  raise Exception("Error in '{}': {}".format(
156  description, error.message))
157  return event
158 
159  return test_utilities.SimpleFuture(performer)
160 
161  def test_echo(self):
162  DEADLINE = time.time() + 5
163  DEADLINE_TOLERANCE = 0.25
164  CLIENT_METADATA_ASCII_KEY = 'key'
165  CLIENT_METADATA_ASCII_VALUE = 'val'
166  CLIENT_METADATA_BIN_KEY = 'key-bin'
167  CLIENT_METADATA_BIN_VALUE = b'\0' * 1000
168  SERVER_INITIAL_METADATA_KEY = 'init_me_me_me'
169  SERVER_INITIAL_METADATA_VALUE = 'whodawha?'
170  SERVER_TRAILING_METADATA_KEY = 'california_is_in_a_drought'
171  SERVER_TRAILING_METADATA_VALUE = 'zomg it is'
172  SERVER_STATUS_CODE = cygrpc.StatusCode.ok
173  SERVER_STATUS_DETAILS = 'our work is never over'
174  REQUEST = b'in death a member of project mayhem has a name'
175  RESPONSE = b'his name is robert paulson'
176  METHOD = b'twinkies'
177 
178  server_request_tag = object()
179  request_call_result = self.server.request_call(
181  server_request_tag)
182 
183  self.assertEqual(cygrpc.CallError.ok, request_call_result)
184 
185  client_call_tag = object()
186  client_initial_metadata = (
187  (
188  CLIENT_METADATA_ASCII_KEY,
189  CLIENT_METADATA_ASCII_VALUE,
190  ),
191  (
192  CLIENT_METADATA_BIN_KEY,
193  CLIENT_METADATA_BIN_VALUE,
194  ),
195  )
196  client_call = self.client_channel.integrated_call(
197  0, METHOD, self.host_argument, DEADLINE, client_initial_metadata,
198  None, [
199  (
200  [
201  cygrpc.SendInitialMetadataOperation(
202  client_initial_metadata, _EMPTY_FLAGS),
203  cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS),
204  cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
205  cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
206  cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
207  cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
208  ],
209  client_call_tag,
210  ),
211  ])
212  client_event_future = test_utilities.SimpleFuture(
213  self.client_channel.next_call_event)
214 
215  request_event = self.server_completion_queue.poll(deadline=DEADLINE)
216  self.assertEqual(cygrpc.CompletionType.operation_complete,
217  request_event.completion_type)
218  self.assertIsInstance(request_event.call, cygrpc.Call)
219  self.assertIs(server_request_tag, request_event.tag)
220  self.assertTrue(
221  test_common.metadata_transmitted(client_initial_metadata,
222  request_event.invocation_metadata))
223  self.assertEqual(METHOD, request_event.call_details.method)
224  self.assertEqual(self.expected_host, request_event.call_details.host)
225  self.assertLess(abs(DEADLINE - request_event.call_details.deadline),
226  DEADLINE_TOLERANCE)
227 
228  server_call_tag = object()
229  server_call = request_event.call
230  server_initial_metadata = ((
231  SERVER_INITIAL_METADATA_KEY,
232  SERVER_INITIAL_METADATA_VALUE,
233  ),)
234  server_trailing_metadata = ((
235  SERVER_TRAILING_METADATA_KEY,
236  SERVER_TRAILING_METADATA_VALUE,
237  ),)
238  server_start_batch_result = server_call.start_server_batch([
239  cygrpc.SendInitialMetadataOperation(server_initial_metadata,
240  _EMPTY_FLAGS),
241  cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
242  cygrpc.SendMessageOperation(RESPONSE, _EMPTY_FLAGS),
243  cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
244  cygrpc.SendStatusFromServerOperation(
245  server_trailing_metadata, SERVER_STATUS_CODE,
246  SERVER_STATUS_DETAILS, _EMPTY_FLAGS)
247  ], server_call_tag)
248  self.assertEqual(cygrpc.CallError.ok, server_start_batch_result)
249 
250  server_event = self.server_completion_queue.poll(deadline=DEADLINE)
251  client_event = client_event_future.result()
252 
253  self.assertEqual(6, len(client_event.batch_operations))
254  found_client_op_types = set()
255  for client_result in client_event.batch_operations:
256  # we expect each op type to be unique
257  self.assertNotIn(client_result.type(), found_client_op_types)
258  found_client_op_types.add(client_result.type())
259  if client_result.type(
260  ) == cygrpc.OperationType.receive_initial_metadata:
261  self.assertTrue(
262  test_common.metadata_transmitted(
263  server_initial_metadata,
264  client_result.initial_metadata()))
265  elif client_result.type() == cygrpc.OperationType.receive_message:
266  self.assertEqual(RESPONSE, client_result.message())
267  elif client_result.type(
268  ) == cygrpc.OperationType.receive_status_on_client:
269  self.assertTrue(
270  test_common.metadata_transmitted(
271  server_trailing_metadata,
272  client_result.trailing_metadata()))
273  self.assertEqual(SERVER_STATUS_DETAILS, client_result.details())
274  self.assertEqual(SERVER_STATUS_CODE, client_result.code())
275  self.assertEqual(
276  set([
277  cygrpc.OperationType.send_initial_metadata,
278  cygrpc.OperationType.send_message,
279  cygrpc.OperationType.send_close_from_client,
280  cygrpc.OperationType.receive_initial_metadata,
281  cygrpc.OperationType.receive_message,
282  cygrpc.OperationType.receive_status_on_client
283  ]), found_client_op_types)
284 
285  self.assertEqual(5, len(server_event.batch_operations))
286  found_server_op_types = set()
287  for server_result in server_event.batch_operations:
288  self.assertNotIn(server_result.type(), found_server_op_types)
289  found_server_op_types.add(server_result.type())
290  if server_result.type() == cygrpc.OperationType.receive_message:
291  self.assertEqual(REQUEST, server_result.message())
292  elif server_result.type(
293  ) == cygrpc.OperationType.receive_close_on_server:
294  self.assertFalse(server_result.cancelled())
295  self.assertEqual(
296  set([
297  cygrpc.OperationType.send_initial_metadata,
298  cygrpc.OperationType.receive_message,
299  cygrpc.OperationType.send_message,
300  cygrpc.OperationType.receive_close_on_server,
301  cygrpc.OperationType.send_status_from_server
302  ]), found_server_op_types)
303 
304  del client_call
305  del server_call
306 
307  def test_6522(self):
308  DEADLINE = time.time() + 5
309  DEADLINE_TOLERANCE = 0.25
310  METHOD = b'twinkies'
311 
312  empty_metadata = ()
313 
314  # Prologue
315  server_request_tag = object()
318  server_request_tag)
319  client_call = self.client_channel.segregated_call(
320  0, METHOD, self.host_argument, DEADLINE, None, None,
321  ([(
322  [
323  cygrpc.SendInitialMetadataOperation(empty_metadata,
324  _EMPTY_FLAGS),
325  cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
326  ],
327  object(),
328  ),
329  (
330  [
331  cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
332  ],
333  object(),
334  )]))
335 
336  client_initial_metadata_event_future = test_utilities.SimpleFuture(
337  client_call.next_event)
338 
339  request_event = self.server_completion_queue.poll(deadline=DEADLINE)
340  server_call = request_event.call
341 
342  def perform_server_operations(operations, description):
343  return self._perform_queue_operations(operations, server_call,
345  DEADLINE, description)
346 
347  server_event_future = perform_server_operations([
348  cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS),
349  ], "Server prologue")
350 
351  client_initial_metadata_event_future.result() # force completion
352  server_event_future.result()
353 
354  # Messaging
355  for _ in range(10):
356  client_call.operate([
357  cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS),
358  cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
359  ], "Client message")
360  client_message_event_future = test_utilities.SimpleFuture(
361  client_call.next_event)
362  server_event_future = perform_server_operations([
363  cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS),
364  cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
365  ], "Server receive")
366 
367  client_message_event_future.result() # force completion
368  server_event_future.result()
369 
370  # Epilogue
371  client_call.operate([
372  cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
373  ], "Client epilogue")
374  # One for ReceiveStatusOnClient, one for SendCloseFromClient.
375  client_events_future = test_utilities.SimpleFuture(lambda: {
376  client_call.next_event(),
377  client_call.next_event(),
378  })
379 
380  server_event_future = perform_server_operations([
381  cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
382  cygrpc.SendStatusFromServerOperation(
383  empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS)
384  ], "Server epilogue")
385 
386  client_events_future.result() # force completion
387  server_event_future.result()
388 
389 
391 
392  def setUp(self):
393  self.setUpMixin(None, None, None)
394 
395  def tearDown(self):
396  self.tearDownMixin()
397 
398 
399 class SecureServerSecureClient(unittest.TestCase, ServerClientMixin):
400 
401  def setUp(self):
402  server_credentials = cygrpc.server_credentials_ssl(
403  None, [
404  cygrpc.SslPemKeyCertPair(resources.private_key(),
405  resources.certificate_chain())
406  ], False)
407  client_credentials = cygrpc.SSLChannelCredentials(
408  resources.test_root_certificates(), None, None)
409  self.setUpMixin(server_credentials, client_credentials,
410  _SSL_HOST_OVERRIDE)
411 
412  def tearDown(self):
413  self.tearDownMixin()
414 
415 
416 if __name__ == '__main__':
417  unittest.main(verbosity=2)
tests.unit._cython.cygrpc_test._metadata_plugin
def _metadata_plugin(context, callback)
Definition: cygrpc_test.py:32
tests.unit._cython.cygrpc_test.ServerClientMixin
Definition: cygrpc_test.py:93
tests.unit._cython.cygrpc_test.ServerClientMixin.host_argument
host_argument
Definition: cygrpc_test.py:121
http2_test_server.format
format
Definition: http2_test_server.py:118
tests.unit._cython
Definition: src/python/grpcio_tests/tests/unit/_cython/__init__.py:1
request_call
static void request_call(grpc_end2end_proxy *proxy)
Definition: proxy.cc:432
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests.unit._cython.cygrpc_test.InsecureServerInsecureClient.setUp
def setUp(self)
Definition: cygrpc_test.py:392
tests.unit._cython.cygrpc_test.ServerClientMixin.setUpMixin
def setUpMixin(self, server_credentials, client_credentials, host_override)
Definition: cygrpc_test.py:95
tests.unit._cython.cygrpc_test.SecureServerSecureClient.tearDown
def tearDown(self)
Definition: cygrpc_test.py:412
tests.unit._cython.cygrpc_test.ServerClientMixin.client_completion_queue
client_completion_queue
Definition: cygrpc_test.py:108
tests.unit._cython.cygrpc_test.ServerClientMixin.expected_host
expected_host
Definition: cygrpc_test.py:122
grpc._common.encode
def encode(s)
Definition: grpc/_common.py:68
tests.unit._cython.cygrpc_test.InsecureServerInsecureClient.tearDown
def tearDown(self)
Definition: cygrpc_test.py:395
tests.unit._cython.cygrpc_test.SecureServerSecureClient
Definition: cygrpc_test.py:399
tests.unit._cython.cygrpc_test.ServerClientMixin.server
server
Definition: cygrpc_test.py:97
start
static uint64_t start
Definition: benchmark-pound.c:74
tests.unit._cython.cygrpc_test.ServerClientMixin.server_completion_queue
server_completion_queue
Definition: cygrpc_test.py:96
tests.unit._cython.cygrpc_test.TypeSmokeTest.testCompletionQueueUpDown
def testCompletionQueueUpDown(self)
Definition: cygrpc_test.py:41
tests.unit._cython.cygrpc_test.ServerClientMixin.tearDownMixin
def tearDownMixin(self)
Definition: cygrpc_test.py:128
tests.unit._cython.cygrpc_test.TypeSmokeTest.testServerStartNoExplicitShutdown
def testServerStartNoExplicitShutdown(self)
Definition: cygrpc_test.py:60
tests.unit._cython.cygrpc_test.ServerClientMixin.test_6522
def test_6522(self)
Definition: cygrpc_test.py:307
tests.unit._cython.cygrpc_test.TypeSmokeTest.testServerUpDown
def testServerUpDown(self)
Definition: cygrpc_test.py:45
close
#define close
Definition: test-fs.c:48
tests.unit._cython.cygrpc_test.ServerClientMixin.client_channel
client_channel
Definition: cygrpc_test.py:114
callback
static void callback(void *arg, int status, int timeouts, struct hostent *host)
Definition: acountry.c:224
tests.unit._cython.cygrpc_test.ServerClientMixin.port
port
Definition: cygrpc_test.py:103
tests.unit._cython.cygrpc_test.ServerClientMixin._perform_queue_operations
def _perform_queue_operations(self, operations, call, queue, deadline, description)
Definition: cygrpc_test.py:135
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
tests.unit._cython.test_utilities.SimpleFuture
Definition: _cython/test_utilities.py:20
tests.unit._cython.cygrpc_test.ServerClientMixin.test_echo
def test_echo(self)
Definition: cygrpc_test.py:161
tests.unit._cython.cygrpc_test.InsecureServerInsecureClient
Definition: cygrpc_test.py:390
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
tests.unit._cython.cygrpc_test.TypeSmokeTest.test_metadata_plugin_call_credentials_up_down
def test_metadata_plugin_call_credentials_up_down(self)
Definition: cygrpc_test.py:56
tests.unit._cython.cygrpc_test.SecureServerSecureClient.setUp
def setUp(self)
Definition: cygrpc_test.py:401
grpc._cython
Definition: src/python/grpcio/grpc/_cython/__init__.py:1
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
tests.unit._cython.cygrpc_test.TypeSmokeTest.testServerStartShutdown
def testServerStartShutdown(self)
Definition: cygrpc_test.py:72
tests.unit._cython.cygrpc_test.TypeSmokeTest.testChannelUpDown
def testChannelUpDown(self)
Definition: cygrpc_test.py:52
tests.unit._cython.cygrpc_test.TypeSmokeTest
Definition: cygrpc_test.py:39


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:59:02