951 lines
24 KiB
Python
951 lines
24 KiB
Python
import io
|
|
import os
|
|
from abc import abstractmethod, ABC
|
|
from types import FrameType
|
|
|
|
import structio
|
|
from structio.core.task import Task
|
|
from structio.exceptions import StructIOException
|
|
from typing import Callable, Any, Coroutine
|
|
|
|
|
|
class Clock(ABC):
    """
    Abstract interface that every clock has to
    implement. All methods, the constructor included,
    are abstract, so only subclasses overriding each
    one of them can be instantiated
    """

    @abstractmethod
    def __init__(self):
        """
        Abstract constructor: it does nothing here and
        must be overridden by concrete clocks
        """

    @abstractmethod
    def start(self):
        """
        Start hook for the clock. The exact semantics
        are left to implementors
        """

        raise NotImplementedError

    @abstractmethod
    def setup(self):
        """
        Setup hook for the clock. The exact semantics
        are left to implementors
        """

        raise NotImplementedError

    @abstractmethod
    def teardown(self):
        """
        Teardown hook for the clock. The exact semantics
        are left to implementors
        """

        raise NotImplementedError

    @abstractmethod
    def current_time(self):
        """
        Presumably returns the current time as seen by
        this clock (inferred from the name: confirm with
        the concrete implementations)
        """

        raise NotImplementedError

    @abstractmethod
    def deadline(self, deadline):
        """
        Presumably turns the given value into a deadline
        expressed in this clock's terms (inferred from the
        name: confirm with the concrete implementations)
        """

        raise NotImplementedError
|
|
|
|
|
|
class SchedulingPolicy(ABC):
    """
    Abstract interface of a scheduling policy, i.e. the
    component deciding how tasks get scheduled inside
    the event loop
    """

    @abstractmethod
    def is_scheduled(self, task: Task) -> bool:
        """
        Tells whether the policy knows about the given
        task as one that is scheduled to run. A True
        result is no guarantee that the task will
        actually be executed
        """

        raise NotImplementedError

    @abstractmethod
    def has_next_task(self) -> bool:
        """
        Tells whether there is a candidate task
        that could be run next
        """

        raise NotImplementedError

    @abstractmethod
    def has_paused_task(self) -> bool:
        """
        Tells whether there are paused tasks that
        are waiting to be rescheduled
        """

        raise NotImplementedError

    @abstractmethod
    def peek_paused_task(self) -> Task | None:
        """
        Returns the first paused task in the queue
        without consuming it, if there is one
        """

        raise NotImplementedError

    @abstractmethod
    def peek_next_task(self) -> Task | None:
        """
        Returns the first task that is ready to run
        without removing it, if there is one
        """

        raise NotImplementedError

    @abstractmethod
    def get_paused_task(self) -> Task | None:
        """
        Removes the first paused task from the queue
        and returns it, if there is one
        """

        raise NotImplementedError

    @abstractmethod
    def schedule(self, task: Task):
        """
        Schedules the given task for execution
        """

        raise NotImplementedError

    @abstractmethod
    def pause(self, task: Task):
        """
        Pauses the given task
        """

        raise NotImplementedError

    @abstractmethod
    def discard(self, task: Task):
        """
        Removes the given task from the policy
        """

        raise NotImplementedError

    @abstractmethod
    def get_next_task(self) -> Task | None:
        """
        Returns the next runnable task. None may
        be returned when no runnable task is
        available
        """

        raise NotImplementedError

    @abstractmethod
    def get_closest_deadline(self) -> Any:
        """
        Returns the closest deadline that has
        to be satisfied
        """

        raise NotImplementedError
|
|
|
|
|
|
class AsyncResource(ABC):
    """
    Base class for asynchronous resources that must be
    closed properly, an operation that may block. It can
    be used with "async with": entering just hands back
    the resource, while leaving awaits close() (so only
    __aexit__ may actually block)
    """

    async def __aenter__(self):
        # Entering the context performs no work at all
        return self

    @abstractmethod
    async def close(self):
        """
        Closes the resource. Must be provided
        by implementors
        """

        raise NotImplementedError

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # The exception details are ignored: the resource is closed
        # no matter how the block was left, and nothing is returned,
        # so an in-flight exception is never suppressed
        await self.close()
