_invalid_metadata_test.py
Go to the documentation of this file.
1 # Copyright 2016 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Test of RPCs made against gRPC Python's application-layer API."""
15 
16 import logging
17 import unittest
18 
19 import grpc
20 
21 from tests.unit.framework.common import test_constants
22 
23 _SERIALIZE_REQUEST = lambda bytestring: bytestring * 2
24 _DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) // 2:]
25 _SERIALIZE_RESPONSE = lambda bytestring: bytestring * 3
26 _DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) // 3]
27 
28 _UNARY_UNARY = '/test/UnaryUnary'
29 _UNARY_STREAM = '/test/UnaryStream'
30 _STREAM_UNARY = '/test/StreamUnary'
31 _STREAM_STREAM = '/test/StreamStream'
32 
33 
def _unary_unary_multi_callable(channel):
    """Return `channel`'s unary-unary multi-callable for _UNARY_UNARY.

    No request serializer or response deserializer is passed.

    Args:
      channel: An object exposing `unary_unary(method)`; in this file it is
        the channel created by `grpc.insecure_channel`.

    Returns:
      Whatever `channel.unary_unary` returns for the method path.
    """
    return channel.unary_unary(_UNARY_UNARY)
36 
37 
def _unary_stream_multi_callable(channel):
    """Return `channel`'s unary-stream multi-callable for _UNARY_STREAM.

    Requests are serialized with _SERIALIZE_REQUEST and responses are
    deserialized with _DESERIALIZE_RESPONSE.

    Args:
      channel: An object exposing `unary_stream(method, request_serializer=,
        response_deserializer=)`.

    Returns:
      Whatever `channel.unary_stream` returns for the method path.
    """
    return channel.unary_stream(_UNARY_STREAM,
                                request_serializer=_SERIALIZE_REQUEST,
                                response_deserializer=_DESERIALIZE_RESPONSE)
42 
43 
def _stream_unary_multi_callable(channel):
    """Return `channel`'s stream-unary multi-callable for _STREAM_UNARY.

    Requests are serialized with _SERIALIZE_REQUEST and responses are
    deserialized with _DESERIALIZE_RESPONSE.

    Args:
      channel: An object exposing `stream_unary(method, request_serializer=,
        response_deserializer=)`.

    Returns:
      Whatever `channel.stream_unary` returns for the method path.
    """
    return channel.stream_unary(_STREAM_UNARY,
                                request_serializer=_SERIALIZE_REQUEST,
                                response_deserializer=_DESERIALIZE_RESPONSE)
48 
49 
def _stream_stream_multi_callable(channel):
    """Return `channel`'s stream-stream multi-callable for _STREAM_STREAM.

    No request serializer or response deserializer is passed.

    Args:
      channel: An object exposing `stream_stream(method)`.

    Returns:
      Whatever `channel.stream_stream` returns for the method path.
    """
    return channel.stream_stream(_STREAM_STREAM)
