structio/structio/signals.py

94 lines
3.7 KiB
Python

# Signal handling module
import platform
import signal
from collections import defaultdict
from types import FrameType
import structio
from structio.io.socket import AsyncSocket
from typing import Callable, Any, Coroutine
from structio.thread import AsyncThreadQueue
from structio.core.run import current_loop
# Queue carrying (signal number, frame) tuples from the synchronous
# signal handler (_handle) to the async watcher (signal_watcher).
# NOTE(review): float("inf") is presumably the queue's maximum size,
# i.e. the queue is unbounded - confirm against AsyncThreadQueue
_sig_data = AsyncThreadQueue(float("inf"))
# Maps each signal to the async handler registered for it. Since this
# is a defaultdict, looking up a signal that has no handler returns
# None (and stores that None entry in the mapping as a side effect)
_sig_handlers: dict[
signal.Signals, Callable[[Any, Any], Coroutine[Any, Any, Any]] | None
] = defaultdict(lambda: None)
def _handle(sig: int, frame: FrameType):
    """
    Synchronous handler installed via signal.signal(): it only
    records the signal number and frame in the signal queue, so
    that the async watcher can dispatch them later
    """

    event = (sig, frame)
    _sig_data.put_sync(event)
def get_signal_handler(
    sig: int,
) -> Callable[[Any, Any], Coroutine[Any, Any, Any]] | None:
    """
    Looks up the async handler currently registered for the
    given signal. None is returned when no handler is set
    """

    handlers = _sig_handlers
    # Converting to the enum raises ValueError for unknown signal numbers
    key = signal.Signals(sig)
    return handlers[key]
def set_signal_handler(
sig: int, handler: Callable[[Any, Any], Coroutine[Any, Any, Any]]
) -> Callable[[Any, Any], Coroutine[Any, Any, Any]] | None:
"""
Sets the given coroutine to handle the given signal asynchronously. The
previous async signal handler is returned if any was set, otherwise
None is returned
"""
# Raises an appropriate error
sig = signal.Signals(sig)
illegal_signals = []
if platform.system() in {"Linux", "Darwin"}:
# Linux/MacOS
illegal_signals.append(signal.SIGKILL)
illegal_signals.append(signal.SIGSTOP)
match sig:
case sig if sig in illegal_signals:
raise ValueError(f"signal {sig!r} does not support custom handlers")
case _:
prev = _sig_handlers[sig]
signal.signal(sig, _handle)
_sig_handlers[sig] = handler
return prev
async def signal_watcher(sock: AsyncSocket):
    """
    Runs forever: waits for data on the given socket, then drains
    the signal queue and awaits the async handler registered for
    each queued signal (signals with no handler are skipped)
    """

    while True:
        # The socket is only a wakeup notification, we never look at
        # the data that arrives on it. Relying on the bytes Python
        # writes via set_wakeup_fd to tell us *which* signals fired
        # is trickier than it sounds: if many signals arrive and the
        # socket buffer fills up, every signal after that is lost.
        # Python can warn about it, but 1) we'd still be dropping
        # signals and 2) the warning leaks details of how signals
        # are implemented. So the information about each signal is
        # stored by our own handler in an unbounded queue instead,
        # which means everything gets delivered on the first read
        # even if the socket's buffer was full. An unbounded queue
        # is a little uneasy, but realistically it would take
        # thousands of signals outpacing the event loop for memory
        # to become a problem
        await sock.receive(1)
        async for signum, frame in _sig_data:
            handler = _sig_handlers[signum]
            if not handler:
                continue
            try:
                await handler(signum, frame)
            except (Exception, KeyboardInterrupt) as err:
                # Mimic native signal handlers by propagating the
                # error into the program's entry point. Far from
                # ideal, but there's no obviously better place for
                # this exception to go
                loop = current_loop()
                loop.throw(loop.entry_point, err)