grpc/aio/_server.py
Go to the documentation of this file.
1 # Copyright 2019 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Server-side implementation of gRPC Asyncio Python."""
15 
16 from concurrent.futures import Executor
17 from typing import Any, Optional, Sequence
18 
19 import grpc
20 from grpc import _common
21 from grpc import _compression
22 from grpc._cython import cygrpc
23 
24 from . import _base_server
25 from ._interceptor import ServerInterceptor
26 from ._typing import ChannelArgumentType
27 
28 
def _augment_channel_arguments(base_options: ChannelArgumentType,
                               compression: Optional[grpc.Compression]):
    """Appends the compression channel option to the given channel arguments.

    Args:
      base_options: The caller-supplied channel arguments; converted with
        ``tuple()`` before concatenation.
      compression: The compression setting handed to
        ``_compression.create_channel_option``; may be None.

    Returns:
      ``tuple(base_options)`` concatenated with the value returned by
      ``_compression.create_channel_option(compression)``.
    """
    # Build the compression option first, exactly as the original ordering
    # did, then append it to an immutable copy of the caller's options.
    compression_args = _compression.create_channel_option(compression)
    base_args = tuple(base_options)
    return base_args + compression_args
33 
34 
class Server(_base_server.Server):
    """Serves RPCs.

    The methods of this class delegate to the ``cygrpc.AioServer`` instance
    stored in ``self._server``.
    """
37 
38  def __init__(self, thread_pool: Optional[Executor],
39  generic_handlers: Optional[Sequence[grpc.GenericRpcHandler]],
40  interceptors: Optional[Sequence[Any]],
41  options: ChannelArgumentType,
42  maximum_concurrent_rpcs: Optional[int],
43  compression: Optional[grpc.Compression]):
44  self._loop = cygrpc.get_working_loop()
45  if interceptors:
46  invalid_interceptors = [
47  interceptor for interceptor in interceptors
48  if not isinstance(interceptor, ServerInterceptor)
49  ]
50  if invalid_interceptors:
51  raise ValueError(
52  'Interceptor must be ServerInterceptor, the '
53  f'following are invalid: {invalid_interceptors}')
54  self._server = cygrpc.AioServer(
55  self._loop, thread_pool, generic_handlers, interceptors,
56  _augment_channel_arguments(options, compression),
57  maximum_concurrent_rpcs)
58 
60  self,
61  generic_rpc_handlers: Sequence[grpc.GenericRpcHandler]) -> None:
62  """Registers GenericRpcHandlers with this Server.
63 
64  This method is only safe to call before the server is started.
65 
66  Args:
67  generic_rpc_handlers: A sequence of GenericRpcHandlers that will be
68  used to service RPCs.
69  """
70  self._server.add_generic_rpc_handlers(generic_rpc_handlers)
71 
72  def add_insecure_port(self, address: str) -> int:
73  """Opens an insecure port for accepting RPCs.
74 
75  This method may only be called before starting the server.
76 
77  Args:
78  address: The address for which to open a port. If the port is 0,
79  or not specified in the address, then the gRPC runtime will choose a port.
80 
81  Returns:
82  An integer port on which the server will accept RPC requests.
83  """
84  return _common.validate_port_binding_result(
85  address, self._server.add_insecure_port(_common.encode(address)))
86 
87  def add_secure_port(self, address: str,
88  server_credentials: grpc.ServerCredentials) -> int:
89  """Opens a secure port for accepting RPCs.
90 
91  This method may only be called before starting the server.
92 
93  Args:
94  address: The address for which to open a port.
95  if the port is 0, or not specified in the address, then the gRPC
96  runtime will choose a port.
97  server_credentials: A ServerCredentials object.
98 
99  Returns:
100  An integer port on which the server will accept RPC requests.
101  """
102  return _common.validate_port_binding_result(
103  address,
104  self._server.add_secure_port(_common.encode(address),
105  server_credentials))
106 
107  async def start(self) -> None:
108  """Starts this Server.
109 
110  This method may only be called once. (i.e. it is not idempotent).
111  """
112  await self._server.start()
113 
114  async def stop(self, grace: Optional[float]) -> None:
115  """Stops this Server.
116 
117  This method immediately stops the server from servicing new RPCs in
118  all cases.
119 
120  If a grace period is specified, this method returns immediately and all
121  RPCs active at the end of the grace period are aborted. If a grace
122  period is not specified (by passing None for grace), all existing RPCs
123  are aborted immediately and this method blocks until the last RPC
124  handler terminates.
125 
126  This method is idempotent and may be called at any time. Passing a
127  smaller grace value in a subsequent call will have the effect of
128  stopping the Server sooner (passing None will have the effect of
129  stopping the server immediately). Passing a larger grace value in a
130  subsequent call will not have the effect of stopping the server later
131  (i.e. the most restrictive grace value is used).
132 
133  Args:
134  grace: A duration of time in seconds or None.
135  """
136  await self._server.shutdown(grace)
137 
138  async def wait_for_termination(self,
139  timeout: Optional[float] = None) -> bool:
140  """Block current coroutine until the server stops.
141 
142  This is an EXPERIMENTAL API.
143 
144  The wait will not consume computational resources during blocking, and
145  it will block until one of the two following conditions are met:
146 
147  1) The server is stopped or terminated;
148  2) A timeout occurs if timeout is not `None`.
149 
150  The timeout argument works in the same way as `threading.Event.wait()`.
151  https://docs.python.org/3/library/threading.html#threading.Event.wait
152 
153  Args:
154  timeout: A floating point number specifying a timeout for the
155  operation in seconds.
156 
157  Returns:
158  A bool indicates if the operation times out.
159  """
160  return await self._server.wait_for_termination(timeout)
161 
162  def __del__(self):
163  """Schedules a graceful shutdown in current event loop.
164 
165  The Cython AioServer doesn't hold a ref-count to this class. It should
166  be safe to slightly extend the underlying Cython object's life span.
167  """
168  if hasattr(self, '_server'):
169  if self._server.is_running():
170  cygrpc.schedule_coro_threadsafe(
171  self._server.shutdown(None),
172  self._loop,
173  )
174 
175 
def server(migration_thread_pool: Optional[Executor] = None,
           handlers: Optional[Sequence[grpc.GenericRpcHandler]] = None,
           interceptors: Optional[Sequence[Any]] = None,
           options: Optional[ChannelArgumentType] = None,
           maximum_concurrent_rpcs: Optional[int] = None,
           compression: Optional[grpc.Compression] = None):
    """Creates a Server with which RPCs can be serviced.

    Args:
      migration_thread_pool: A futures.ThreadPoolExecutor to be used by the
        Server to execute non-AsyncIO RPC handlers for migration purpose.
      handlers: An optional list of GenericRpcHandlers used for executing
        RPCs. More handlers may be added by calling add_generic_rpc_handlers
        any time before the server is started.
      interceptors: An optional list of ServerInterceptor objects that observe
        and optionally manipulate the incoming RPCs before handing them over
        to handlers. The interceptors are given control in the order they are
        specified. This is an EXPERIMENTAL API.
      options: An optional list of key-value pairs (:term:`channel_arguments`
        in gRPC runtime) to configure the channel.
      maximum_concurrent_rpcs: The maximum number of concurrent RPCs this
        server will service before returning RESOURCE_EXHAUSTED status, or
        None to indicate no limit.
      compression: An element of grpc.compression, e.g.
        grpc.compression.Gzip. This compression algorithm will be used for the
        lifetime of the server unless overridden by set_compression. This is
        an EXPERIMENTAL option.

    Returns:
      A Server object.
    """
    # Omitted collections are normalised to empty tuples before being passed
    # to the Server constructor.
    if handlers is None:
        handlers = ()
    if interceptors is None:
        interceptors = ()
    if options is None:
        options = ()
    return Server(migration_thread_pool, handlers, interceptors, options,
                  maximum_concurrent_rpcs, compression)
