structio/structio/abc.py

951 lines
24 KiB
Python

import io
import os
from abc import abstractmethod, ABC
from types import FrameType
import structio
from structio.core.task import Task
from structio.exceptions import StructIOException
from typing import Callable, Any, Coroutine
class Clock(ABC):
"""
Abstract base clock class
"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
def start(self):
raise NotImplementedError
@abstractmethod
def setup(self):
raise NotImplementedError
@abstractmethod
def teardown(self):
raise NotImplementedError
@abstractmethod
def current_time(self):
raise NotImplementedError
@abstractmethod
def deadline(self, deadline):
raise NotImplementedError
class SchedulingPolicy(ABC):
"""
A generic scheduling policy. This is what
controls the way tasks are scheduled in the
event loop
"""
@abstractmethod
def is_scheduled(self, task: Task) -> bool:
"""
Returns whether the given task is
scheduled to run. This doesn't
necessarily mean that the task will
actually get executed, just that the
policy knows about this task
"""
raise NotImplementedError
@abstractmethod
def has_next_task(self) -> bool:
"""
Returns whether the policy has a next
candidate task to run
"""
raise NotImplementedError
@abstractmethod
def has_paused_task(self) -> bool:
"""
Returns whether the policy has any paused
tasks waiting to be rescheduled
"""
raise NotImplementedError
@abstractmethod
def peek_paused_task(self) -> Task | None:
"""
Returns the first paused task in the queue,
if there is any, but doesn't consume it
"""
raise NotImplementedError
@abstractmethod
def peek_next_task(self) -> Task | None:
"""
Returns the first task that is ready to run,
if there is any, but doesn't remove it
"""
raise NotImplementedError
@abstractmethod
def get_paused_task(self) -> Task | None:
"""
Dequeues the first paused task in the queue,
if it exists
"""
raise NotImplementedError
@abstractmethod
def schedule(self, task: Task):
"""
Schedules a task for execution
"""
raise NotImplementedError
@abstractmethod
def pause(self, task: Task):
"""
Pauses the given task
"""
raise NotImplementedError
@abstractmethod
def discard(self, task: Task):
"""
Discards the given task from the policy
"""
raise NotImplementedError
@abstractmethod
def get_next_task(self) -> Task | None:
"""
Returns the next runnable task. None
may returned if no runnable tasks are
available
"""
raise NotImplementedError
@abstractmethod
def get_closest_deadline(self) -> Any:
"""
Returns the closest deadline to be satisfied
"""
raise NotImplementedError
class AsyncResource(ABC):
"""
A generic asynchronous resource which needs to
be closed properly, possibly blocking. Can be
used as a context manager (note that only the
__aexit__ method actually blocks!)
"""
async def __aenter__(self):
return self
@abstractmethod
async def close(self):
raise NotImplementedError
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
class WritableStream(AsyncResource, ABC):
"""
Interface for writing binary data to
a byte stream
"""
@abstractmethod
async def write(self, _data):
raise NotImplementedError
class ReadableStream(AsyncResource, ABC):
"""
Interface for reading binary data from
a byte stream. The stream implements the
asynchronous iterator protocol and can
therefore be used with "async for" loops
"""
@abstractmethod
async def _read(self, _size: int = -1):
raise NotImplementedError
class Stream(ReadableStream, WritableStream, ABC):
"""
A generic, asynchronous, readable/writable binary stream
"""
def __init__(self, f):
if isinstance(f, io.TextIOBase):
raise TypeError("only binary files can be streamed")
self.fileobj = f
self.buf = bytearray()
os.set_blocking(self.fileobj.fileno(), False)
@abstractmethod
async def flush(self):
"""
Flushes the underlying resource asynchronously
"""
raise NotImplementedError
class WriteCloseableStream(Stream, ABC):
"""
Extension to the Stream class that allows
shutting down the write end of the stream
without closing the read side on our end
nor the read/write side on the other one
"""
@abstractmethod
async def eof(self):
"""
Send an end-of-file on this stream, if possible.
The resource can still be read from (and the
other end can still read/write to it), but no more
data can be written after an EOF has been sent. If an
EOF has already been sent, this method is a no-op
"""
class ReadableChannel(AsyncResource, ABC):
"""
Interface for reading data from a
channel
"""
@abstractmethod
async def receive(self):
"""
Receive an object from the channel,
possibly blocking
"""
raise NotImplementedError
def __aiter__(self):
"""
Implements asynchronous iteration
"""
return self
async def __anext__(self):
"""
Implements asynchronous iteration
"""
try:
return await self.receive()
except structio.ResourceClosed:
raise StopAsyncIteration()
@abstractmethod
def pending(self):
"""
Returns if there is any data waiting
to be read
"""
@abstractmethod
def readers(self):
"""
Returns how many tasks are waiting to
read from the channel
"""
class WritableChannel(AsyncResource, ABC):
"""
Interface for writing data to a
channel
"""
@abstractmethod
async def send(self, value):
"""
Send the given object on the channel,
possibly blocking
"""
raise NotImplementedError
@abstractmethod
def writers(self):
"""
Returns how many tasks are waiting
to write to the channel
"""
class Channel(WritableChannel, ReadableChannel, ABC):
"""
A generic, two-way channel
"""
class Debugger(ABC):
"""
The base for all debugger objects
"""
def on_start(self):
"""
This method is called when the event
loop starts executing
"""
return NotImplemented
def on_exit(self):
"""
This method is called when the event
loop exits entirely (all tasks completed)
"""
return NotImplemented
def on_task_spawn(self, task: Task):
"""
This method is called when a new task is
spawned
:param task: The Task that was spawned
:type task: :class: structio.objects.Task
"""
return NotImplemented
def on_task_exit(self, task: Task):
"""
This method is called when a task exits
:param task: The Task that exited
:type task: :class: structio.objects.Task
"""
return NotImplemented
def before_task_step(self, task: Task):
"""
This method is called right before
calling a task's run() method
:param task: The Task that is about to run
:type task: :class: structio.objects.Task
"""
return NotImplemented
def after_task_step(self, task: Task):
"""
This method is called right after
calling a task's run() method
:param task: The Task that has run
:type task: :class: structio.objects.Task
"""
return NotImplemented
def before_sleep(self, task: Task, seconds: float):
"""
This method is called before a task goes
to sleep
:param task: The Task that is about to sleep
:type task: :class: structio.objects.Task
:param seconds: The amount of seconds the
task wants to sleep for
:type seconds: int
"""
return NotImplemented
def after_sleep(self, task: Task, seconds: float):
"""
This method is called after a tasks
awakes from sleeping
:param task: The Task that has just slept
:type task: :class: structio.objects.Task
:param seconds: The amount of seconds the
task slept for
:type seconds: float
"""
return NotImplemented
def before_io(self, timeout: float):
"""
This method is called right before
the event loop checks for I/O events
:param timeout: The max. amount of seconds
that the loop will hang for while waiting
for I/O events
:type timeout: float
"""
return NotImplemented
def after_io(self, timeout: float):
"""
This method is called right after
the event loop has checked for I/O events
:param timeout: The actual amount of seconds
that the loop has hung for while waiting
for I/O events
:type timeout: float
"""
return NotImplemented
def before_cancel(self, task: Task):
"""
This method is called right before a task
gets cancelled
:param task: The Task that is about to be cancelled
:type task: :class: structio.objects.Task
"""
return NotImplemented
def after_cancel(self, task: Task) -> object:
"""
This method is called right after a task
gets successfully cancelled
:param task: The Task that was cancelled
:type task: :class: structio.objects.Task
"""
return NotImplemented
def on_exception_raised(self, task: Task, exc: BaseException):
"""
This method is called right after a task
has raised an exception
:param task: The Task that raised the error
:type task: :class: structio.objects.Task
:param exc: The exception that was raised
:type exc: BaseException
"""
return NotImplemented
class IOManager(ABC):
"""
Base class for all I/O managers
"""
@abstractmethod
def wait_io(self):
"""
Waits for I/O and reschedules tasks
when data is ready to be read/written
"""
raise NotImplementedError
@abstractmethod
def request_read(self, rsc, task: Task):
"""
"Requests" a read operation on the given
resource to the I/O manager from the given
task
"""
raise NotImplementedError
@abstractmethod
def request_write(self, rsc, task: Task):
"""
"Requests" a write operation on the given
resource to the I/O manager from the given
task
"""
raise NotImplementedError
@abstractmethod
def pending(self):
"""
Returns whether there's any tasks waiting
to read from/write to a resource registered
in the manager
"""
raise NotImplementedError
@abstractmethod
def release(self, resource):
"""
Releases the given async resource from the
manager. Note that the resource is *not*
closed!
"""
raise NotImplementedError
@abstractmethod
def release_task(self, task: Task):
"""
Releases ownership of the given
resource from the given task. Note
that if the resource is being used by
other tasks that this method will
not unschedule it for those as well
"""
raise NotImplementedError
@abstractmethod
def get_reader(self, rsc):
"""
Returns the task reading from the given
resource, if any (None otherwise)
"""
raise NotImplementedError
@abstractmethod
def get_writer(self, rsc):
"""
Returns the task writing to the given
resource, if any (None otherwise)
"""
raise NotImplementedError
@abstractmethod
def get_readers(self) -> tuple["structio.io.FdWrapper", Task]:
"""
Returns all I/O resources currently watched
by the manager for read events
"""
raise NotImplementedError
@abstractmethod
def get_writers(self) -> tuple["structio.io.FdWrapper", Task]:
"""
Returns all I/O resources currently watched
by the manager for write events
"""
raise NotImplementedError
@abstractmethod
def close(self):
"""
Close the I/O manager, forbidding any
further scheduling of resources. Existing
resources are unscheduled internally
"""
class SignalManager(ABC):
"""
A signal manager
"""
@abstractmethod
def install(self):
"""
Installs the signal handler
"""
raise NotImplementedError
@abstractmethod
def uninstall(self):
"""
Uninstalls the signal handler
"""
raise NotImplementedError
class Kernel(ABC):
"""
Abstract kernel base class
"""
def __init__(
self,
policy: SchedulingPolicy,
clock: Clock,
io_manager: IOManager,
signal_managers: list[SignalManager],
tools: list[Debugger] | None = None,
restrict_ki_to_checkpoints: bool = False,
):
if not issubclass(clock.__class__, Clock):
raise TypeError(
f"clock must be a subclass of {Clock.__module__}.{Clock.__qualname__}, not {type(clock)}"
)
if not issubclass(policy.__class__, SchedulingPolicy):
raise TypeError(
f"policy must be a subclass of {SchedulingPolicy.__module__}.{SchedulingPolicy.__qualname__}, not {type(policy)}"
)
if not issubclass(io_manager.__class__, IOManager):
raise TypeError(
f"io_manager must be a subclass of {IOManager.__module__}.{IOManager.__qualname__}, not {type(io_manager)}"
)
for tool in tools or []:
if not issubclass(tool.__class__, Debugger):
raise TypeError(
f"tools must be a subclass of {Debugger.__module__}.{Debugger.__qualname__}, not {type(tool)}"
)
for mgr in signal_managers or []:
if not issubclass(mgr.__class__, SignalManager):
raise TypeError(
f"signal manager must be a subclass of {SignalManager.__module__}.{SignalManager.__qualname__}, not {type(mgr)}"
)
self.clock = clock
self.current_task: Task | None = None
self.current_pool: "structio.TaskPool" = None # noqa
self.current_scope: structio.TaskScope = None # noqa
self.tools: list[Debugger] = tools or []
self.restrict_ki_to_checkpoints: bool = restrict_ki_to_checkpoints
self.io_manager = io_manager
self.signal_managers = signal_managers
self.entry_point: Task | None = None
self.policy = policy
# Pool for system tasks
self.pool: "structio.TaskPool" = None # noqa
def get_system_pool(self) -> "structio.TaskPool":
"""
Returns the kernel's "system" pool, where tasks
spawned via spawn_system_task() as well as the
entry point are implicitly run into. This is meant
to be used as an internal method for structio's
scheduling policy implementations
"""
if self.pool is None:
raise StructIOException("broken state: system pool is None")
self.pool: "structio.TaskPool"
return self.pool
@abstractmethod
def wait_readable(self, resource: AsyncResource):
"""
Schedule the given resource for reading from
the current task
"""
raise NotImplementedError
@abstractmethod
def wait_writable(self, resource: AsyncResource):
"""
Schedule the given resource for reading from
the current task
"""
raise NotImplementedError
@abstractmethod
def release_resource(self, resource: AsyncResource):
"""
Releases the given resource from the scheduler
"""
raise NotImplementedError
@abstractmethod
def notify_closing(
self, resource: AsyncResource, broken: bool = False, owner: Task | None = None
):
"""
Notifies the event loop that a given resource
is about to be closed and can be unscheduled
"""
raise NotImplementedError
@abstractmethod
def cancel_task(self, task: Task):
"""
Cancels the given task individually
"""
raise NotImplementedError
@abstractmethod
def signal_notify(self, sig: int, frame: FrameType):
"""
Notifies the event loop that a signal was
received
"""
raise NotImplementedError
@abstractmethod
def spawn(self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args,
ki_protected: bool = False,
pool: "structio.TaskPool" = None,
system_task: bool = False,
entry_point: bool = False) -> Task:
"""
Readies a task for execution. All positional arguments are passed
to the given coroutine (for keyword arguments, use `functools.partial`)
"""
raise NotImplementedError
@abstractmethod
def spawn_system_task(
self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args
) -> Task:
"""
Spawns a system task. System tasks run in a special internal
task pool and begin execution in a scope with Ctrl+C protection
enabled. Please note that if a system tasks raises an exception,
all tasks are cancelled and a StructIOException is propagated into the
loop's entry point. System tasks are guaranteed to always run at least
one task step regardless of the state of the entry point and are cancelled
automatically when the entry point exits (unless a shielded TaskScope is
used
"""
raise NotImplementedError
@abstractmethod
def get_closest_deadline(self) -> Any:
"""
Returns the closest deadline to be satisfied
"""
raise NotImplementedError
@abstractmethod
def setup(self):
"""
This method is called right before startup and can
be used by implementors to perform extra setup before
starting the event loop
"""
@abstractmethod
def teardown(self):
"""
This method is called right before exiting, even
if an error occurred, and can be used by implementors
to perform extra cleanup before terminating the event loop
"""
@abstractmethod
def throw(self, task: Task, err: BaseException):
"""
Throws the given exception into the given
task
"""
raise NotImplementedError
@abstractmethod
def reschedule(self, task: Task):
"""
Reschedules the given task for further
execution
"""
raise NotImplementedError
@abstractmethod
def event(self, evt_name, *args):
"""
Fires the specified event for every registered tool
in the event loop
"""
raise NotImplementedError
@abstractmethod
def run(self):
"""
This is the actual "loop" part
of the "event loop"
"""
raise NotImplementedError
@abstractmethod
def sleep(self, amount):
"""
Puts the current task to sleep for the given amount of
time as defined by our current clock
"""
raise NotImplementedError
@abstractmethod
def suspend(self):
"""
Suspends the current task until it is rescheduled
"""
raise NotImplementedError
@abstractmethod
def init_scope(self, scope):
"""
Initializes the given task scope (called by
TaskScope.__enter__)
"""
raise NotImplementedError
@abstractmethod
def close_scope(self, scope):
"""
Closes the given task scope (called by
TaskScope.__exit__)
"""
raise NotImplementedError
@abstractmethod
def init_pool(self, pool):
"""
Initializes the given task pool (called by
TaskPool.__aenter__)
"""
raise NotImplementedError
@abstractmethod
def close_pool(self, pool):
"""
Closes the given task pool (called by
TaskPool.__aexit__)
"""
raise NotImplementedError
@abstractmethod
def cancel_scope(self, scope):
"""
Cancels the given scope
"""
raise NotImplementedError
def start(self, entry_point: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args):
"""
Starts the event loop from a synchronous entry
point. This method only returns once execution
has finished. Normally, this method doesn't need
to be overridden: consider using setup() and teardown()
if you need to do some operations before startup/teardown
"""
self.setup()
self.event("on_start")
self.current_pool = self.pool
self.entry_point = self.spawn(entry_point, *args, entry_point=True)
assert not self.entry_point.is_system_task
self.current_pool.scope.owner = self.entry_point
self.entry_point.pool = self.current_pool
self.current_pool.entry_point = self.entry_point
self.current_scope = self.current_pool.scope
try:
self.run()
finally:
self.teardown()
self.close(force=True)
if self.entry_point.exc:
raise self.entry_point.exc
self.event("on_exit")
return self.entry_point.result
@abstractmethod
def raise_ki(self, task: Task | None = None):
"""
Raises a KeyboardInterrupt exception into a
task: If one is passed explicitly, the exception
is thrown there, otherwise a suitable task is
awakened and thrown into
"""
raise NotImplementedError
@abstractmethod
def done(self):
"""
Returns whether the loop has work to do
"""
raise NotImplementedError
def close(self, force: bool = False):
"""
Terminates and shuts down the event loop.
This method is meant to be extended (*not*
overridden!) by other implementations to do
their own cleanup
:param force: When force equals False,
the default, and the event loop is
not done executing, this function raises a
StructIOException. If True, implementors
should cancel all tasks and shut down the
event loop
"""
if not self.done() and not force:
raise StructIOException("the event loop is running")
@abstractmethod
def add_shutdown_task(
self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args
) -> Any:
"""
Registers a task to be run right before the event loop shuts
down. The task is spawned as a system task when (and if) the main
task exits cleanly. Note that shutdown tasks are started all
at once in no particular order, so if you need them to do so in a
deterministic way, the burden of synchronizing them is on you (fortunately,
structio's synchronization primitives make that rather easy). Returns a
unique identifier that can be used to unregister the shutdown task
"""
raise NotImplementedError
@abstractmethod
def remove_shutdown_task(self, ident: Any) -> bool:
"""
Unregisters a previously registered shutdown task.
Returns whether a task was actually removed
"""
raise NotImplementedError