52 
53 
class InvalidMetadataTest(unittest.TestCase):
    """Checks that each invocation style raises on invalid metadata.

    Every test but the last sends metadata whose only key is 'InVaLiD' and
    expects a ValueError whose message contains "metadata was invalid: ..."
    for that entry.
    NOTE(review): presumably the key is rejected because of its upper-case
    characters -- confirm against the gRPC metadata rules.
    """

    def setUp(self):
        """Open an insecure channel and build one multi-callable per arity."""
        self._channel = grpc.insecure_channel('localhost:8080')
        self._unary_unary = _unary_unary_multi_callable(self._channel)
        self._unary_stream = _unary_stream_multi_callable(self._channel)
        self._stream_unary = _stream_unary_multi_callable(self._channel)
        self._stream_stream = _stream_stream_multi_callable(self._channel)

    def tearDown(self):
        """Close the channel opened in setUp."""
        self._channel.close()

    # In the tests below `metadata` is a 1-tuple, so the `%` operator
    # consumes it as the argument tuple and interpolates its single
    # (key, value) pair into the expected message.

    def testUnaryRequestBlockingUnaryResponse(self):
        request = b'\x07\x08'
        metadata = (('InVaLiD', 'UnaryRequestBlockingUnaryResponse'),)
        expected_error_details = "metadata was invalid: %s" % metadata
        with self.assertRaises(ValueError) as exception_context:
            self._unary_unary(request, metadata=metadata)
        self.assertIn(expected_error_details, str(exception_context.exception))

    def testUnaryRequestBlockingUnaryResponseWithCall(self):
        request = b'\x07\x08'
        metadata = (('InVaLiD', 'UnaryRequestBlockingUnaryResponseWithCall'),)
        expected_error_details = "metadata was invalid: %s" % metadata
        with self.assertRaises(ValueError) as exception_context:
            self._unary_unary.with_call(request, metadata=metadata)
        self.assertIn(expected_error_details, str(exception_context.exception))

    def testUnaryRequestFutureUnaryResponse(self):
        request = b'\x07\x08'
        metadata = (('InVaLiD', 'UnaryRequestFutureUnaryResponse'),)
        expected_error_details = "metadata was invalid: %s" % metadata
        with self.assertRaises(ValueError) as exception_context:
            self._unary_unary.future(request, metadata=metadata)
        # Check the message too, as every sibling test does; previously the
        # expected text and the captured exception were left unused here.
        self.assertIn(expected_error_details, str(exception_context.exception))

    def testUnaryRequestStreamResponse(self):
        request = b'\x37\x58'
        metadata = (('InVaLiD', 'UnaryRequestStreamResponse'),)
        expected_error_details = "metadata was invalid: %s" % metadata
        with self.assertRaises(ValueError) as exception_context:
            self._unary_stream(request, metadata=metadata)
        self.assertIn(expected_error_details, str(exception_context.exception))

    def testStreamRequestBlockingUnaryResponse(self):
        request_iterator = (
            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
        metadata = (('InVaLiD', 'StreamRequestBlockingUnaryResponse'),)
        expected_error_details = "metadata was invalid: %s" % metadata
        with self.assertRaises(ValueError) as exception_context:
            self._stream_unary(request_iterator, metadata=metadata)
        self.assertIn(expected_error_details, str(exception_context.exception))

    def testStreamRequestBlockingUnaryResponseWithCall(self):
        request_iterator = (
            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
        metadata = (('InVaLiD', 'StreamRequestBlockingUnaryResponseWithCall'),)
        expected_error_details = "metadata was invalid: %s" % metadata
        # Use the multi-callable built in setUp, like the other tests,
        # instead of constructing a second one for the same method.
        with self.assertRaises(ValueError) as exception_context:
            self._stream_unary.with_call(request_iterator, metadata=metadata)
        self.assertIn(expected_error_details, str(exception_context.exception))

    def testStreamRequestFutureUnaryResponse(self):
        request_iterator = (
            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
        metadata = (('InVaLiD', 'StreamRequestFutureUnaryResponse'),)
        expected_error_details = "metadata was invalid: %s" % metadata
        with self.assertRaises(ValueError) as exception_context:
            self._stream_unary.future(request_iterator, metadata=metadata)
        self.assertIn(expected_error_details, str(exception_context.exception))

    def testStreamRequestStreamResponse(self):
        request_iterator = (
            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
        metadata = (('InVaLiD', 'StreamRequestStreamResponse'),)
        expected_error_details = "metadata was invalid: %s" % metadata
        with self.assertRaises(ValueError) as exception_context:
            self._stream_stream(request_iterator, metadata=metadata)
        self.assertIn(expected_error_details, str(exception_context.exception))

    def testInvalidMetadata(self):
        # Metadata that is not a sequence at all (an int) is expected to
        # raise TypeError rather than ValueError.
        self.assertRaises(TypeError, self._unary_unary, b'', metadata=42)
136 
137 
if __name__ == '__main__':
    # Set up default root logging, then run this module's tests with
    # per-test (verbosity=2) output.
    logging.basicConfig()
    unittest.main(verbosity=2)
xds_interop_client.str
str
Definition: xds_interop_client.py:487
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
tests.unit._invalid_metadata_test.InvalidMetadataTest.testUnaryRequestStreamResponse
def testUnaryRequestStreamResponse(self)
Definition: _invalid_metadata_test.py:89
tests.unit._invalid_metadata_test.InvalidMetadataTest.testUnaryRequestFutureUnaryResponse
def testUnaryRequestFutureUnaryResponse(self)
Definition: _invalid_metadata_test.py:82
tests.unit._invalid_metadata_test.InvalidMetadataTest._stream_stream
_stream_stream
Definition: _invalid_metadata_test.py:61
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests.unit._invalid_metadata_test.InvalidMetadataTest._unary_stream
_unary_stream
Definition: _invalid_metadata_test.py:59
tests.unit._invalid_metadata_test._unary_unary_multi_callable
def _unary_unary_multi_callable(channel)
Definition: _invalid_metadata_test.py:34
tests.unit._invalid_metadata_test.InvalidMetadataTest._unary_unary
_unary_unary
Definition: _invalid_metadata_test.py:58
tests.unit._invalid_metadata_test.InvalidMetadataTest.testUnaryRequestBlockingUnaryResponseWithCall
def testUnaryRequestBlockingUnaryResponseWithCall(self)
Definition: _invalid_metadata_test.py:74
tests.unit._invalid_metadata_test._stream_unary_multi_callable
def _stream_unary_multi_callable(channel)
Definition: _invalid_metadata_test.py:44
tests.unit._exit_scenarios.future
future
Definition: _exit_scenarios.py:217
tests.unit._invalid_metadata_test.InvalidMetadataTest.setUp
def setUp(self)
Definition: _invalid_metadata_test.py:56
tests.unit._invalid_metadata_test.InvalidMetadataTest.tearDown
def tearDown(self)
Definition: _invalid_metadata_test.py:63
tests.unit._invalid_metadata_test.InvalidMetadataTest._channel
_channel
Definition: _invalid_metadata_test.py:57
tests.unit._invalid_metadata_test.InvalidMetadataTest
Definition: _invalid_metadata_test.py:54
close
#define close
Definition: test-fs.c:48
tests.unit._invalid_metadata_test.InvalidMetadataTest.testStreamRequestBlockingUnaryResponse
def testStreamRequestBlockingUnaryResponse(self)
Definition: _invalid_metadata_test.py:97
tests.unit._invalid_metadata_test.InvalidMetadataTest.testStreamRequestFutureUnaryResponse
def testStreamRequestFutureUnaryResponse(self)
Definition: _invalid_metadata_test.py:116
tests.unit._invalid_metadata_test.InvalidMetadataTest.testInvalidMetadata
def testInvalidMetadata(self)
Definition: _invalid_metadata_test.py:134
tests.unit._invalid_metadata_test.InvalidMetadataTest.testStreamRequestStreamResponse
def testStreamRequestStreamResponse(self)
Definition: _invalid_metadata_test.py:125
tests.unit._invalid_metadata_test.InvalidMetadataTest._stream_unary
_stream_unary
Definition: _invalid_metadata_test.py:60
tests.unit._invalid_metadata_test.InvalidMetadataTest.testStreamRequestBlockingUnaryResponseWithCall
def testStreamRequestBlockingUnaryResponseWithCall(self)
Definition: _invalid_metadata_test.py:106
tests.unit._invalid_metadata_test.InvalidMetadataTest.testUnaryRequestBlockingUnaryResponse
def testUnaryRequestBlockingUnaryResponse(self)
Definition: _invalid_metadata_test.py:66
tests.unit._invalid_metadata_test._unary_stream_multi_callable
def _unary_stream_multi_callable(channel)
Definition: _invalid_metadata_test.py:38
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
tests.unit._invalid_metadata_test._stream_stream_multi_callable
def _stream_stream_multi_callable(channel)
Definition: _invalid_metadata_test.py:50


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:38