94 lines
3.7 KiB
Python
94 lines
3.7 KiB
Python
# Signal handling module
|
|
import platform
|
|
import signal
|
|
from collections import defaultdict
|
|
from types import FrameType
|
|
|
|
import structio
|
|
from structio.io.socket import AsyncSocket
|
|
from typing import Callable, Any, Coroutine
|
|
from structio.thread import AsyncThreadQueue
|
|
from structio.core.run import current_loop
|
|
|
|
|
|
# Queue through which the synchronous handler (_handle) passes
# (signal number, frame) pairs to the asynchronous watcher. It is
# created with an infinite size: see the comment in signal_watcher
# for why an unbounded queue is used here
_sig_data = AsyncThreadQueue(float("inf"))
# Maps each signal to the coroutine function registered for it via
# set_signal_handler. Looking up a signal that has no async handler
# yields None instead of raising KeyError
_sig_handlers: dict[
    signal.Signals, Callable[[Any, Any], Coroutine[Any, Any, Any]] | None
] = defaultdict(lambda: None)
|
|
|
|
|
|
def _handle(sig: int, frame: FrameType | None) -> None:
    """
    Synchronous handler that set_signal_handler registers with
    signal.signal(). It does no real work: it only stores the signal
    number and the frame in the signal queue, from which signal_watcher
    later picks them up and dispatches them to the async handler
    """

    event = (sig, frame)
    _sig_data.put_sync(event)
|
|
|
|
|
|
def get_signal_handler(
    sig: int,
) -> Callable[[Any, Any], Coroutine[Any, Any, Any]] | None:
    """
    Looks up the async signal handler that is currently registered
    for the given signal

    :param sig: The signal number (or signal.Signals member) to look up
    :return: The coroutine function registered for the signal, or None
        if no async handler is set
    :raises ValueError: If sig is not a valid signal number
    """

    # Normalize to the enum first, which also validates the number
    signum = signal.Signals(sig)
    return _sig_handlers[signum]
|
|
|
|
|
|
def set_signal_handler(
|
|
sig: int, handler: Callable[[Any, Any], Coroutine[Any, Any, Any]]
|
|
) -> Callable[[Any, Any], Coroutine[Any, Any, Any]] | None:
|
|
"""
|
|
Sets the given coroutine to handle the given signal asynchronously. The
|
|
previous async signal handler is returned if any was set, otherwise
|
|
None is returned
|
|
"""
|
|
|
|
# Raises an appropriate error
|
|
sig = signal.Signals(sig)
|
|
illegal_signals = []
|
|
if platform.system() in {"Linux", "Darwin"}:
|
|
# Linux/MacOS
|
|
illegal_signals.append(signal.SIGKILL)
|
|
illegal_signals.append(signal.SIGSTOP)
|
|
match sig:
|
|
case sig if sig in illegal_signals:
|
|
raise ValueError(f"signal {sig!r} does not support custom handlers")
|
|
case _:
|
|
prev = _sig_handlers[sig]
|
|
signal.signal(sig, _handle)
|
|
_sig_handlers[sig] = handler
|
|
return prev
|
|
|
|
|
|
async def signal_watcher(sock: AsyncSocket):
    """
    Runs forever, waiting for data to arrive on the given socket and
    then dispatching the queued signals to their registered async
    handlers. Signals with no async handler are skipped. If a handler
    raises, the exception is thrown into the event loop's entry point

    :param sock: The socket whose incoming data wakes up the watcher
    """

    while True:
        # We do use set_wakeup_fd (so our I/O manager is signal-aware
        # and exits cleanly when signals arrive), but relying on the
        # bytes Python writes to our socket is trickier than it sounds.
        # If a burst of signals fills the socket buffer, every signal
        # after that is lost. Python can warn about it, but 1) we'd
        # still be losing signals, which is bad if we can do better,
        # and 2) the warning is confusing, because it leaks details of
        # how signals are implemented. So set_wakeup_fd is used merely
        # as a notification mechanism to wake up this watcher, while a
        # custom signal handler records every incoming signal in an
        # unbounded queue: even if the socket's buffer fills up, all
        # signals are still delivered once our first read() returns.
        # An unbounded queue is a little uneasy, but realistically it
        # is doubtful that anyone would hit memory problems because
        # their code receives thousands of signals faster than the
        # event loop can handle them (right?)
        await sock.receive(1)
        async for signum, stack_frame in _sig_data:
            callback = _sig_handlers[signum]
            if not callback:
                # Nobody registered an async handler for this signal
                continue
            try:
                await callback(signum, stack_frame)
            except (Exception, KeyboardInterrupt) as exc:
                # Native signal handlers propagate their errors into
                # the program, so we mimic that by throwing the
                # exception into the program's entry point. This is
                # far from ideal, but it's not clear what else could
                # be done with the exception
                loop = current_loop()
                loop.throw(loop.entry_point, exc)
|