|
|
|
|
|
|
class WritableStream(AsyncResource, ABC):
    """
    Abstract interface of a byte stream that
    binary data can be written to
    """

    @abstractmethod
    async def write(self, _data):
        """
        Writes the given data to the stream. Must
        be provided by implementors
        """

        raise NotImplementedError
|
|
|
|
|
|
class ReadableStream(AsyncResource, ABC):
    """
    Abstract interface of a byte stream that binary
    data can be read from. Readable streams are meant
    to implement the asynchronous iterator protocol so
    that they can be used in "async for" loops

    NOTE(review): this class does not define __aiter__
    or __anext__ itself, so iteration support has to
    come from the implementors
    """

    @abstractmethod
    async def _read(self, _size: int = -1):
        """
        Reads data from the stream. Must be provided by
        implementors. The size defaults to -1, presumably
        meaning "no limit" (confirm with the concrete
        implementations)
        """

        raise NotImplementedError
|
|
|
|
|
|
class Stream(ReadableStream, WritableStream, ABC):
    """
    Generic asynchronous binary stream that can be
    both read from and written to
    """

    def __init__(self, f):
        """
        Wraps the given file-like object

        :param f: The object to stream. It must expose
            a fileno() method and must not be a text
            file
        :raises TypeError: If f is an instance of
            io.TextIOBase
        """

        is_text_file = isinstance(f, io.TextIOBase)
        if is_text_file:
            raise TypeError("only binary files can be streamed")
        self.fileobj = f
        self.buf = bytearray()
        # The underlying descriptor is switched to non-blocking mode
        descriptor = self.fileobj.fileno()
        os.set_blocking(descriptor, False)

    @abstractmethod
    async def flush(self):
        """
        Asynchronously flushes the underlying resource
        """

        raise NotImplementedError
|
|
|
|
|
|
class WriteCloseableStream(Stream, ABC):
    """
    A Stream whose write end can be shut down on
    its own: doing so closes neither our read side
    nor the read/write side of the other end
    """

    @abstractmethod
    async def eof(self):
        """
        Sends an end-of-file on the stream, if possible.
        Once an EOF has been sent no more data can be
        written, but the resource can still be read from
        and the other end can still read from and write
        to it. Sending an EOF more than once is a no-op
        """
|
|
|
|
|
|
class ReadableChannel(AsyncResource, ABC):
    """
    Abstract interface of a channel that data can
    be read from. Supports "async for" loops, which
    end once the channel reports being closed
    """

    @abstractmethod
    async def receive(self):
        """
        Receives an object from the channel. This
        operation may block
        """

        raise NotImplementedError

    def __aiter__(self):
        """
        Asynchronous iteration support: the channel
        is its own iterator
        """

        return self

    async def __anext__(self):
        """
        Asynchronous iteration support: yields the next
        received object and stops the iteration when
        receive() raises structio.ResourceClosed
        """

        try:
            item = await self.receive()
        except structio.ResourceClosed:
            raise StopAsyncIteration()
        return item

    @abstractmethod
    def pending(self):
        """
        Tells whether any data is waiting
        to be read
        """

    @abstractmethod
    def readers(self):
        """
        Returns the number of tasks that are
        waiting to read from the channel
        """
|
|
|
|
|
|
class WritableChannel(AsyncResource, ABC):
    """
    Abstract interface of a channel that data
    can be written to
    """

    @abstractmethod
    async def send(self, value):
        """
        Sends the given object over the channel.
        This operation may block
        """

        raise NotImplementedError

    @abstractmethod
    def writers(self):
        """
        Returns the number of tasks that are
        waiting to write to the channel
        """
|
|
|
|
|
|
class Channel(WritableChannel, ReadableChannel, ABC):
    """
    Generic two-way channel, i.e. one that is
    both writable and readable
    """