grpc.aio._base_server.Server
Definition: _base_server.py:28
grpc.aio._server.Server.__del__
def __del__(self)
Definition: grpc/aio/_server.py:162
grpc.aio._server.Server.wait_for_termination
bool wait_for_termination(self, Optional[float] timeout=None)
Definition: grpc/aio/_server.py:138
grpc.aio._server._augment_channel_arguments
def _augment_channel_arguments(ChannelArgumentType base_options, Optional[grpc.Compression] compression)
Definition: grpc/aio/_server.py:29
grpc.aio._server.server
def server(Optional[Executor] migration_thread_pool=None, Optional[Sequence[grpc.GenericRpcHandler]] handlers=None, Optional[Sequence[Any]] interceptors=None, Optional[ChannelArgumentType] options=None, Optional[int] maximum_concurrent_rpcs=None, Optional[grpc.Compression] compression=None)
Definition: grpc/aio/_server.py:176
grpc.aio._server.Server._server
_server
Definition: grpc/aio/_server.py:49
grpc.aio._server.Server
Definition: grpc/aio/_server.py:35
grpc::ServerCredentials
Wrapper around grpc_server_credentials, a way to authenticate a server.
Definition: include/grpcpp/security/server_credentials.h:76
grpc.GenericRpcHandler
Definition: src/python/grpcio/grpc/__init__.py:1333
grpc.aio._server.Server.__init__
def __init__(self, Optional[Executor] thread_pool, Optional[Sequence[grpc.GenericRpcHandler]] generic_handlers, Optional[Sequence[Any]] interceptors, ChannelArgumentType options, Optional[int] maximum_concurrent_rpcs, Optional[grpc.Compression] compression)
Definition: grpc/aio/_server.py:38
grpc.Compression
Definition: src/python/grpcio/grpc/__init__.py:2084
grpc.aio._server.Server.stop
None stop(self, Optional[float] grace)
Definition: grpc/aio/_server.py:114
grpc.aio._server.Server._loop
_loop
Definition: grpc/aio/_server.py:39
grpc.aio._server.Server.add_secure_port
int add_secure_port(self, str address, grpc.ServerCredentials server_credentials)
Definition: grpc/aio/_server.py:87
grpc._cython
Definition: src/python/grpcio/grpc/_cython/__init__.py:1
grpc.aio._server.Server.add_insecure_port
int add_insecure_port(self, str address)
Definition: grpc/aio/_server.py:72
grpc.aio._server.Server.add_generic_rpc_handlers
None add_generic_rpc_handlers(self, Sequence[grpc.GenericRpcHandler] generic_rpc_handlers)
Definition: grpc/aio/_server.py:59
grpc.aio._server.Server.start
None start(self)
Definition: grpc/aio/_server.py:107


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:38