# Copyright 2018 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for signal management functionality."""
import asyncio
from contextlib import ExitStack
import os
import platform
import signal
import socket
import threading
from typing import Callable
from typing import Optional
from typing import Tuple # noqa: F401
from typing import Union
[docs]
class AsyncSafeSignalManager:
"""
A context manager class for asynchronous handling of signals.
Similar in purpose to :func:`asyncio.loop.add_signal_handler` but
not limited to Unix platforms.
Signal handlers can be registered at any time with a given manager.
These will become active for the extent of said manager context.
Unlike regular signal handlers, asynchronous signals handlers
can safely interact with their event loop.
The same manager can be used multiple consecutive times and even
be nested with other managers, as these are independent from each
other i.e. managers do not override each other's handlers.
If used outside of the main thread, a ValueError is raised.
The underlying mechanism is built around :func:`signal.set_wakeup_fd`
so as to not interfere with regular handlers installed via
:func:`signal.signal`.
All signals received are forwarded to the previously setup file
descriptor, if any.
..warning::
Within (potentially nested) contexts, :func:`signal.set_wakeup_fd`
calls are intercepted such that the given file descriptor overrides
the previously setup file descriptor for the outermost manager.
This ensures the manager's chain of signal wakeup file descriptors
is not broken by third-party code or by asyncio itself in some platforms.
"""
__current = None # type: AsyncSafeSignalManager
__set_wakeup_fd = signal.set_wakeup_fd # type: Callable[[int], int]
[docs]
def __init__(
self,
loop: asyncio.AbstractEventLoop
):
"""
Instantiate manager.
:param loop: event loop that will handle the signals.
"""
self.__parent: Optional[AsyncSafeSignalManager] = None
self.__loop: asyncio.AbstractEventLoop = loop
self.__background_loop: Optional[asyncio.AbstractEventLoop] = None
self.__handlers: dict = {}
self.__prev_wakeup_handle: Union[int, socket.socket] = -1
self.__wsock: Optional[socket.socket] = None
self.__rsock: Optional[socket.socket] = None
self.__close_sockets: Optional[Callable] = None
def __enter__(self) -> 'AsyncSafeSignalManager':
pair = socket.socketpair() # type: Tuple[socket.socket, socket.socket] # noqa
with ExitStack() as stack:
self.__wsock = stack.enter_context(pair[0])
self.__rsock = stack.enter_context(pair[1])
if self.__wsock is not None:
self.__wsock.setblocking(False)
if self.__rsock is not None:
self.__rsock.setblocking(False)
self.__close_sockets = stack.pop_all().close
self.__add_signal_readers()
try:
self.__install_signal_writers()
except Exception:
self.__remove_signal_readers()
if self.__close_sockets is not None:
self.__close_sockets()
self.__rsock = None
self.__wsock = None
self.__close_sockets = None
raise
self.__chain()
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
try:
try:
self.__uninstall_signal_writers()
finally:
self.__remove_signal_readers()
finally:
self.__unchain()
self.__close_sockets()
self.__rsock = None
self.__wsock = None
self.__close_sockets = None
def __add_signal_readers(self):
try:
self.__loop.add_reader(self.__rsock.fileno(), self.__handle_signal)
except NotImplementedError:
# Some event loops, like the asyncio.ProactorEventLoop
# on Windows, do not support asynchronous socket reads.
# Emulate it.
self.__background_loop = asyncio.SelectorEventLoop()
self.__background_loop.add_reader(
self.__rsock.fileno(),
self.__loop.call_soon_threadsafe,
self.__handle_signal)
def run_background_loop():
asyncio.set_event_loop(self.__background_loop)
self.__background_loop.run_forever()
self.__background_thread = threading.Thread(
target=run_background_loop, daemon=True)
self.__background_thread.start()
def __remove_signal_readers(self):
if self.__background_loop:
self.__background_loop.call_soon_threadsafe(self.__background_loop.stop)
self.__background_thread.join()
self.__background_loop.close()
self.__background_loop = None
else:
self.__loop.remove_reader(self.__rsock.fileno())
def __install_signal_writers(self):
prev_wakeup_handle = self.__set_wakeup_fd(self.__wsock.fileno())
try:
self.__chain_wakeup_handle(prev_wakeup_handle)
except Exception:
own_wakeup_handle = self.__set_wakeup_fd(prev_wakeup_handle)
assert self.__wsock.fileno() == own_wakeup_handle
raise
def __uninstall_signal_writers(self):
prev_wakeup_handle = self.__chain_wakeup_handle(-1)
own_wakeup_handle = self.__set_wakeup_fd(prev_wakeup_handle)
assert self.__wsock.fileno() == own_wakeup_handle
def __chain(self):
self.__parent = AsyncSafeSignalManager.__current
AsyncSafeSignalManager.__current = self
if self.__parent is None:
# Do not trust signal.set_wakeup_fd calls within context.
# Overwrite handle at the start of the managers' chain.
def modified_set_wakeup_fd(signum):
if threading.current_thread() is not threading.main_thread():
raise ValueError(
'set_wakeup_fd only works in main'
' thread of the main interpreter'
)
return self.__chain_wakeup_handle(signum)
signal.set_wakeup_fd = modified_set_wakeup_fd
def __unchain(self):
if self.__parent is None:
signal.set_wakeup_fd = self.__set_wakeup_fd
AsyncSafeSignalManager.__current = self.__parent
def __chain_wakeup_handle(self, wakeup_handle):
prev_wakeup_handle = self.__prev_wakeup_handle
if isinstance(prev_wakeup_handle, socket.socket):
# Detach (Windows) socket and retrieve the raw OS handle.
prev_wakeup_handle = prev_wakeup_handle.detach()
if wakeup_handle != -1 and platform.system() == 'Windows':
# On Windows, os.write will fail on a WinSock handle. There is no WinSock API
# in the standard library either. Thus we wrap it in a socket.socket instance.
try:
wakeup_handle = socket.socket(fileno=wakeup_handle)
except WindowsError as e:
if e.winerror != 10038: # WSAENOTSOCK
raise
self.__prev_wakeup_handle = wakeup_handle
return prev_wakeup_handle
def __handle_signal(self):
while True:
try:
data = self.__rsock.recv(4096)
if not data:
break
for signum in data:
if signum not in self.__handlers:
continue
self.__handlers[signum](signum)
if self.__prev_wakeup_handle != -1:
# Send over (Windows) socket or write file.
if isinstance(self.__prev_wakeup_handle, socket.socket):
self.__prev_wakeup_handle.send(data)
else:
os.write(self.__prev_wakeup_handle, data)
except InterruptedError:
continue
except BlockingIOError:
break
[docs]
def handle(
self,
signum: Union[signal.Signals, int],
handler: Optional[Callable[[int], None]],
) -> Optional[Callable[[int], None]]:
"""
Register a callback for asynchronous handling of a given signal.
:param signum: number of the signal to be handled
:param handler: callback taking a signal number
as its sole argument, or None
:return: previous handler if any, otherwise None
"""
signum = signal.Signals(signum)
if handler is not None:
if not callable(handler):
raise ValueError('signal handler must be a callable')
old_handler = self.__handlers.get(signum, None)
self.__handlers[signum] = handler
else:
old_handler = self.__handlers.pop(signum, None)
return old_handler