14 """Test of RPCs made against gRPC Python's application-layer API."""
23 _SERIALIZE_REQUEST =
lambda bytestring: bytestring * 2
24 _DESERIALIZE_REQUEST =
lambda bytestring: bytestring[
len(bytestring) // 2:]
25 _SERIALIZE_RESPONSE =
lambda bytestring: bytestring * 3
26 _DESERIALIZE_RESPONSE =
lambda bytestring: bytestring[:
len(bytestring) // 3]
28 _UNARY_UNARY =
'/test/UnaryUnary'
29 _UNARY_STREAM =
'/test/UnaryStream'
30 _STREAM_UNARY =
'/test/StreamUnary'
31 _STREAM_STREAM =
'/test/StreamStream'
35 return channel.unary_unary(_UNARY_UNARY)
39 return channel.unary_stream(_UNARY_STREAM,
40 request_serializer=_SERIALIZE_REQUEST,
41 response_deserializer=_DESERIALIZE_RESPONSE)
45 return channel.stream_unary(_STREAM_UNARY,
46 request_serializer=_SERIALIZE_REQUEST,
47 response_deserializer=_DESERIALIZE_RESPONSE)
51 return channel.stream_stream(_STREAM_STREAM)
68 metadata = ((
'InVaLiD',
'UnaryRequestBlockingUnaryResponse'),)
69 expected_error_details =
"metadata was invalid: %s" % metadata
70 with self.assertRaises(ValueError)
as exception_context:
72 self.assertIn(expected_error_details,
str(exception_context.exception))
76 metadata = ((
'InVaLiD',
'UnaryRequestBlockingUnaryResponseWithCall'),)
77 expected_error_details =
"metadata was invalid: %s" % metadata
78 with self.assertRaises(ValueError)
as exception_context:
80 self.assertIn(expected_error_details,
str(exception_context.exception))
84 metadata = ((
'InVaLiD',
'UnaryRequestFutureUnaryResponse'),)
85 expected_error_details =
"metadata was invalid: %s" % metadata
86 with self.assertRaises(ValueError)
as exception_context:
91 metadata = ((
'InVaLiD',
'UnaryRequestStreamResponse'),)
92 expected_error_details =
"metadata was invalid: %s" % metadata
93 with self.assertRaises(ValueError)
as exception_context:
95 self.assertIn(expected_error_details,
str(exception_context.exception))
99 b
'\x07\x08' for _
in range(test_constants.STREAM_LENGTH))
100 metadata = ((
'InVaLiD',
'StreamRequestBlockingUnaryResponse'),)
101 expected_error_details =
"metadata was invalid: %s" % metadata
102 with self.assertRaises(ValueError)
as exception_context:
104 self.assertIn(expected_error_details,
str(exception_context.exception))
108 b
'\x07\x08' for _
in range(test_constants.STREAM_LENGTH))
109 metadata = ((
'InVaLiD',
'StreamRequestBlockingUnaryResponseWithCall'),)
110 expected_error_details =
"metadata was invalid: %s" % metadata
112 with self.assertRaises(ValueError)
as exception_context:
113 multi_callable.with_call(request_iterator, metadata=metadata)
114 self.assertIn(expected_error_details,
str(exception_context.exception))
118 b
'\x07\x08' for _
in range(test_constants.STREAM_LENGTH))
119 metadata = ((
'InVaLiD',
'StreamRequestFutureUnaryResponse'),)
120 expected_error_details =
"metadata was invalid: %s" % metadata
121 with self.assertRaises(ValueError)
as exception_context:
123 self.assertIn(expected_error_details,
str(exception_context.exception))
127 b
'\x07\x08' for _
in range(test_constants.STREAM_LENGTH))
128 metadata = ((
'InVaLiD',
'StreamRequestStreamResponse'),)
129 expected_error_details =
"metadata was invalid: %s" % metadata
130 with self.assertRaises(ValueError)
as exception_context:
132 self.assertIn(expected_error_details,
str(exception_context.exception))
135 self.assertRaises(TypeError, self.
_unary_unary, b
'', metadata=42)
138 if __name__ ==
'__main__':
139 logging.basicConfig()
140 unittest.main(verbosity=2)