_python_plugin_test.py
Go to the documentation of this file.
1 # Copyright 2015 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import collections
16 import contextlib
17 import distutils.spawn
18 import errno
19 import os
20 import shutil
21 import subprocess
22 import sys
23 import tempfile
24 import threading
25 import unittest
26 
27 import grpc
28 import grpc.experimental
29 from six import moves
30 
31 import tests.protoc_plugin.protos.payload.test_payload_pb2 as payload_pb2
32 import tests.protoc_plugin.protos.requests.r.test_requests_pb2 as request_pb2
33 import tests.protoc_plugin.protos.responses.test_responses_pb2 as response_pb2
34 import tests.protoc_plugin.protos.service.test_service_pb2_grpc as service_pb2_grpc
35 from tests.unit import test_common
36 from tests.unit.framework.common import test_constants
37 
38 # Identifiers of entities we expect to find in the generated module.
39 STUB_IDENTIFIER = 'TestServiceStub'
40 SERVICER_IDENTIFIER = 'TestServiceServicer'
41 ADD_SERVICER_TO_SERVER_IDENTIFIER = 'add_TestServiceServicer_to_server'
42 
43 
44 class _ServicerMethods(object):
45 
46  def __init__(self):
47  self._condition = threading.Condition()
48  self._paused = False
49  self._fail = False
50 
51  @contextlib.contextmanager
52  def pause(self): # pylint: disable=invalid-name
53  with self._condition:
54  self._paused = True
55  yield
56  with self._condition:
57  self._paused = False
58  self._condition.notify_all()
59 
60  @contextlib.contextmanager
61  def fail(self): # pylint: disable=invalid-name
62  with self._condition:
63  self._fail = True
64  yield
65  with self._condition:
66  self._fail = False
67 
68  def _control(self): # pylint: disable=invalid-name
69  with self._condition:
70  if self._fail:
71  raise ValueError()
72  while self._paused:
73  self._condition.wait()
74 
75  def UnaryCall(self, request, unused_rpc_context):
76  response = response_pb2.SimpleResponse()
77  response.payload.payload_type = payload_pb2.COMPRESSABLE
78  response.payload.payload_compressable = 'a' * request.response_size
79  self._control()
80  return response
81 
82  def StreamingOutputCall(self, request, unused_rpc_context):
83  for parameter in request.response_parameters:
84  response = response_pb2.StreamingOutputCallResponse()
85  response.payload.payload_type = payload_pb2.COMPRESSABLE
86  response.payload.payload_compressable = 'a' * parameter.size
87  self._control()
88  yield response
89 
90  def StreamingInputCall(self, request_iter, unused_rpc_context):
91  response = response_pb2.StreamingInputCallResponse()
92  aggregated_payload_size = 0
93  for request in request_iter:
94  aggregated_payload_size += len(request.payload.payload_compressable)
95  response.aggregated_payload_size = aggregated_payload_size
96  self._control()
97  return response
98 
99  def FullDuplexCall(self, request_iter, unused_rpc_context):
100  for request in request_iter:
101  for parameter in request.response_parameters:
102  response = response_pb2.StreamingOutputCallResponse()
103  response.payload.payload_type = payload_pb2.COMPRESSABLE
104  response.payload.payload_compressable = 'a' * parameter.size
105  self._control()
106  yield response
107 
108  def HalfDuplexCall(self, request_iter, unused_rpc_context):
109  responses = []
110  for request in request_iter:
111  for parameter in request.response_parameters:
112  response = response_pb2.StreamingOutputCallResponse()
113  response.payload.payload_type = payload_pb2.COMPRESSABLE
114  response.payload.payload_compressable = 'a' * parameter.size
115  self._control()
116  responses.append(response)
117  for response in responses:
118  yield response
119 
120 
121 class _Service(
122  collections.namedtuple('_Service', (
123  'servicer_methods',
124  'server',
125  'stub',
126  ))):
127  """A live and running service.
128 
129  Attributes:
130  servicer_methods: The _ServicerMethods servicing RPCs.
131  server: The grpc.Server servicing RPCs.
132  stub: A stub on which to invoke RPCs.
133  """
134 
135 
137  """Provides a servicer backend and a stub.
138 
139  Returns:
140  A _Service with which to test RPCs.
141  """
142  servicer_methods = _ServicerMethods()
143 
144  class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)):
145 
146  def UnaryCall(self, request, context):
147  return servicer_methods.UnaryCall(request, context)
148 
149  def StreamingOutputCall(self, request, context):
150  return servicer_methods.StreamingOutputCall(request, context)
151 
152  def StreamingInputCall(self, request_iterator, context):
153  return servicer_methods.StreamingInputCall(request_iterator,
154  context)
155 
156  def FullDuplexCall(self, request_iterator, context):
157  return servicer_methods.FullDuplexCall(request_iterator, context)
158 
159  def HalfDuplexCall(self, request_iterator, context):
160  return servicer_methods.HalfDuplexCall(request_iterator, context)
161 
162  server = test_common.test_server()
163  getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(),
164  server)
165  port = server.add_insecure_port('[::]:0')
166  server.start()
167  channel = grpc.insecure_channel('localhost:{}'.format(port))
168  stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel)
169  return _Service(servicer_methods, server, stub)
170 
171 
173  """Provides a servicer backend that fails to implement methods and its stub.
174 
175  Returns:
176  A _Service with which to test RPCs. The returned _Service's
177  servicer_methods implements none of the methods required of it.
178  """
179 
180  class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)):
181  pass
182 
183  server = test_common.test_server()
184  getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(),
185  server)
186  port = server.add_insecure_port('[::]:0')
187  server.start()
188  channel = grpc.insecure_channel('localhost:{}'.format(port))
189  stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel)
190  return _Service(None, server, stub)
191 
192 
194  for _ in range(3):
195  request = request_pb2.StreamingInputCallRequest()
196  request.payload.payload_type = payload_pb2.COMPRESSABLE
197  request.payload.payload_compressable = 'a'
198  yield request
199 
200 
202  request = request_pb2.StreamingOutputCallRequest()
203  sizes = [1, 2, 3]
204  request.response_parameters.add(size=sizes[0], interval_us=0)
205  request.response_parameters.add(size=sizes[1], interval_us=0)
206  request.response_parameters.add(size=sizes[2], interval_us=0)
207  return request
208 
209 
211  request = request_pb2.StreamingOutputCallRequest()
212  request.response_parameters.add(size=1, interval_us=0)
213  yield request
214  request = request_pb2.StreamingOutputCallRequest()
215  request.response_parameters.add(size=2, interval_us=0)
216  request.response_parameters.add(size=3, interval_us=0)
217  yield request
218 
219 
220 class PythonPluginTest(unittest.TestCase):
221  """Test case for the gRPC Python protoc-plugin.
222 
223  While reading these tests, remember that the futures API
224  (`stub.method.future()`) only gives futures for the *response-unary*
225  methods and does not exist for response-streaming methods.
226  """
227 
229  # check that we can access the generated module and its members.
230  self.assertIsNotNone(getattr(service_pb2_grpc, STUB_IDENTIFIER, None))
231  self.assertIsNotNone(
232  getattr(service_pb2_grpc, SERVICER_IDENTIFIER, None))
233  self.assertIsNotNone(
234  getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER, None))
235 
236  def testUpDown(self):
237  service = _CreateService()
238  self.assertIsNotNone(service.servicer_methods)
239  self.assertIsNotNone(service.server)
240  self.assertIsNotNone(service.stub)
241  service.server.stop(None)
242 
244  service = _CreateIncompleteService()
245  request = request_pb2.SimpleRequest(response_size=13)
246  with self.assertRaises(grpc.RpcError) as exception_context:
247  service.stub.UnaryCall(request)
248  self.assertIs(exception_context.exception.code(),
249  grpc.StatusCode.UNIMPLEMENTED)
250  service.server.stop(None)
251 
252  def testUnaryCall(self):
253  service = _CreateService()
254  request = request_pb2.SimpleRequest(response_size=13)
255  response = service.stub.UnaryCall(request)
256  expected_response = service.servicer_methods.UnaryCall(
257  request, 'not a real context!')
258  self.assertEqual(expected_response, response)
259  service.server.stop(None)
260 
262  service = _CreateService()
263  request = request_pb2.SimpleRequest(response_size=13)
264  # Check that the call does not block waiting for the server to respond.
265  with service.servicer_methods.pause():
266  response_future = service.stub.UnaryCall.future(request)
267  response = response_future.result()
268  expected_response = service.servicer_methods.UnaryCall(
269  request, 'not a real RpcContext!')
270  self.assertEqual(expected_response, response)
271  service.server.stop(None)
272 
274  service = _CreateService()
275  request = request_pb2.SimpleRequest(response_size=13)
276  with service.servicer_methods.pause():
277  response_future = service.stub.UnaryCall.future(
278  request, timeout=test_constants.SHORT_TIMEOUT)
279  with self.assertRaises(grpc.RpcError) as exception_context:
280  response_future.result()
281  self.assertIs(exception_context.exception.code(),
282  grpc.StatusCode.DEADLINE_EXCEEDED)
283  self.assertIs(response_future.code(), grpc.StatusCode.DEADLINE_EXCEEDED)
284  service.server.stop(None)
285 
287  service = _CreateService()
288  request = request_pb2.SimpleRequest(response_size=13)
289  with service.servicer_methods.pause():
290  response_future = service.stub.UnaryCall.future(request)
291  response_future.cancel()
292  self.assertTrue(response_future.cancelled())
293  self.assertIs(response_future.code(), grpc.StatusCode.CANCELLED)
294  service.server.stop(None)
295 
297  service = _CreateService()
298  request = request_pb2.SimpleRequest(response_size=13)
299  with service.servicer_methods.fail():
300  response_future = service.stub.UnaryCall.future(request)
301  self.assertIsNotNone(response_future.exception())
302  self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN)
303  service.server.stop(None)
304 
306  service = _CreateService()
307  request = _streaming_output_request()
308  responses = service.stub.StreamingOutputCall(request)
309  expected_responses = service.servicer_methods.StreamingOutputCall(
310  request, 'not a real RpcContext!')
311  for expected_response, response in moves.zip_longest(
312  expected_responses, responses):
313  self.assertEqual(expected_response, response)
314  service.server.stop(None)
315 
317  service = _CreateService()
318  request = _streaming_output_request()
319  with service.servicer_methods.pause():
320  responses = service.stub.StreamingOutputCall(
321  request, timeout=test_constants.SHORT_TIMEOUT)
322  with self.assertRaises(grpc.RpcError) as exception_context:
323  list(responses)
324  self.assertIs(exception_context.exception.code(),
325  grpc.StatusCode.DEADLINE_EXCEEDED)
326  service.server.stop(None)
327 
329  service = _CreateService()
330  request = _streaming_output_request()
331  responses = service.stub.StreamingOutputCall(request)
332  next(responses)
333  responses.cancel()
334  with self.assertRaises(grpc.RpcError) as exception_context:
335  next(responses)
336  self.assertIs(responses.code(), grpc.StatusCode.CANCELLED)
337  service.server.stop(None)
338 
340  service = _CreateService()
341  request = _streaming_output_request()
342  with service.servicer_methods.fail():
343  responses = service.stub.StreamingOutputCall(request)
344  self.assertIsNotNone(responses)
345  with self.assertRaises(grpc.RpcError) as exception_context:
346  next(responses)
347  self.assertIs(exception_context.exception.code(),
348  grpc.StatusCode.UNKNOWN)
349  service.server.stop(None)
350 
352  service = _CreateService()
353  response = service.stub.StreamingInputCall(
355  expected_response = service.servicer_methods.StreamingInputCall(
356  _streaming_input_request_iterator(), 'not a real RpcContext!')
357  self.assertEqual(expected_response, response)
358  service.server.stop(None)
359 
361  service = _CreateService()
362  with service.servicer_methods.pause():
363  response_future = service.stub.StreamingInputCall.future(
365  response = response_future.result()
366  expected_response = service.servicer_methods.StreamingInputCall(
367  _streaming_input_request_iterator(), 'not a real RpcContext!')
368  self.assertEqual(expected_response, response)
369  service.server.stop(None)
370 
372  service = _CreateService()
373  with service.servicer_methods.pause():
374  response_future = service.stub.StreamingInputCall.future(
376  timeout=test_constants.SHORT_TIMEOUT)
377  with self.assertRaises(grpc.RpcError) as exception_context:
378  response_future.result()
379  self.assertIsInstance(response_future.exception(), grpc.RpcError)
380  self.assertIs(response_future.exception().code(),
381  grpc.StatusCode.DEADLINE_EXCEEDED)
382  self.assertIs(exception_context.exception.code(),
383  grpc.StatusCode.DEADLINE_EXCEEDED)
384  service.server.stop(None)
385 
387  service = _CreateService()
388  with service.servicer_methods.pause():
389  response_future = service.stub.StreamingInputCall.future(
391  response_future.cancel()
392  self.assertTrue(response_future.cancelled())
393  with self.assertRaises(grpc.FutureCancelledError):
394  response_future.result()
395  service.server.stop(None)
396 
398  service = _CreateService()
399  with service.servicer_methods.fail():
400  response_future = service.stub.StreamingInputCall.future(
402  self.assertIsNotNone(response_future.exception())
403  self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN)
404  service.server.stop(None)
405 
407  service = _CreateService()
408  responses = service.stub.FullDuplexCall(_full_duplex_request_iterator())
409  expected_responses = service.servicer_methods.FullDuplexCall(
410  _full_duplex_request_iterator(), 'not a real RpcContext!')
411  for expected_response, response in moves.zip_longest(
412  expected_responses, responses):
413  self.assertEqual(expected_response, response)
414  service.server.stop(None)
415 
417  request_iterator = _full_duplex_request_iterator()
418  service = _CreateService()
419  with service.servicer_methods.pause():
420  responses = service.stub.FullDuplexCall(
421  request_iterator, timeout=test_constants.SHORT_TIMEOUT)
422  with self.assertRaises(grpc.RpcError) as exception_context:
423  list(responses)
424  self.assertIs(exception_context.exception.code(),
425  grpc.StatusCode.DEADLINE_EXCEEDED)
426  service.server.stop(None)
427 
429  service = _CreateService()
430  request_iterator = _full_duplex_request_iterator()
431  responses = service.stub.FullDuplexCall(request_iterator)
432  next(responses)
433  responses.cancel()
434  with self.assertRaises(grpc.RpcError) as exception_context:
435  next(responses)
436  self.assertIs(exception_context.exception.code(),
437  grpc.StatusCode.CANCELLED)
438  service.server.stop(None)
439 
441  request_iterator = _full_duplex_request_iterator()
442  service = _CreateService()
443  with service.servicer_methods.fail():
444  responses = service.stub.FullDuplexCall(request_iterator)
445  with self.assertRaises(grpc.RpcError) as exception_context:
446  next(responses)
447  self.assertIs(exception_context.exception.code(),
448  grpc.StatusCode.UNKNOWN)
449  service.server.stop(None)
450 
452  service = _CreateService()
453 
454  def half_duplex_request_iterator():
455  request = request_pb2.StreamingOutputCallRequest()
456  request.response_parameters.add(size=1, interval_us=0)
457  yield request
458  request = request_pb2.StreamingOutputCallRequest()
459  request.response_parameters.add(size=2, interval_us=0)
460  request.response_parameters.add(size=3, interval_us=0)
461  yield request
462 
463  responses = service.stub.HalfDuplexCall(half_duplex_request_iterator())
464  expected_responses = service.servicer_methods.HalfDuplexCall(
465  half_duplex_request_iterator(), 'not a real RpcContext!')
466  for expected_response, response in moves.zip_longest(
467  expected_responses, responses):
468  self.assertEqual(expected_response, response)
469  service.server.stop(None)
470 
472  condition = threading.Condition()
473  wait_cell = [False]
474 
475  @contextlib.contextmanager
476  def wait(): # pylint: disable=invalid-name
477  # Where's Python 3's 'nonlocal' statement when you need it?
478  with condition:
479  wait_cell[0] = True
480  yield
481  with condition:
482  wait_cell[0] = False
483  condition.notify_all()
484 
485  def half_duplex_request_iterator():
486  request = request_pb2.StreamingOutputCallRequest()
487  request.response_parameters.add(size=1, interval_us=0)
488  yield request
489  with condition:
490  while wait_cell[0]:
491  condition.wait()
492 
493  service = _CreateService()
494  with wait():
495  responses = service.stub.HalfDuplexCall(
496  half_duplex_request_iterator(),
497  timeout=test_constants.SHORT_TIMEOUT)
498  # half-duplex waits for the client to send all info
499  with self.assertRaises(grpc.RpcError) as exception_context:
500  next(responses)
501  self.assertIs(exception_context.exception.code(),
502  grpc.StatusCode.DEADLINE_EXCEEDED)
503  service.server.stop(None)
504 
505 
506 @unittest.skipIf(sys.version_info[0] < 3 or sys.version_info[1] < 6,
507  "Unsupported on Python 2.")
508 class SimpleStubsPluginTest(unittest.TestCase):
509  servicer_methods = _ServicerMethods()
510 
511  class Servicer(service_pb2_grpc.TestServiceServicer):
512 
513  def UnaryCall(self, request, context):
514  return SimpleStubsPluginTest.servicer_methods.UnaryCall(
515  request, context)
516 
517  def StreamingOutputCall(self, request, context):
518  return SimpleStubsPluginTest.servicer_methods.StreamingOutputCall(
519  request, context)
520 
521  def StreamingInputCall(self, request_iterator, context):
522  return SimpleStubsPluginTest.servicer_methods.StreamingInputCall(
523  request_iterator, context)
524 
525  def FullDuplexCall(self, request_iterator, context):
526  return SimpleStubsPluginTest.servicer_methods.FullDuplexCall(
527  request_iterator, context)
528 
529  def HalfDuplexCall(self, request_iterator, context):
530  return SimpleStubsPluginTest.servicer_methods.HalfDuplexCall(
531  request_iterator, context)
532 
533  def setUp(self):
534  super(SimpleStubsPluginTest, self).setUp()
535  self._server = test_common.test_server()
536  service_pb2_grpc.add_TestServiceServicer_to_server(
537  self.Servicer(), self._server)
538  self._port = self._server.add_insecure_port('[::]:0')
539  self._server.start()
540  self._target = 'localhost:{}'.format(self._port)
541 
542  def tearDown(self):
543  self._server.stop(None)
544  super(SimpleStubsPluginTest, self).tearDown()
545 
546  def testUnaryCall(self):
547  request = request_pb2.SimpleRequest(response_size=13)
548  response = service_pb2_grpc.TestService.UnaryCall(
549  request,
550  self._target,
552  ),
553  wait_for_ready=True)
554  expected_response = self.servicer_methods.UnaryCall(
555  request, 'not a real context!')
556  self.assertEqual(expected_response, response)
557 
559  request = request_pb2.SimpleRequest(response_size=13)
560  response = service_pb2_grpc.TestService.UnaryCall(request,
561  self._target,
562  insecure=True,
563  wait_for_ready=True)
564  expected_response = self.servicer_methods.UnaryCall(
565  request, 'not a real context!')
566  self.assertEqual(expected_response, response)
567 
569  request = _streaming_output_request()
570  expected_responses = self.servicer_methods.StreamingOutputCall(
571  request, 'not a real RpcContext!')
572  responses = service_pb2_grpc.TestService.StreamingOutputCall(
573  request,
574  self._target,
576  ),
577  wait_for_ready=True)
578  for expected_response, response in moves.zip_longest(
579  expected_responses, responses):
580  self.assertEqual(expected_response, response)
581 
583  response = service_pb2_grpc.TestService.StreamingInputCall(
585  self._target,
587  ),
588  wait_for_ready=True)
589  expected_response = self.servicer_methods.StreamingInputCall(
590  _streaming_input_request_iterator(), 'not a real RpcContext!')
591  self.assertEqual(expected_response, response)
592 
594  responses = service_pb2_grpc.TestService.FullDuplexCall(
596  self._target,
598  ),
599  wait_for_ready=True)
600  expected_responses = self.servicer_methods.FullDuplexCall(
601  _full_duplex_request_iterator(), 'not a real RpcContext!')
602  for expected_response, response in moves.zip_longest(
603  expected_responses, responses):
604  self.assertEqual(expected_response, response)
605 
607 
608  def half_duplex_request_iterator():
609  request = request_pb2.StreamingOutputCallRequest()
610  request.response_parameters.add(size=1, interval_us=0)
611  yield request
612  request = request_pb2.StreamingOutputCallRequest()
613  request.response_parameters.add(size=2, interval_us=0)
614  request.response_parameters.add(size=3, interval_us=0)
615  yield request
616 
617  responses = service_pb2_grpc.TestService.HalfDuplexCall(
618  half_duplex_request_iterator(),
619  self._target,
621  ),
622  wait_for_ready=True)
623  expected_responses = self.servicer_methods.HalfDuplexCall(
624  half_duplex_request_iterator(), 'not a real RpcContext!')
625  for expected_response, response in moves.zip_longest(
626  expected_responses, responses):
627  self.assertEqual(expected_response, response)
628 
629 
630 class ModuleMainTest(unittest.TestCase):
631  """Test case for running `python -m grpc_tools.protoc`.
632  """
633 
634  def test_clean_output(self):
635  if sys.executable is None:
636  raise unittest.SkipTest(
637  "Running on a interpreter that cannot be invoked from the CLI.")
638  proto_dir_path = os.path.join("src", "proto")
639  test_proto_path = os.path.join(proto_dir_path, "grpc", "testing",
640  "empty.proto")
641  streams = tuple(tempfile.TemporaryFile() for _ in range(2))
642  work_dir = tempfile.mkdtemp()
643  try:
644  invocation = (sys.executable, "-m", "grpc_tools.protoc",
645  "--proto_path", proto_dir_path, "--python_out",
646  work_dir, "--grpc_python_out", work_dir,
647  test_proto_path)
648  proc = subprocess.Popen(invocation,
649  stdout=streams[0],
650  stderr=streams[1])
651  proc.wait()
652  outs = []
653  for stream in streams:
654  stream.seek(0)
655  self.assertEqual(0, len(stream.read()))
656  self.assertEqual(0, proc.returncode)
657  except Exception: # pylint: disable=broad-except
658  shutil.rmtree(work_dir)
659 
660 
661 if __name__ == '__main__':
662  unittest.main(verbosity=2)
tests.protoc_plugin._python_plugin_test._CreateService
def _CreateService()
Definition: _python_plugin_test.py:136
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testUnaryCallFutureExpired
def testUnaryCallFutureExpired(self)
Definition: _python_plugin_test.py:273
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
tests.protoc_plugin._python_plugin_test._ServicerMethods._control
def _control(self)
Definition: _python_plugin_test.py:68
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testFullDuplexCall
def testFullDuplexCall(self)
Definition: _python_plugin_test.py:406
grpc::experimental.insecure_channel_credentials
def insecure_channel_credentials()
Definition: src/python/grpcio/grpc/experimental/__init__.py:51
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest._port
_port
Definition: _python_plugin_test.py:538
http2_test_server.format
format
Definition: http2_test_server.py:118
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testStreamingInputCallFutureCancelled
def testStreamingInputCallFutureCancelled(self)
Definition: _python_plugin_test.py:386
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest.Servicer.UnaryCall
def UnaryCall(self, request, context)
Definition: _python_plugin_test.py:513
tests.protoc_plugin._python_plugin_test._ServicerMethods.UnaryCall
def UnaryCall(self, request, unused_rpc_context)
Definition: _python_plugin_test.py:75
tests.protoc_plugin._python_plugin_test.ModuleMainTest.test_clean_output
def test_clean_output(self)
Definition: _python_plugin_test.py:634
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testFullDuplexCallFailed
def testFullDuplexCallFailed(self)
Definition: _python_plugin_test.py:440
tests.protoc_plugin._python_plugin_test.PythonPluginTest
Definition: _python_plugin_test.py:220
tests.protoc_plugin._python_plugin_test._ServicerMethods.HalfDuplexCall
def HalfDuplexCall(self, request_iter, unused_rpc_context)
Definition: _python_plugin_test.py:108
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests.protoc_plugin._python_plugin_test._ServicerMethods.__init__
def __init__(self)
Definition: _python_plugin_test.py:46
grpc.FutureCancelledError
Definition: src/python/grpcio/grpc/__init__.py:44
tests.protoc_plugin._python_plugin_test._streaming_output_request
def _streaming_output_request()
Definition: _python_plugin_test.py:201
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testStreamingInputCallFutureFailed
def testStreamingInputCallFutureFailed(self)
Definition: _python_plugin_test.py:397
tests.protoc_plugin._python_plugin_test._Service
Definition: _python_plugin_test.py:126
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testStreamingOutputCall
def testStreamingOutputCall(self)
Definition: _python_plugin_test.py:305
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testUnaryCallFutureFailed
def testUnaryCallFutureFailed(self)
Definition: _python_plugin_test.py:296
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testStreamingOutputCallExpired
def testStreamingOutputCallExpired(self)
Definition: _python_plugin_test.py:316
grpc.RpcError
Definition: src/python/grpcio/grpc/__init__.py:302
tests.protoc_plugin._python_plugin_test._CreateIncompleteService
def _CreateIncompleteService()
Definition: _python_plugin_test.py:172
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest.setUp
def setUp(self)
Definition: _python_plugin_test.py:533
tests.protoc_plugin._python_plugin_test._full_duplex_request_iterator
def _full_duplex_request_iterator()
Definition: _python_plugin_test.py:210
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testUnaryCallFutureCancelled
def testUnaryCallFutureCancelled(self)
Definition: _python_plugin_test.py:286
tests.protoc_plugin._python_plugin_test._ServicerMethods.StreamingInputCall
def StreamingInputCall(self, request_iter, unused_rpc_context)
Definition: _python_plugin_test.py:90
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest._server
_server
Definition: _python_plugin_test.py:535
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest.testUnaryCallInsecureSugar
def testUnaryCallInsecureSugar(self)
Definition: _python_plugin_test.py:558
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest.tearDown
def tearDown(self)
Definition: _python_plugin_test.py:542
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testFullDuplexCallCancelled
def testFullDuplexCallCancelled(self)
Definition: _python_plugin_test.py:428
start
static uint64_t start
Definition: benchmark-pound.c:74
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest.Servicer.StreamingOutputCall
def StreamingOutputCall(self, request, context)
Definition: _python_plugin_test.py:517
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testUnaryCall
def testUnaryCall(self)
Definition: _python_plugin_test.py:252
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testStreamingOutputCallCancelled
def testStreamingOutputCallCancelled(self)
Definition: _python_plugin_test.py:328
grpc::experimental
Definition: include/grpcpp/channel.h:46
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testStreamingOutputCallFailed
def testStreamingOutputCallFailed(self)
Definition: _python_plugin_test.py:339
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest.testHalfDuplexCall
def testHalfDuplexCall(self)
Definition: _python_plugin_test.py:606
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest
Definition: _python_plugin_test.py:508
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest.servicer_methods
servicer_methods
Definition: _python_plugin_test.py:509
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest._target
_target
Definition: _python_plugin_test.py:540
tests.protoc_plugin._python_plugin_test._ServicerMethods.FullDuplexCall
def FullDuplexCall(self, request_iter, unused_rpc_context)
Definition: _python_plugin_test.py:99
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testStreamingInputCall
def testStreamingInputCall(self)
Definition: _python_plugin_test.py:351
tests.protoc_plugin._python_plugin_test._streaming_input_request_iterator
def _streaming_input_request_iterator()
Definition: _python_plugin_test.py:193
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testHalfDuplexCall
def testHalfDuplexCall(self)
Definition: _python_plugin_test.py:451
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest.testUnaryCall
def testUnaryCall(self)
Definition: _python_plugin_test.py:546
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testFullDuplexCallExpired
def testFullDuplexCallExpired(self)
Definition: _python_plugin_test.py:416
tests.protoc_plugin._python_plugin_test._ServicerMethods._paused
_paused
Definition: _python_plugin_test.py:48
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest.testStreamingOutputCall
def testStreamingOutputCall(self)
Definition: _python_plugin_test.py:568
tests.protoc_plugin._python_plugin_test._ServicerMethods._fail
_fail
Definition: _python_plugin_test.py:49
tests.protoc_plugin._python_plugin_test._ServicerMethods.fail
def fail(self)
Definition: _python_plugin_test.py:61
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testImportAttributes
def testImportAttributes(self)
Definition: _python_plugin_test.py:228
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest.Servicer.FullDuplexCall
def FullDuplexCall(self, request_iterator, context)
Definition: _python_plugin_test.py:525
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
tests.protoc_plugin._python_plugin_test._ServicerMethods.pause
def pause(self)
Definition: _python_plugin_test.py:52
tests.protoc_plugin._python_plugin_test._ServicerMethods
Definition: _python_plugin_test.py:44
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest.Servicer
Definition: _python_plugin_test.py:511
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest.testStreamingInputCall
def testStreamingInputCall(self)
Definition: _python_plugin_test.py:582
wait
static void wait(notification *n)
Definition: alts_tsi_handshaker_test.cc:114
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testUpDown
def testUpDown(self)
Definition: _python_plugin_test.py:236
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testUnaryCallFuture
def testUnaryCallFuture(self)
Definition: _python_plugin_test.py:261
tests.protoc_plugin._python_plugin_test._ServicerMethods.StreamingOutputCall
def StreamingOutputCall(self, request, unused_rpc_context)
Definition: _python_plugin_test.py:82
tests.protoc_plugin._python_plugin_test._ServicerMethods._condition
_condition
Definition: _python_plugin_test.py:47
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest.Servicer.StreamingInputCall
def StreamingInputCall(self, request_iterator, context)
Definition: _python_plugin_test.py:521
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
tests.protoc_plugin._python_plugin_test.SERVICER_IDENTIFIER
string SERVICER_IDENTIFIER
Definition: _python_plugin_test.py:40
code
Definition: bloaty/third_party/zlib/contrib/infback9/inftree9.h:24
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testHalfDuplexCallWedged
def testHalfDuplexCallWedged(self)
Definition: _python_plugin_test.py:471
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest.testFullDuplexCall
def testFullDuplexCall(self)
Definition: _python_plugin_test.py:593
tests.protoc_plugin._python_plugin_test.ModuleMainTest
Definition: _python_plugin_test.py:630
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testStreamingInputCallFutureExpired
def testStreamingInputCallFutureExpired(self)
Definition: _python_plugin_test.py:371
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testStreamingInputCallFuture
def testStreamingInputCallFuture(self)
Definition: _python_plugin_test.py:360
tests.protoc_plugin._python_plugin_test.PythonPluginTest.testIncompleteServicer
def testIncompleteServicer(self)
Definition: _python_plugin_test.py:243
tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest.Servicer.HalfDuplexCall
def HalfDuplexCall(self, request_iterator, context)
Definition: _python_plugin_test.py:529


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27