|
|
|
|
|
|
class Debugger(ABC):
    """
    Base class of every debugger object. None of
    the hooks is abstract: each default implementation
    simply returns NotImplemented, so subclasses only
    need to override the ones they care about
    """

    def on_start(self):
        """
        Hook called when the event loop
        begins executing
        """

        return NotImplemented

    def on_exit(self):
        """
        Hook called when the event loop exits
        entirely (i.e. all tasks have completed)
        """

        return NotImplemented

    def on_task_spawn(self, task: Task):
        """
        Hook called when a new task is spawned

        :param task: The Task that was spawned
        :type task: Task
        """

        return NotImplemented

    def on_task_exit(self, task: Task):
        """
        Hook called when a task exits

        :param task: The Task that exited
        :type task: Task
        """

        return NotImplemented

    def before_task_step(self, task: Task):
        """
        Hook called right before a task's
        run() method is called

        :param task: The Task that is about to run
        :type task: Task
        """

        return NotImplemented

    def after_task_step(self, task: Task):
        """
        Hook called right after a task's
        run() method has been called

        :param task: The Task that has run
        :type task: Task
        """

        return NotImplemented

    def before_sleep(self, task: Task, seconds: float):
        """
        Hook called before a task goes to sleep

        :param task: The Task that is about to sleep
        :type task: Task
        :param seconds: The amount of seconds the
            task wants to sleep for
        :type seconds: float
        """

        return NotImplemented

    def after_sleep(self, task: Task, seconds: float):
        """
        Hook called after a task awakes
        from sleeping

        :param task: The Task that has just slept
        :type task: Task
        :param seconds: The amount of seconds the
            task slept for
        :type seconds: float
        """

        return NotImplemented

    def before_io(self, timeout: float):
        """
        Hook called right before the event loop
        checks for I/O events

        :param timeout: The maximum amount of seconds
            that the loop will hang for while waiting
            for I/O events
        :type timeout: float
        """

        return NotImplemented

    def after_io(self, timeout: float):
        """
        Hook called right after the event loop
        has checked for I/O events

        :param timeout: The actual amount of seconds
            that the loop has hung for while waiting
            for I/O events
        :type timeout: float
        """

        return NotImplemented

    def before_cancel(self, task: Task):
        """
        Hook called right before a task
        gets cancelled

        :param task: The Task that is about to be cancelled
        :type task: Task
        """

        return NotImplemented

    def after_cancel(self, task: Task) -> object:
        """
        Hook called right after a task has been
        successfully cancelled

        :param task: The Task that was cancelled
        :type task: Task
        """

        return NotImplemented

    def on_exception_raised(self, task: Task, exc: BaseException):
        """
        Hook called right after a task has
        raised an exception

        :param task: The Task that raised the error
        :type task: Task
        :param exc: The exception that was raised
        :type exc: BaseException
        """

        return NotImplemented
|
|
|
|
|
|
class IOManager(ABC):
    """
    Abstract interface shared by all
    I/O managers
    """

    @abstractmethod
    def wait_io(self):
        """
        Waits for I/O events, rescheduling tasks as
        soon as data is ready to be read or written
        """

        raise NotImplementedError

    @abstractmethod
    def request_read(self, rsc, task: Task):
        """
        Asks the I/O manager, on behalf of the given
        task, for a read operation on the given
        resource
        """

        raise NotImplementedError

    @abstractmethod
    def request_write(self, rsc, task: Task):
        """
        Asks the I/O manager, on behalf of the given
        task, for a write operation on the given
        resource
        """

        raise NotImplementedError

    @abstractmethod
    def pending(self):
        """
        Tells whether any task is waiting to read
        from or write to one of the resources that
        are registered in the manager
        """

        raise NotImplementedError

    @abstractmethod
    def release(self, resource):
        """
        Removes the given async resource from the
        manager. The resource itself is *not*
        closed by this call!
        """

        raise NotImplementedError

    @abstractmethod
    def release_task(self, task: Task):
        """
        Makes the given task give up ownership of
        the resource it is using. If other tasks
        are using the same resource, this method
        does not unschedule it for them as well
        """

        raise NotImplementedError

    @abstractmethod
    def get_reader(self, rsc):
        """
        Returns the task that is reading from the
        given resource, or None if there is none
        """

        raise NotImplementedError

    @abstractmethod
    def get_writer(self, rsc):
        """
        Returns the task that is writing to the
        given resource, or None if there is none
        """

        raise NotImplementedError

    @abstractmethod
    def get_readers(self) -> tuple["structio.io.FdWrapper", Task]:
        """
        Returns every I/O resource the manager is
        currently watching for read events
        """

        raise NotImplementedError

    @abstractmethod
    def get_writers(self) -> tuple["structio.io.FdWrapper", Task]:
        """
        Returns every I/O resource the manager is
        currently watching for write events
        """

        raise NotImplementedError

    @abstractmethod
    def close(self):
        """
        Closes the I/O manager: no resource can be
        scheduled anymore after that, and the ones
        already registered are unscheduled internally
        """
