_server_application.py
Go to the documentation of this file.
1 # Copyright 2017 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """An example gRPC Python-using server-side application."""
15 
16 import threading
17 
18 import grpc
19 
20 # requests_pb2 is a semantic dependency of this module.
21 from tests.testing import _application_common
22 from tests.testing.proto import requests_pb2 # pylint: disable=unused-import
23 from tests.testing.proto import services_pb2
24 from tests.testing.proto import services_pb2_grpc
25 
26 
27 class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer):
28  """Services RPCs."""
29 
30  def __init__(self):
31  self._abort_lock = threading.RLock()
32  self._abort_response = _application_common.ABORT_NO_STATUS_RESPONSE
33 
34  def UnUn(self, request, context):
35  if request == _application_common.UNARY_UNARY_REQUEST:
36  return _application_common.UNARY_UNARY_RESPONSE
37  elif request == _application_common.ABORT_REQUEST:
38  with self._abort_lock:
39  try:
40  context.abort(grpc.StatusCode.PERMISSION_DENIED,
41  "Denying permission to test abort.")
42  except Exception as e: # pylint: disable=broad-except
43  self._abort_response = _application_common.ABORT_SUCCESS_RESPONSE
44  else:
45  self._abort_status = _application_common.ABORT_FAILURE_RESPONSE
46  return None # NOTE: For the linter.
47  elif request == _application_common.ABORT_SUCCESS_QUERY:
48  with self._abort_lock:
49  return self._abort_response
50  else:
51  context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
52  context.set_details('Something is wrong with your request!')
53  return services_pb2.Down()
54 
55  def UnStre(self, request, context):
56  if _application_common.UNARY_STREAM_REQUEST != request:
57  context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
58  context.set_details('Something is wrong with your request!')
59  return
60  yield services_pb2.Strange() # pylint: disable=unreachable
61 
62  def StreUn(self, request_iterator, context):
63  context.send_initial_metadata(((
64  'server_application_metadata_key',
65  'Hi there!',
66  ),))
67  for request in request_iterator:
68  if request != _application_common.STREAM_UNARY_REQUEST:
69  context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
70  context.set_details('Something is wrong with your request!')
71  return services_pb2.Strange()
72  elif not context.is_active():
73  return services_pb2.Strange()
74  else:
75  return _application_common.STREAM_UNARY_RESPONSE
76 
77  def StreStre(self, request_iterator, context):
78  valid_requests = (_application_common.STREAM_STREAM_REQUEST,
79  _application_common.STREAM_STREAM_MUTATING_REQUEST)
80  for request in request_iterator:
81  if request not in valid_requests:
82  context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
83  context.set_details('Something is wrong with your request!')
84  return
85  elif not context.is_active():
86  return
87  elif request == _application_common.STREAM_STREAM_REQUEST:
88  yield _application_common.STREAM_STREAM_RESPONSE
89  yield _application_common.STREAM_STREAM_RESPONSE
90  elif request == _application_common.STREAM_STREAM_MUTATING_REQUEST:
91  response = services_pb2.Bottom()
92  for i in range(
93  _application_common.STREAM_STREAM_MUTATING_COUNT):
94  response.first_bottom_field = i
95  yield response
tests.testing._server_application.FirstServiceServicer._abort_status
_abort_status
Definition: _server_application.py:45
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests.testing
Definition: src/python/grpcio_tests/tests/testing/__init__.py:1
tests.testing._server_application.FirstServiceServicer._abort_lock
_abort_lock
Definition: _server_application.py:31
tests.testing._server_application.FirstServiceServicer.StreUn
def StreUn(self, request_iterator, context)
Definition: _server_application.py:62
tests.testing._server_application.FirstServiceServicer.__init__
def __init__(self)
Definition: _server_application.py:30
tests.testing._server_application.FirstServiceServicer
Definition: _server_application.py:27
tests.testing._server_application.FirstServiceServicer.StreStre
def StreStre(self, request_iterator, context)
Definition: _server_application.py:77
tests.testing._server_application.FirstServiceServicer.UnStre
def UnStre(self, request, context)
Definition: _server_application.py:55
tests.testing.proto
Definition: src/python/grpcio_tests/tests/testing/proto/__init__.py:1
tests.testing._server_application.FirstServiceServicer.UnUn
def UnUn(self, request, context)
Definition: _server_application.py:34
tests.testing._server_application.FirstServiceServicer._abort_response
_abort_response
Definition: _server_application.py:32


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27