testing/_server_test.py
Go to the documentation of this file.
1 # Copyright 2017 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import time
16 import unittest
17 
18 import grpc
19 import grpc_testing
20 
21 from tests.testing import _application_common
22 from tests.testing import _application_testing_common
23 from tests.testing import _server_application
24 from tests.testing.proto import services_pb2
25 
26 
27 class FirstServiceServicerTest(unittest.TestCase):
28 
29  def setUp(self):
33  descriptors_to_servicers = {
34  _application_testing_common.FIRST_SERVICE: servicer
35  }
37  descriptors_to_servicers, self._real_time)
39  descriptors_to_servicers, self._fake_time)
40 
42  rpc = self._real_time_server.invoke_unary_unary(
43  _application_testing_common.FIRST_SERVICE_UNUN, (),
44  _application_common.UNARY_UNARY_REQUEST, None)
45  initial_metadata = rpc.initial_metadata()
46  response, trailing_metadata, code, details = rpc.termination()
47 
48  self.assertEqual(_application_common.UNARY_UNARY_RESPONSE, response)
49  self.assertIs(code, grpc.StatusCode.OK)
50 
52  rpc = self._real_time_server.invoke_unary_stream(
53  _application_testing_common.FIRST_SERVICE_UNSTRE, (),
54  _application_common.UNARY_STREAM_REQUEST, None)
55  initial_metadata = rpc.initial_metadata()
56  trailing_metadata, code, details = rpc.termination()
57 
58  self.assertIs(code, grpc.StatusCode.OK)
59 
61  rpc = self._real_time_server.invoke_stream_unary(
62  _application_testing_common.FIRST_SERVICE_STREUN, (), None)
63  rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
64  rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
65  rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
66  rpc.requests_closed()
67  initial_metadata = rpc.initial_metadata()
68  response, trailing_metadata, code, details = rpc.termination()
69 
70  self.assertEqual(_application_common.STREAM_UNARY_RESPONSE, response)
71  self.assertIs(code, grpc.StatusCode.OK)
72 
74  rpc = self._real_time_server.invoke_stream_stream(
75  _application_testing_common.FIRST_SERVICE_STRESTRE, (), None)
76  rpc.send_request(_application_common.STREAM_STREAM_REQUEST)
77  initial_metadata = rpc.initial_metadata()
78  responses = [
79  rpc.take_response(),
80  rpc.take_response(),
81  ]
82  rpc.send_request(_application_common.STREAM_STREAM_REQUEST)
83  rpc.send_request(_application_common.STREAM_STREAM_REQUEST)
84  responses.extend([
85  rpc.take_response(),
86  rpc.take_response(),
87  rpc.take_response(),
88  rpc.take_response(),
89  ])
90  rpc.requests_closed()
91  trailing_metadata, code, details = rpc.termination()
92 
93  for response in responses:
94  self.assertEqual(_application_common.STREAM_STREAM_RESPONSE,
95  response)
96  self.assertIs(code, grpc.StatusCode.OK)
97 
99  rpc = self._real_time_server.invoke_stream_stream(
100  _application_testing_common.FIRST_SERVICE_STRESTRE, (), None)
101  rpc.send_request(_application_common.STREAM_STREAM_MUTATING_REQUEST)
102  initial_metadata = rpc.initial_metadata()
103  responses = [
104  rpc.take_response()
105  for _ in range(_application_common.STREAM_STREAM_MUTATING_COUNT)
106  ]
107  rpc.send_request(_application_common.STREAM_STREAM_MUTATING_REQUEST)
108  responses.extend([
109  rpc.take_response()
110  for _ in range(_application_common.STREAM_STREAM_MUTATING_COUNT)
111  ])
112  rpc.requests_closed()
113  _, _, _ = rpc.termination()
114  expected_responses = (
115  services_pb2.Bottom(first_bottom_field=0),
116  services_pb2.Bottom(first_bottom_field=1),
117  services_pb2.Bottom(first_bottom_field=0),
118  services_pb2.Bottom(first_bottom_field=1),
119  )
120  self.assertSequenceEqual(expected_responses, responses)
121 
123  rpc = self._real_time_server.invoke_unary_unary(
124  _application_testing_common.FIRST_SERVICE_UNUN, (),
125  _application_common.UNARY_UNARY_REQUEST, None)
126  first_initial_metadata = rpc.initial_metadata()
127  second_initial_metadata = rpc.initial_metadata()
128  third_initial_metadata = rpc.initial_metadata()
129  first_termination = rpc.termination()
130  second_termination = rpc.termination()
131  third_termination = rpc.termination()
132 
133  for later_initial_metadata in (
134  second_initial_metadata,
135  third_initial_metadata,
136  ):
137  self.assertEqual(first_initial_metadata, later_initial_metadata)
138  response = first_termination[0]
139  terminal_metadata = first_termination[1]
140  code = first_termination[2]
141  details = first_termination[3]
142  for later_termination in (
143  second_termination,
144  third_termination,
145  ):
146  self.assertEqual(response, later_termination[0])
147  self.assertEqual(terminal_metadata, later_termination[1])
148  self.assertIs(code, later_termination[2])
149  self.assertEqual(details, later_termination[3])
150  self.assertEqual(_application_common.UNARY_UNARY_RESPONSE, response)
151  self.assertIs(code, grpc.StatusCode.OK)
152 
154  rpc = self._real_time_server.invoke_unary_unary(
155  _application_testing_common.FIRST_SERVICE_UNUN, (),
156  _application_common.ERRONEOUS_UNARY_UNARY_REQUEST, None)
157  initial_metadata = rpc.initial_metadata()
158  response, trailing_metadata, code, details = rpc.termination()
159 
160  self.assertIsNot(code, grpc.StatusCode.OK)
161 
163  rpc = self._real_time_server.invoke_stream_unary(
164  _application_testing_common.FIRST_SERVICE_STREUN, (),
165  _application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
166  rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
167  rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
168  rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
169  initial_metadata = rpc.initial_metadata()
170  self._real_time.sleep_for(
171  _application_common.INFINITE_REQUEST_STREAM_TIMEOUT * 2)
172  rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
173  response, trailing_metadata, code, details = rpc.termination()
174 
175  self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED)
176 
178  rpc = self._fake_time_server.invoke_stream_unary(
179  _application_testing_common.FIRST_SERVICE_STREUN, (),
180  _application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
181  rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
182  rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
183  rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
184  initial_metadata = rpc.initial_metadata()
185  self._fake_time.sleep_for(
186  _application_common.INFINITE_REQUEST_STREAM_TIMEOUT * 2)
187  rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
188  response, trailing_metadata, code, details = rpc.termination()
189 
190  self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED)
191 
193  rpc = self._real_time_server.invoke_unary_unary(
194  _application_testing_common.FIRST_SERVICE_UNUN, (),
195  _application_common.ABORT_REQUEST, None)
196  _, _, code, _ = rpc.termination()
197  self.assertIs(code, grpc.StatusCode.PERMISSION_DENIED)
198  rpc = self._real_time_server.invoke_unary_unary(
199  _application_testing_common.FIRST_SERVICE_UNUN, (),
200  _application_common.ABORT_SUCCESS_QUERY, None)
201  response, _, code, _ = rpc.termination()
202  self.assertEqual(_application_common.ABORT_SUCCESS_RESPONSE, response)
203  self.assertIs(code, grpc.StatusCode.OK)
204 
205 
206 if __name__ == '__main__':
207  unittest.main(verbosity=2)
grpc_testing.server_from_dictionary
def server_from_dictionary(descriptors_to_servicers, time)
Definition: src/python/grpcio_testing/grpc_testing/__init__.py:682
tests.testing._server_test.FirstServiceServicerTest._real_time
_real_time
Definition: testing/_server_test.py:30
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests.testing
Definition: src/python/grpcio_tests/tests/testing/__init__.py:1
tests.testing._server_test.FirstServiceServicerTest.test_infinite_request_stream_real_time
def test_infinite_request_stream_real_time(self)
Definition: testing/_server_test.py:162
tests.testing._server_application.FirstServiceServicer
Definition: _server_application.py:27
tests.testing._server_test.FirstServiceServicerTest.test_successful_stream_unary
def test_successful_stream_unary(self)
Definition: testing/_server_test.py:60
tests.testing._server_test.FirstServiceServicerTest.test_successful_unary_unary
def test_successful_unary_unary(self)
Definition: testing/_server_test.py:41
tests.testing._server_test.FirstServiceServicerTest.setUp
def setUp(self)
Definition: testing/_server_test.py:29
tests.testing._server_test.FirstServiceServicerTest
Definition: testing/_server_test.py:27
grpc_testing.strict_fake_time
def strict_fake_time(now)
Definition: src/python/grpcio_testing/grpc_testing/__init__.py:647
tests.testing._server_test.FirstServiceServicerTest.test_server_rpc_idempotence
def test_server_rpc_idempotence(self)
Definition: testing/_server_test.py:122
tests.testing._server_test.FirstServiceServicerTest._real_time_server
_real_time_server
Definition: testing/_server_test.py:36
tests.testing.proto
Definition: src/python/grpcio_tests/tests/testing/proto/__init__.py:1
tests.testing._server_test.FirstServiceServicerTest.test_misbehaving_client_unary_unary
def test_misbehaving_client_unary_unary(self)
Definition: testing/_server_test.py:153
tests.testing._server_test.FirstServiceServicerTest.test_servicer_context_abort
def test_servicer_context_abort(self)
Definition: testing/_server_test.py:192
tests.testing._server_test.FirstServiceServicerTest.test_infinite_request_stream_fake_time
def test_infinite_request_stream_fake_time(self)
Definition: testing/_server_test.py:177
tests.testing._server_test.FirstServiceServicerTest._fake_time
_fake_time
Definition: testing/_server_test.py:31
tests.testing._server_test.FirstServiceServicerTest._fake_time_server
_fake_time_server
Definition: testing/_server_test.py:38
tests.testing._server_test.FirstServiceServicerTest.test_successful_unary_stream
def test_successful_unary_stream(self)
Definition: testing/_server_test.py:51
tests.testing._server_test.FirstServiceServicerTest.test_successful_stream_stream
def test_successful_stream_stream(self)
Definition: testing/_server_test.py:73
grpc_testing.strict_real_time
def strict_real_time()
Definition: src/python/grpcio_testing/grpc_testing/__init__.py:632
tests.testing._server_test.FirstServiceServicerTest.test_mutating_stream_stream
def test_mutating_stream_stream(self)
Definition: testing/_server_test.py:98


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27