|
|
|
|
|
|
class SignalManager(ABC):
    """
    Abstract interface of a signal manager, an
    object able to install and uninstall a
    signal handler
    """

    @abstractmethod
    def install(self):
        """
        Puts the signal handler in place
        """

        raise NotImplementedError

    @abstractmethod
    def uninstall(self):
        """
        Removes the signal handler
        """

        raise NotImplementedError
|
|
|
|
|
|
class Kernel(ABC):
    """
    Abstract kernel base class. It validates and stores
    the collaborators of the event loop (scheduling policy,
    clock, I/O manager, signal managers and debugging tools)
    and provides the start() and close() template methods,
    while everything else is left to implementors
    """

    def __init__(
        self,
        policy: SchedulingPolicy,
        clock: Clock,
        io_manager: IOManager,
        signal_managers: list[SignalManager],
        tools: list[Debugger] | None = None,
        restrict_ki_to_checkpoints: bool = False,
    ):
        """
        Initializes the kernel's state

        :param policy: The scheduling policy to use
        :param clock: The clock to use
        :param io_manager: The I/O manager to use
        :param signal_managers: The signal managers to use.
            None is treated like an empty list
        :param tools: The debuggers registered in the loop.
            None (the default) is treated like an empty list
        :param restrict_ki_to_checkpoints: Stored as-is for
            implementors. Presumably restricts the delivery
            of KeyboardInterrupt to checkpoints (inferred from
            the name: confirm with the concrete kernels)
        :raises TypeError: If any of the collaborators is not
            an instance of the expected abstract base class
        """

        # The checks run in a fixed order (clock, policy, I/O manager,
        # tools, signal managers) so that the first reported error is
        # deterministic when more than one argument is wrong
        for label, value, expected in (
            ("clock", clock, Clock),
            ("policy", policy, SchedulingPolicy),
            ("io_manager", io_manager, IOManager),
        ):
            if not isinstance(value, expected):
                raise TypeError(
                    f"{label} must be a subclass of {expected.__module__}.{expected.__qualname__}, not {type(value)}"
                )
        for tool in tools or []:
            if not isinstance(tool, Debugger):
                raise TypeError(
                    f"tools must be a subclass of {Debugger.__module__}.{Debugger.__qualname__}, not {type(tool)}"
                )
        for mgr in signal_managers or []:
            if not isinstance(mgr, SignalManager):
                raise TypeError(
                    f"signal manager must be a subclass of {SignalManager.__module__}.{SignalManager.__qualname__}, not {type(mgr)}"
                )
        self.clock = clock
        self.current_task: Task | None = None
        self.current_pool: "structio.TaskPool" = None  # noqa
        self.current_scope: "structio.TaskScope" = None  # noqa
        self.tools: list[Debugger] = tools or []
        self.restrict_ki_to_checkpoints: bool = restrict_ki_to_checkpoints
        self.io_manager = io_manager
        # The validation above tolerates None, so the attribute has to be
        # normalized too or later iterations over it would fail. A list
        # passed by the caller is stored as-is (identity is preserved)
        self.signal_managers: list[SignalManager] = (
            signal_managers if signal_managers is not None else []
        )
        self.entry_point: Task | None = None
        self.policy = policy
        # Pool for system tasks
        self.pool: "structio.TaskPool" = None  # noqa

    def get_system_pool(self) -> "structio.TaskPool":
        """
        Returns the kernel's "system" pool, where tasks
        spawned via spawn_system_task() as well as the
        entry point are implicitly run into. This is meant
        to be used as an internal method for structio's
        scheduling policy implementations

        :raises StructIOException: If the system pool
            has not been set (i.e. it is None)
        """

        if self.pool is None:
            raise StructIOException("broken state: system pool is None")
        return self.pool

    @abstractmethod
    def wait_readable(self, resource: AsyncResource):
        """
        Schedule the given resource for reading from
        the current task
        """

        raise NotImplementedError

    @abstractmethod
    def wait_writable(self, resource: AsyncResource):
        """
        Schedule the given resource for writing from
        the current task
        """

        raise NotImplementedError

    @abstractmethod
    def release_resource(self, resource: AsyncResource):
        """
        Releases the given resource from the scheduler
        """

        raise NotImplementedError

    @abstractmethod
    def notify_closing(
        self, resource: AsyncResource, broken: bool = False, owner: Task | None = None
    ):
        """
        Notifies the event loop that a given resource
        is about to be closed and can be unscheduled
        """

        raise NotImplementedError

    @abstractmethod
    def cancel_task(self, task: Task):
        """
        Cancels the given task individually
        """

        raise NotImplementedError

    @abstractmethod
    def signal_notify(self, sig: int, frame: FrameType):
        """
        Notifies the event loop that a signal was
        received
        """

        raise NotImplementedError

    @abstractmethod
    def spawn(
        self,
        func: Callable[..., Coroutine[Any, Any, Any]],
        *args,
        ki_protected: bool = False,
        pool: "structio.TaskPool" = None,
        system_task: bool = False,
        entry_point: bool = False,
    ) -> Task:
        """
        Readies a task for execution. All positional arguments are passed
        to the given coroutine (for keyword arguments, use `functools.partial`)
        """

        raise NotImplementedError

    @abstractmethod
    def spawn_system_task(
        self, func: Callable[..., Coroutine[Any, Any, Any]], *args
    ) -> Task:
        """
        Spawns a system task. System tasks run in a special internal
        task pool and begin execution in a scope with Ctrl+C protection
        enabled. Please note that if a system task raises an exception,
        all tasks are cancelled and a StructIOException is propagated into the
        loop's entry point. System tasks are guaranteed to always run at least
        one task step regardless of the state of the entry point and are cancelled
        automatically when the entry point exits (unless a shielded TaskScope is
        used)
        """

        raise NotImplementedError

    @abstractmethod
    def get_closest_deadline(self) -> Any:
        """
        Returns the closest deadline to be satisfied
        """

        raise NotImplementedError

    @abstractmethod
    def setup(self):
        """
        This method is called right before startup and can
        be used by implementors to perform extra setup before
        starting the event loop
        """

    @abstractmethod
    def teardown(self):
        """
        This method is called right before exiting, even
        if an error occurred, and can be used by implementors
        to perform extra cleanup before terminating the event loop
        """

    @abstractmethod
    def throw(self, task: Task, err: BaseException):
        """
        Throws the given exception into the given
        task
        """

        raise NotImplementedError

    @abstractmethod
    def reschedule(self, task: Task):
        """
        Reschedules the given task for further
        execution
        """

        raise NotImplementedError

    @abstractmethod
    def event(self, evt_name, *args):
        """
        Fires the specified event for every registered tool
        in the event loop
        """

        raise NotImplementedError

    @abstractmethod
    def run(self):
        """
        This is the actual "loop" part
        of the "event loop"
        """

        raise NotImplementedError

    @abstractmethod
    def sleep(self, amount):
        """
        Puts the current task to sleep for the given amount of
        time as defined by our current clock
        """

        raise NotImplementedError

    @abstractmethod
    def suspend(self):
        """
        Suspends the current task until it is rescheduled
        """

        raise NotImplementedError

    @abstractmethod
    def init_scope(self, scope):
        """
        Initializes the given task scope (called by
        TaskScope.__enter__)
        """

        raise NotImplementedError

    @abstractmethod
    def close_scope(self, scope):
        """
        Closes the given task scope (called by
        TaskScope.__exit__)
        """

        raise NotImplementedError

    @abstractmethod
    def init_pool(self, pool):
        """
        Initializes the given task pool (called by
        TaskPool.__aenter__)
        """

        raise NotImplementedError

    @abstractmethod
    def close_pool(self, pool):
        """
        Closes the given task pool (called by
        TaskPool.__aexit__)
        """

        raise NotImplementedError

    @abstractmethod
    def cancel_scope(self, scope):
        """
        Cancels the given scope
        """

        raise NotImplementedError

    def start(self, entry_point: Callable[..., Coroutine[Any, Any, Any]], *args):
        """
        Starts the event loop from a synchronous entry
        point. This method only returns once execution
        has finished. Normally, this method doesn't need
        to be overridden: consider using setup() and teardown()
        if you need to do some operations before startup/teardown

        :param entry_point: The coroutine function to run as
            the loop's entry point
        :param args: Positional arguments for the entry point
        :returns: The result of the entry point task
        :raises BaseException: Whatever exception is stored
            in the entry point task once the loop is over
        """

        self.setup()
        self.event("on_start")
        self.current_pool = self.pool
        self.entry_point = self.spawn(entry_point, *args, entry_point=True)
        assert not self.entry_point.is_system_task
        # The entry point runs inside the system pool and owns its scope
        self.current_pool.scope.owner = self.entry_point
        self.entry_point.pool = self.current_pool
        self.current_pool.entry_point = self.entry_point
        self.current_scope = self.current_pool.scope
        try:
            self.run()
        finally:
            # Only cleanup belongs here: raising or returning from within
            # a finally clause would silently discard any exception that
            # escaped from run()
            self.teardown()
            self.close(force=True)
        if self.entry_point.exc:
            raise self.entry_point.exc
        self.event("on_exit")
        return self.entry_point.result

    @abstractmethod
    def raise_ki(self, task: Task | None = None):
        """
        Raises a KeyboardInterrupt exception into a
        task: If one is passed explicitly, the exception
        is thrown there, otherwise a suitable task is
        awakened and thrown into
        """

        raise NotImplementedError

    @abstractmethod
    def done(self):
        """
        Returns whether the loop is done executing,
        i.e. whether it has no more work to do
        """

        raise NotImplementedError

    def close(self, force: bool = False):
        """
        Terminates and shuts down the event loop.
        This method is meant to be extended (*not*
        overridden!) by other implementations to do
        their own cleanup

        :param force: When force equals False,
            the default, and the event loop is
            not done executing, this function raises a
            StructIOException. If True, implementors
            should cancel all tasks and shut down the
            event loop
        :raises StructIOException: If the loop is not
            done executing and force is False
        """

        if not self.done() and not force:
            raise StructIOException("the event loop is running")

    @abstractmethod
    def add_shutdown_task(
        self, func: Callable[..., Coroutine[Any, Any, Any]], *args
    ) -> Any:
        """
        Registers a task to be run right before the event loop shuts
        down. The task is spawned as a system task when (and if) the main
        task exits cleanly. Note that shutdown tasks are started all
        at once in no particular order, so if you need them to do so in a
        deterministic way, the burden of synchronizing them is on you (fortunately,
        structio's synchronization primitives make that rather easy). Returns a
        unique identifier that can be used to unregister the shutdown task
        """

        raise NotImplementedError

    @abstractmethod
    def remove_shutdown_task(self, ident: Any) -> bool:
        """
        Unregisters a previously registered shutdown task.
        Returns whether a task was actually removed
        """

        raise NotImplementedError
|