_servicer_context.py
Go to the documentation of this file.
1 # Copyright 2017 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import grpc
16 from grpc_testing import _common
17 
18 
20 
21  def __init__(self, rpc, time, deadline):
22  self._rpc = rpc
23  self._time = time
24  self._deadline = deadline
25 
26  def is_active(self):
27  return self._rpc.is_active()
28 
29  def time_remaining(self):
30  if self._rpc.is_active():
31  if self._deadline is None:
32  return None
33  else:
34  return max(0.0, self._deadline - self._time.time())
35  else:
36  return 0.0
37 
38  def cancel(self):
39  self._rpc.application_cancel()
40 
41  def add_callback(self, callback):
42  return self._rpc.add_callback(callback)
43 
45  return self._rpc.invocation_metadata()
46 
47  def peer(self):
48  raise NotImplementedError()
49 
50  def peer_identities(self):
51  raise NotImplementedError()
52 
53  def peer_identity_key(self):
54  raise NotImplementedError()
55 
56  def auth_context(self):
57  raise NotImplementedError()
58 
59  def set_compression(self):
60  raise NotImplementedError()
61 
62  def send_initial_metadata(self, initial_metadata):
63  initial_metadata_sent = self._rpc.send_initial_metadata(
64  _common.fuss_with_metadata(initial_metadata))
65  if not initial_metadata_sent:
66  raise ValueError(
67  'ServicerContext.send_initial_metadata called too late!')
68 
70  raise NotImplementedError()
71 
72  def set_trailing_metadata(self, trailing_metadata):
74  _common.fuss_with_metadata(trailing_metadata))
75 
76  def abort(self, code, details):
77  with self._rpc._condition:
78  self._rpc._abort(code, details)
79  raise Exception()
80 
81  def abort_with_status(self, status):
82  raise NotImplementedError()
83 
84  def set_code(self, code):
85  self._rpc.set_code(code)
86 
87  def set_details(self, details):
88  self._rpc.set_details(details)
grpc_testing._server._servicer_context.ServicerContext._deadline
_deadline
Definition: _servicer_context.py:24
grpc_testing._server._servicer_context.ServicerContext.disable_next_message_compression
def disable_next_message_compression(self)
Definition: _servicer_context.py:69
grpc_testing._server._servicer_context.ServicerContext.abort_with_status
def abort_with_status(self, status)
Definition: _servicer_context.py:81
grpc_testing._server._servicer_context.ServicerContext
Definition: _servicer_context.py:19
grpc_testing._server._servicer_context.ServicerContext.peer_identities
def peer_identities(self)
Definition: _servicer_context.py:50
grpc_testing._server._servicer_context.ServicerContext.is_active
def is_active(self)
Definition: _servicer_context.py:26
grpc_testing._server._servicer_context.ServicerContext.set_details
def set_details(self, details)
Definition: _servicer_context.py:87
grpc_testing._server._servicer_context.ServicerContext._rpc
_rpc
Definition: _servicer_context.py:22
grpc_testing._server._servicer_context.ServicerContext.set_compression
def set_compression(self)
Definition: _servicer_context.py:59
_abort
static ABSL_ATTRIBUTE_NORETURN void _abort()
Definition: bloaty/src/re.h:60
grpc_testing._server._servicer_context.ServicerContext._time
_time
Definition: _servicer_context.py:23
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
grpc_testing._server._servicer_context.ServicerContext.add_callback
def add_callback(self, callback)
Definition: _servicer_context.py:41
grpc_testing._server._servicer_context.ServicerContext.cancel
def cancel(self)
Definition: _servicer_context.py:38
grpc_testing._server._servicer_context.ServicerContext.auth_context
def auth_context(self)
Definition: _servicer_context.py:56
grpc_testing._server._servicer_context.ServicerContext.set_code
def set_code(self, code)
Definition: _servicer_context.py:84
grpc_testing._server._servicer_context.ServicerContext.time_remaining
def time_remaining(self)
Definition: _servicer_context.py:29
grpc.ServicerContext
Definition: src/python/grpcio/grpc/__init__.py:1083
grpc_testing._server._servicer_context.ServicerContext.__init__
def __init__(self, rpc, time, deadline)
Definition: _servicer_context.py:21
grpc_testing._server._servicer_context.ServicerContext.peer_identity_key
def peer_identity_key(self)
Definition: _servicer_context.py:53
grpc_testing._server._servicer_context.ServicerContext.abort
def abort(self, code, details)
Definition: _servicer_context.py:76
grpc_testing._server._servicer_context.ServicerContext.invocation_metadata
def invocation_metadata(self)
Definition: _servicer_context.py:44
grpc_testing._server._servicer_context.ServicerContext.set_trailing_metadata
def set_trailing_metadata(self, trailing_metadata)
Definition: _servicer_context.py:72
grpc_testing._server._servicer_context.ServicerContext.send_initial_metadata
def send_initial_metadata(self, initial_metadata)
Definition: _servicer_context.py:62
grpc_testing._server._servicer_context.ServicerContext.peer
def peer(self)
Definition: _servicer_context.py:47


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27