14 """Tests server context abort mechanism"""
27 _ABORT =
'/test/abort'
28 _ABORT_WITH_STATUS =
'/test/AbortWithStatus'
29 _INVALID_CODE =
'/test/InvalidCode'
31 _REQUEST = b
'\x00\x00\x00'
32 _RESPONSE = b
'\x00\x00\x00'
34 _ABORT_DETAILS =
'Abandon ship!'
35 _ABORT_METADATA = ((
'a-trailing-metadata',
'42'),)
39 collections.namedtuple(
'_Status',
40 (
'code',
'details',
'trailing_metadata')),
49 do_not_leak_me = _Object()
53 this_should_not_be_leaked = do_not_leak_me
54 servicer_context.abort(
55 grpc.StatusCode.INTERNAL,
58 raise Exception(
'This line should not be executed!')
62 servicer_context.abort_with_status(
64 code=grpc.StatusCode.INTERNAL,
65 details=_ABORT_DETAILS,
66 trailing_metadata=_ABORT_METADATA,
68 raise Exception(
'This line should not be executed!')
72 servicer_context.abort(
81 if handler_call_details.method == _ABORT:
83 elif handler_call_details.method == _ABORT_WITH_STATUS:
85 abort_with_status_unary_unary)
86 elif handler_call_details.method == _INVALID_CODE:
88 invalid_code_unary_unary)
97 port = self.
_server.add_insecure_port(
'[::]:0')
110 rpc_error = exception_context.exception
112 self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL)
113 self.assertEqual(rpc_error.details(), _ABORT_DETAILS)
119 @unittest.skip(
"https://github.com/grpc/grpc/issues/17927")
121 global do_not_leak_me
122 weak_ref = weakref.ref(do_not_leak_me)
131 do_not_leak_me =
None
132 self.assertIsNone(weak_ref())
137 rpc_error = exception_context.exception
139 self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL)
140 self.assertEqual(rpc_error.details(), _ABORT_DETAILS)
141 self.assertEqual(rpc_error.trailing_metadata(), _ABORT_METADATA)
146 rpc_error = exception_context.exception
148 self.assertEqual(rpc_error.code(), grpc.StatusCode.UNKNOWN)
149 self.assertEqual(rpc_error.details(), _ABORT_DETAILS)
152 if __name__ ==
'__main__':
153 logging.basicConfig()
154 unittest.main(verbosity=2)