Initial work

This commit is contained in:
Mattia Giambirtone 2023-05-15 18:25:02 +02:00
parent d973aab5e7
commit 4d6baf36c4
Signed by: nocturn9x
GPG Key ID: 8270F9F467971E59
36 changed files with 2506 additions and 0 deletions

8
.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

10
.idea/StructuredIO.iml Normal file
View File

@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

4
.idea/misc.xml Normal file
View File

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (StructuredIO)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/StructuredIO.iml" filepath="$PROJECT_DIR$/.idea/StructuredIO.iml" />
</modules>
</component>
</project>

23
setup.py Normal file
View File

@ -0,0 +1,23 @@
import setuptools
if __name__ == "__main__":
#with open("README.md", "r") as readme:
# long_description = readme.read()
setuptools.setup(
name="StructuredIO",
version="0.1.0",
author="Nocturn9x",
author_email="nocturn9x@nocturn9x.space",
description="Python structured concurrency framework",
#long_description=long_description,
#long_description_content_type="text/markdown",
url="https://example.com",
packages=setuptools.find_packages(),
classifiers=[
"Programming Language :: Python :: 3",
"Operating System :: OS Independent",
#"License :: OSI Approved :: Apache License 2.0",
],
python_requires=">=3.8",
)

83
structio/__init__.py Normal file
View File

@ -0,0 +1,83 @@
from structio.core import run as _run
from typing import Coroutine, Any, Callable
from structio.core.kernels.fifo import FIFOKernel
from structio.core.managers.io.simple import SimpleIOManager
from structio.core.managers.signals.sigint import SigIntManager
from structio.core.time.clock import DefaultClock
from structio.core.syscalls import sleep
from structio.core.context import TaskPool, TaskScope
from structio.core.exceptions import Cancelled
from structio.sync import Event, Queue, MemoryChannel
from structio.core.abc import Channel, Stream, ChannelReader, ChannelWriter
def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]],
*args,
restrict_ki_to_checkpoints: bool = False,
tools: list | None = None,
):
result = _run.run(func, FIFOKernel, SimpleIOManager(), [SigIntManager()], DefaultClock(), tools,
restrict_ki_to_checkpoints, *args)
return result
run.__doc__ = _run.__doc__
def create_pool() -> TaskPool:
"""
Creates a new task pool
"""
return TaskPool()
def skip_after(timeout) -> TaskScope:
"""
Creates a new task scope with the
specified timeout. No error is raised
when the timeout expires
"""
result = TaskScope()
result.timeout = timeout
result.silent = True
return result
def with_timeout(timeout) -> TaskScope:
"""
Creates a new task scope with the
specified timeout. TimeoutError is raised
when the timeout expires
"""
result = TaskScope()
result.timeout = timeout
return result
def clock():
"""
Returns the current clock time of
the event loop
"""
return _run.current_loop().clock.current_time()
__all__ = ["run",
"sleep",
"create_pool",
"clock",
"Cancelled",
"skip_after",
"with_timeout",
"Event",
"Queue",
"MemoryChannel",
"Channel",
"Stream",
"ChannelReader",
"ChannelWriter"
]

View File

703
structio/core/abc.py Normal file
View File

@ -0,0 +1,703 @@
from abc import abstractmethod, ABC
from structio.core.task import Task
from structio.core.exceptions import StructIOException
from typing import Callable, Any, Coroutine
from types import FrameType
class BaseClock(ABC):
"""
Abstract base clock class
"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
def start(self):
return NotImplemented
@abstractmethod
def setup(self):
return NotImplemented
@abstractmethod
def teardown(self):
return NotImplemented
@abstractmethod
def current_time(self):
return NotImplemented
@abstractmethod
def deadline(self, deadline):
return NotImplemented
class AsyncResource(ABC):
"""
A generic asynchronous resource which needs to
be closed properly, possibly blocking. Can be
used as a context manager (note that only the
__aexit__ method actually blocks!)
"""
async def __aenter__(self):
return self
@abstractmethod
async def close(self):
return NotImplemented
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
class StreamWriter(AsyncResource):
"""
Interface for writing binary data to
a byte stream
"""
@abstractmethod
async def write_all(self, data: bytes):
"""
Write the given data onto the stream,
possibly blocking
"""
return NotImplemented
@abstractmethod
async def wait_for_write(self):
"""
Wait until the underlying resource is
ready to be written on. Implementations
of this method should try their best not
to return until the underlying resource is
known to be ready, but it's not guaranteed
that a call to write_all() will not block
after calling this method
"""
return NotImplemented
class StreamReader(AsyncResource):
"""
Interface for reading binary data from
a byte stream. The stream implements the
asynchronous iterator protocol and can
therefore be used with "async for" loops
"""
@abstractmethod
async def read_some(self, max_size: int | None = None) -> bytes:
"""
Read up to max_size bytes from the underlying
resource and return it. When max_size is None,
implementors should pick a reasonable default.
Returns b"" iff the stream has reached end-of-file
"""
async def __aiter__(self):
return self
async def __anext__(self):
if not (data := await self.read_some()):
raise StopAsyncIteration()
return data
@abstractmethod
async def wait_for_read(self):
"""
Wait until the underlying resource is
ready to be read from. Implementations
of this method should try their best not
to return until the underlying resource is
known to be ready, but it's not guaranteed
that a call to read_some() will not block
after calling this method
"""
return NotImplemented
class Stream(StreamReader, StreamWriter):
"""
A generic, asynchronous, readable/writable binary stream
"""
class WriteCloseableStream(Stream):
"""
Extension to the Stream class that allows
shutting down the write end of the stream
without closing the read side on our end
nor the read/write side on the other one
"""
@abstractmethod
async def eof(self):
"""
Send an end-of-file on this stream, if possible.
The resource can still be read from (and the
other end can still read/write to it), but no more
data can be written after an EOF has been sent. If an
EOF has already been sent, this method is a no-op
"""
class ChannelReader(AsyncResource):
"""
Interface for reading data from a
channel
"""
@abstractmethod
async def receive(self):
"""
Receive an object from the chanel,
possibly blocking
"""
return NotImplemented
class ChannelWriter(AsyncResource):
"""
Interface for writing data to a
channel
"""
@abstractmethod
async def send(self, value):
"""
Send the given object on the channel,
possibly blocking
"""
return NotImplemented
class Channel(ChannelWriter, ChannelReader):
"""
A generic, two-way channel
"""
class BaseDebugger(ABC):
"""
The base for all debugger objects
"""
def on_start(self):
"""
This method is called when the event
loop starts executing
"""
return NotImplemented
def on_exit(self):
"""
This method is called when the event
loop exits entirely (all tasks completed)
"""
return NotImplemented
def on_task_schedule(self, task: Task, delay: float):
"""
This method is called when a new task is
scheduled (not spawned)
:param task: The Task that was (re)scheduled
:type task: :class: structio.objects.Task
:param delay: The delay, in seconds, after which
the task will start executing
:type delay: float
"""
return NotImplemented
def on_task_spawn(self, task: Task):
"""
This method is called when a new task is
spawned
:param task: The Task that was spawned
:type task: :class: structio.objects.Task
"""
return NotImplemented
def on_task_exit(self, task: Task):
"""
This method is called when a task exits
:param task: The Task that exited
:type task: :class: structio.objects.Task
"""
return NotImplemented
def before_task_step(self, task: Task):
"""
This method is called right before
calling a task's run() method
:param task: The Task that is about to run
:type task: :class: structio.objects.Task
"""
return NotImplemented
def after_task_step(self, task: Task):
"""
This method is called right after
calling a task's run() method
:param task: The Task that has ran
:type task: :class: structio.objects.Task
"""
return NotImplemented
def before_sleep(self, task: Task, seconds: float):
"""
This method is called before a task goes
to sleep
:param task: The Task that is about to sleep
:type task: :class: structio.objects.Task
:param seconds: The amount of seconds the
task wants to sleep for
:type seconds: int
"""
return NotImplemented
def after_sleep(self, task: Task, seconds: float):
"""
This method is called after a tasks
awakes from sleeping
:param task: The Task that has just slept
:type task: :class: structio.objects.Task
:param seconds: The amount of seconds the
task slept for
:type seconds: float
"""
return NotImplemented
def before_io(self, timeout: float):
"""
This method is called right before
the event loop checks for I/O events
:param timeout: The max. amount of seconds
that the loop will hang for while waiting
for I/O events
:type timeout: float
"""
return NotImplemented
def after_io(self, timeout: float):
"""
This method is called right after
the event loop has checked for I/O events
:param timeout: The actual amount of seconds
that the loop has hung for while waiting
for I/O events
:type timeout: float
"""
return NotImplemented
def before_cancel(self, task: Task):
"""
This method is called right before a task
gets cancelled
:param task: The Task that is about to be cancelled
:type task: :class: structio.objects.Task
"""
return NotImplemented
def after_cancel(self, task: Task) -> object:
"""
This method is called right after a task
gets successfully cancelled
:param task: The Task that was cancelled
:type task: :class: structio.objects.Task
"""
return NotImplemented
def on_exception_raised(self, task: Task, exc: BaseException):
"""
This method is called right after a task
has raised an exception
:param task: The Task that raised the error
:type task: :class: structio.objects.Task
:param exc: The exception that was raised
:type exc: BaseException
"""
return NotImplemented
def on_io_schedule(self, stream, event: int):
"""
This method is called whenever an
I/O resource is scheduled for listening
"""
return NotImplemented
def on_io_unschedule(self, stream):
"""
This method is called whenever a stream
is unregistered from the loop's I/O selector
"""
return NotImplemented
class BaseIOManager(ABC):
"""
Base class for all I/O managers
"""
@abstractmethod
def wait_io(self):
"""
Waits for I/O and reschedules tasks
when data is ready to be read/written
"""
return NotImplemented
@abstractmethod
def request_read(self, rsc: AsyncResource):
"""
"Requests" a read operation on the given
resource to the I/O manager from the current task
"""
return NotImplemented
@abstractmethod
def request_write(self, rsc: AsyncResource):
"""
"Requests" a write operation on the given
resource to the I/O manager from the current task
"""
return NotImplemented
@abstractmethod
def pending(self):
"""
Returns a boolean value that indicates whether
there's any I/O registered in the manager
"""
return NotImplemented
@abstractmethod
def release(self, resource: AsyncResource):
"""
Releases the given async resource from the
manager. Note that the resource is *not*
closed!
"""
return NotImplemented
@abstractmethod
def release_task(self, task: Task):
"""
Releases ownership of the given
resource from the given task. Note
that if the task is being used by
other tasks that this method will
not unschedule it for those as well
"""
return NotImplemented
class SignalManager(ABC):
"""
A signal manager
"""
@abstractmethod
def handle(self, sig: int, frame: FrameType):
"""
Handles the signal
"""
return NotImplemented
@abstractmethod
def install(self):
"""
Installs the signal handler
"""
return NotImplemented
@abstractmethod
def uninstall(self):
"""
Uninstalls the signal handler
"""
return NotImplemented
class BaseKernel(ABC):
"""
Abstract kernel base class
"""
def __init__(self, clock: BaseClock, io_manager: BaseIOManager,
signal_managers: list[SignalManager],
tools: list[BaseDebugger] | None = None,
restrict_ki_to_checkpoints: bool = False):
self.clock = clock
self.current_task: Task | None = None
self.current_pool: "TaskPool" = None
self.current_scope: "TaskScope" = None
self.tools: list[BaseDebugger] = tools or []
self.restrict_ki_to_checkpoints: bool = restrict_ki_to_checkpoints
self.running: bool = False
self.io_manager = io_manager
self.signal_managers = signal_managers
self.entry_point: Task | None = None
# Pool for system tasks
self.pool: "TaskPool" = None
@abstractmethod
def signal_notify(self, sig: int, frame: FrameType):
"""
Notifies the event loop that a signal was
received. If the signal was supposed to trigger
any exceptions (i.e. SIGINT -> KeyboardInterrupt),
this handler being called means the current context
does not allow for an exception to be raised right
now. Implementors should make sure to remember that
this method was called and to deliver the appropriate
exception as soon as possible
"""
return NotImplemented
@abstractmethod
def spawn(self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args):
"""
Readies a task for execution. All positional arguments are passed
to the given coroutine (for keyword arguments, use functools.partial)
"""
return NotImplemented
@abstractmethod
def spawn_system_task(self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args):
"""
Spawns a system task. System tasks run in a special internal
task pool and begin execution in a scope shielded by cancellations
and with Ctrl+C protection enabled
"""
return NotImplemented
@abstractmethod
def get_closest_deadline(self):
"""
Returns the closest deadline to be satisfied
"""
return NotImplemented
@abstractmethod
def setup(self):
"""
This method is called right before startup and can
be used by implementors to perform extra setup before
starting the event loop
"""
@abstractmethod
def teardown(self):
"""
This method is called right before exiting, even
if an error occurred, and can be used by implementors
to perform extra cleanup before terminating the event loop
"""
@abstractmethod
def throw(self, task: Task, err: BaseException):
"""
Throws the given exception into the given
task
"""
return NotImplemented
@abstractmethod
def reschedule(self, task: Task):
"""
Reschedules the given task for further
execution
"""
return NotImplemented
@abstractmethod
def event(self, evt_name, *args):
"""
Fires the specified event for every registered tool
in the event loop
"""
return NotImplemented
@abstractmethod
def run(self):
"""
This is the actual "loop" part
of the "event loop"
"""
return NotImplemented
@abstractmethod
def sleep(self, amount):
"""
Puts the current task to sleep for the given amount of
time as defined by our current clock
"""
return NotImplemented
@abstractmethod
def suspend(self):
"""
Suspends the current task until it is rescheduled
"""
return NotImplemented
@abstractmethod
def get_closest_deadline_owner(self) -> Task:
"""
Similar to get_closest_deadline, but it returns
the task which will be rescheduled instead of its
deadline
"""
return NotImplemented
@abstractmethod
def init_scope(self, scope):
"""
Initializes the given task scope (called by
TaskScope.__enter__)
"""
return NotImplemented
@abstractmethod
def close_scope(self, scope):
"""
Closes the given task scope (called by
TaskScope.__exit__)
"""
return NotImplemented
@abstractmethod
def init_pool(self, pool):
"""
Initializes the given task pool (called by
TaskPool.__aenter__)
"""
return NotImplemented
@abstractmethod
def close_pool(self, pool):
"""
Closes the given task pool (called by
TaskPool.__aexit__)
"""
return NotImplemented
@abstractmethod
def cancel_scope(self, scope):
"""
Cancels the given scope
"""
return NotImplemented
def start(self, entry_point: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args):
"""
Starts the event loop from a synchronous entry
point. This method only returns once execution
has finished. Normally, this method doesn't need
to be overridden: consider using setup() and teardown()
if you need to do some operations before startup/teardown
"""
self.setup()
self.event("on_start")
self.current_pool = self.pool
self.entry_point = self.spawn(entry_point, *args)
self.current_pool.scope.owner = self.entry_point
self.entry_point.pool = self.current_pool
self.current_pool.entry_point = entry_point
try:
self.run()
finally:
self.teardown()
self.close(force=True)
if self.entry_point.exc:
raise self.entry_point.exc
self.event("on_exit")
return self.entry_point.result
@abstractmethod
def done(self):
"""
Returns whether the loop has work to do
"""
return NotImplemented
def close(self, force: bool = False):
"""
Terminates and shuts down the event loop
This method is meant to be extended by
implementations to do their own cleanup
:param force: When force equals false,
the default, and the event loop is
not done, this function raises a
StructIOException
"""
if not self.done() and not force:
raise StructIOException("the event loop is running")

136
structio/core/context.py Normal file
View File

@ -0,0 +1,136 @@
import structio
from structio.core.run import current_loop
from structio.core.task import Task
from structio.core.syscalls import suspend
from typing import Callable, Coroutine, Any
class TaskScope:
"""
A task scope
"""
def __init__(self, timeout: int | float | None = None, silent: bool = False):
"""
Public object constructor
"""
# When do we expire?
self.timeout = timeout
# Do we raise an error on timeout?
self.silent = silent
# Have we timed out?
self.timed_out: bool = False
# Can we be indirectly cancelled? Note that this
# does not affect explicit cancellations via the
# cancel() method
self.cancellable: bool = True
# Data about inner and outer scopes.
# This is used internally to make sure
# nesting task scopes works as expected
self.inner: TaskScope | None = None
self.outer: TaskScope | None = None
# Which tasks do we contain?
self.tasks: list[Task] = []
self.owner: Task | None = None
def cancel(self):
"""
Cancels the task scope and all the work
that belongs to it
"""
current_loop().cancel_scope(self)
def get_actual_timeout(self):
"""
Returns the effective timeout of the whole
cancel scope. This is different from the
self.timeout parameter because cancel scopes
can be nested, and we might have a parent with
a lower timeout than us
:return:
"""
if self.outer is None:
return self.timeout
return min([self.timeout, self.outer.get_actual_timeout()])
def __enter__(self):
self.timeout = current_loop().clock.deadline(self.timeout)
current_loop().init_scope(self)
return self
def __exit__(self, exc_type: type, exc_val: BaseException, exc_tb):
current_loop().close_scope(self)
if isinstance(exc_val, TimeoutError) and self.timed_out:
return self.silent
return False
def done(self):
"""
Returns whether the task scope has finished executing
"""
if self.inner and not self.inner.done():
return False
return all(task.done() for task in self.tasks)
class TaskPool:
"""
A task pool
"""
def __init__(self):
"""
Public object constructor
"""
self.entry_point: Task | None = None
self.scope: TaskScope = TaskScope(timeout=float("inf"))
# Data about inner and outer pools.
# This is used internally to make sure
# nesting task pools works as expected
self.inner: TaskPool | None = None
self.outer: TaskPool | None = None
# Have we errored out?
self.error: BaseException | None = None
async def __aenter__(self):
self.scope.__enter__()
current_loop().init_pool(self)
return self
async def __aexit__(self, exc_type: type, exc_val: BaseException, exc_tb):
try:
await suspend()
except (Exception, KeyboardInterrupt) as e:
self.error = e
self.scope.cancel()
finally:
current_loop().close_pool(self)
self.scope.__exit__(exc_type, exc_val, exc_tb)
if self.error:
raise self.error
def done(self):
"""
Returns whether the task pool has finished executing
"""
return self.scope.done()
def spawn(self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args) -> Task:
"""
Schedule a new concurrent task for execution in the task pool from the given
async function. All positional arguments are passed to the underlying coroutine
(for keyword arguments, consider using functools.partial). A Task object is
returned. Note that the coroutine is merely scheduled to run and does not begin
executing until it is picked by the scheduler later on
"""
return current_loop().spawn(func, *args)

View File

@ -0,0 +1,13 @@
class StructIOException(Exception):
"""
A generic StructIO error
"""
class Cancelled(BaseException):
# We inherit from BaseException
# so that users don't accidentally
# ignore cancellations
"""
A cancellation exception
"""

View File

View File

@ -0,0 +1,356 @@
import traceback
import warnings
from types import FrameType
from structio.core.abc import BaseKernel, BaseClock, BaseDebugger, BaseIOManager, SignalManager
from structio.core.context import TaskPool, TaskScope
from structio.core.task import Task, TaskState
from structio.util.ki import CTRLC_PROTECTION_ENABLED
from structio.core.time.queue import TimeQueue
from structio.core.exceptions import StructIOException, Cancelled
from collections import deque
from typing import Callable, Coroutine, Any
from functools import partial
import signal
class FIFOKernel(BaseKernel):
"""
An asynchronous event loop implementation
with a FIFO scheduling policy
"""
def __init__(self, clock: BaseClock, io_manager: BaseIOManager,
signal_managers: list[SignalManager],
tools: list[BaseDebugger] | None = None,
restrict_ki_to_checkpoints: bool = False):
super().__init__(clock, io_manager, signal_managers, tools, restrict_ki_to_checkpoints)
self.skip: bool = False
# Tasks that are ready to run
self.run_queue: deque[Task] = deque()
# Data to send back to tasks
self.data: dict[Task, Any] = {}
# Have we handled SIGINT?
self._sigint_handled: bool = False
# Paused tasks along with their deadlines
self.paused: TimeQueue = TimeQueue(self.clock)
# All task scopes we handle
self.scopes: list[TaskScope] = []
self.pool = TaskPool()
self.current_pool = self.pool
self.current_scope = self.current_pool.scope
self.current_scope.cancellable = False
self.scopes.append(self.current_scope)
def get_closest_deadline(self):
return min([self.current_scope.get_actual_timeout(), self.paused.get_closest_deadline()])
def get_closest_deadline_owner(self):
return self.paused.peek()
def event(self, evt_name: str, *args):
if not hasattr(BaseDebugger, evt_name):
warnings.warn(f"Invalid debugging event fired: {evt_name!r}")
return
for tool in self.tools:
if f := getattr(tool, evt_name, None):
try:
f(*args)
except BaseException as e:
# We really can't afford to have our internals explode,
# sorry!
warnings.warn(f"Exception during debugging event delivery ({evt_name!r}): {type(e).__name__} -> {e}",
)
traceback.print_tb(e.__traceback__)
def done(self):
if any([self.run_queue, self.paused, self.io_manager.pending()]):
return False
for scope in self.scopes:
if not scope.done():
return False
return True
def spawn(self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args):
task = Task(func.__name__ or repr(func), func(*args), self.current_pool)
# We inject our magic secret variable into the coroutine's stack frame, so
# we can look it up later
task.coroutine.cr_frame.f_locals.setdefault(CTRLC_PROTECTION_ENABLED, False)
task.pool.scope.tasks.append(task)
# self.data[self.current_task] = task
self.run_queue.append(task)
self.event("on_task_spawn")
return task
def spawn_system_task(self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args):
task = Task(func.__name__ or repr(func), func(*args), self.pool)
task.coroutine.cr_frame.f_locals.setdefault(CTRLC_PROTECTION_ENABLED, True)
task.pool.scope.tasks.append(task)
# self.data[self.current_task] = task
self.run_queue.append(task)
self.event("on_task_spawn")
return task
def signal_notify(self, sig: int, frame: FrameType):
match sig:
case signal.SIGINT:
self._sigint_handled = True
case _:
pass
def step(self):
"""
Run a single task step (i.e. until an "await" to our
primitives somewhere)
"""
self.current_task = self.run_queue.popleft()
while self.current_task.done():
if not self.run_queue:
return
self.current_task = self.run_queue.popleft()
runner = partial(self.current_task.coroutine.send, self.data.pop(self.current_task, None))
if self.current_task.pending_cancellation:
_runner = partial(self.current_task.coroutine.throw, Cancelled())
elif self._sigint_handled:
self._sigint_handled = False
runner = partial(self.current_task.coroutine.throw, KeyboardInterrupt())
self.event("before_task_step", self.current_task)
self.running = True
method, args, kwargs = runner()
if not callable(getattr(self, method, None)):
# This if block is meant to be triggered by other async
# libraries, which most likely have different method names and behaviors
# compared to us. If you get this exception, and you're 100% sure you're
# not mixing async primitives from other libraries, then it's a bug!
self.throw(self.current_task, StructIOException("Uh oh! Something bad just happened: did you try to mix "
"primitives from other async libraries?"))
# Sneaky method call, thanks to David Beazley for this ;)
getattr(self, method)(*args, **kwargs)
self.event("after_task_step", self.current_task)
def throw(self, task: Task, err: BaseException):
if task.done():
return
if task.state == TaskState.PAUSED:
self.paused.discard(task)
self.handle_errors(partial(task.coroutine.throw, err), task)
def reschedule(self, task: Task):
self.run_queue.append(task)
def check_cancelled(self):
if self.current_task.pending_cancellation:
self.throw(self.current_task, Cancelled())
def sleep(self, amount):
"""
Puts the current task to sleep for the given amount of
time as defined by our current clock
"""
# Just to avoid code duplication, you know
self.suspend()
if amount > 0:
self.event("before_sleep", self.current_task, amount)
self.current_task.next_deadline = self.current_task.paused_when + amount
self.paused.put(self.current_task, amount)
else:
# If sleep is called with 0 as argument,
# then it's just a checkpoint!
self.skip = True
self.reschedule_running()
def check_scopes(self):
for scope in self.scopes:
if scope.timed_out:
continue
if scope.get_actual_timeout() <= self.clock.current_time():
scope.timed_out = True
scope.cancel()
self.throw(scope.owner, TimeoutError("timed out"))
def wakeup(self):
while self.paused and self.paused.peek().next_deadline <= self.clock.current_time():
task, _ = self.paused.get()
task.next_deadline = 0
self.event("after_sleep", task, task.paused_when - self.clock.current_time())
self.reschedule(task)
def run(self):
"""
This is the actual "loop" part
of the "event loop"
"""
while not self.done():
if self.run_queue and not self.skip:
self.handle_errors(self.step)
self.running = False
self.skip = False
if self._sigint_handled and not self.restrict_ki_to_checkpoints:
self.throw(self.entry_point, KeyboardInterrupt())
if self.io_manager.pending():
self.io_manager.wait_io()
self.wakeup()
self.check_scopes()
self.close()
def reschedule_running(self):
"""
Reschedules the currently running task
"""
self.run_queue.append(self.current_task)
def handle_errors(self, func: Callable, task: Task | None = None):
"""
Convenience method for handling various exceptions
from tasks
"""
try:
func()
except StopIteration as ret:
# We re-define it because we call step() with
# this method and that changes the current task
task = task or self.current_task
# At the end of the day, coroutines are generator functions with
# some tricky behaviors, and this is one of them. When a coroutine
# hits a return statement (either explicit or implicit), it raises
# a StopIteration exception, which has an attribute named value that
# represents the return value of the coroutine, if it has one. Of course
# this exception is not an error, and we should happily keep going after it:
# most of this code below is just useful for internal/debugging purposes
task.state = TaskState.FINISHED
task.result = ret.value
self.on_success(self.current_task)
except Cancelled:
# When a task needs to be cancelled, we try to do it gracefully first:
# if the task is paused in either I/O or sleeping, that's perfect.
# But we also need to cancel a task if it was not sleeping or waiting on
# any I/O because it could never do so (therefore blocking everything
# forever). So, when cancellation can't be done right away, we schedule
# it for the next execution step of the task. We will also make sure
# to re-raise cancellations at every checkpoint until the task lets the
# exception propagate into us, because we *really* want the task to be
# cancelled
task = task or self.current_task
task.state = TaskState.CANCELLED
task.pending_cancellation = False
self.event("after_cancel")
self.on_cancel(task)
except (Exception, KeyboardInterrupt) as err:
# Any other exception is caught here
task = task or self.current_task
task.exc = err
task.state = TaskState.CRASHED
self.event("on_exception_raised", task)
self.on_error(task)
def release(self, task: Task):
"""
Releases the timeouts and associated
I/O resourced that the given task owns
"""
self.io_manager.release_task(task)
self.paused.discard(task)
def on_success(self, task: Task):
"""
The given task has exited gracefully: hooray!
"""
# TODO: Anything else?
task.pool: TaskPool
if task.pool.done() and task is not self.entry_point:
self.reschedule(task.pool.entry_point)
self.event("on_task_exit", task)
self.io_manager.release_task(task)
def on_error(self, task: Task):
"""
The given task raised an exception
"""
self.event("on_exception_raised", task, task.exc)
current = task.pool.scope
while current and current is not self.pool.scope:
# Unroll nested task scopes until one of
# them catches the exception, or we reach
# the topmost one (i.e. ours), in which case
# we'll crash later
current.cancel()
# We re-raise the original exception into
# the parent of the task scope
# TODO: Implement something akin to trio.MultiError, or (better)
# ExceptionGroup (which is Python 3.11+ only)
self.throw(current.owner, task.exc)
if current.owner.state.done():
# The scope's entry point has managed
# the exception and has exited, we can
# proceed!
break
current = current.outer
self.release(task)
def on_cancel(self, task: Task):
"""
The given task crashed because of a
cancellation exception
"""
self.release(task)
def init_scope(self, scope: TaskScope):
scope.owner = self.current_task
self.current_scope.inner = scope
scope.outer = self.current_scope
self.current_scope = scope
self.scopes.append(scope)
def close_scope(self, scope: TaskScope):
assert scope == self.current_scope
self.current_scope = scope.outer
self.reschedule(scope.owner)
self.scopes.pop()
def cancel_task(self, task: Task):
if task.done():
return
self.throw(task, Cancelled())
if task.state != TaskState.CANCELLED:
task.pending_cancellation = True
def cancel_scope(self, scope: TaskScope):
if scope.done():
return
inner = scope.inner
if inner and inner.cancellable and inner is not self.pool.scope:
scope.inner.cancel()
self.reschedule(inner.owner)
for task in scope.tasks:
self.cancel_task(task)
def init_pool(self, pool: TaskPool):
pool.outer = self.current_pool
pool.entry_point = self.current_task
self.current_pool.inner = pool
self.current_pool = pool
def close_pool(self, pool: TaskPool):
self.current_pool = self.current_pool.outer
def suspend(self):
self.current_task.state = TaskState.PAUSED
self.current_task.paused_when = self.clock.current_time()
def setup(self):
for manager in self.signal_managers:
manager.install()
def teardown(self):
for manager in self.signal_managers:
manager.uninstall()

View File

View File

View File

@ -0,0 +1,80 @@
from collections import defaultdict
from structio.core.abc import BaseIOManager, AsyncResource, BaseKernel
from structio.core.context import Task
from structio.core.run import current_loop, current_task
import socket
import select
class SimpleIOManager(BaseIOManager):
"""
A simple, cross-platform, select()-based
I/O manager
"""
def __init__(self):
"""
Public object constructor
"""
self.readers: dict[AsyncResource, Task] = {}
self.writers: dict[AsyncResource, Task] = {}
# This allows us to have a bidirectional mapping:
# we know both which tasks are using which resources
# and which resources are used by which tasks,
# without having to go through too many hoops and jumps.
self.tasks: dict[Task, list[AsyncResource]] = defaultdict(list)
def pending(self):
# We don't return bool(self.resources) because there is
# no pending I/O to do if no tasks are waiting to read or
# write, even if there's dangling resources around!
return bool(self.readers or self.writers)
def _collect_readers(self) -> list[AsyncResource]:
"""
Collects all resources that need to be read from,
so we can select() on them later
"""
result = []
for resource in self.readers:
result.append(resource)
return result
def _collect_writers(self) -> list[AsyncResource]:
"""
Collects all resources that need to be written to,
so we can select() on them later
"""
result = []
for resource in self.writers:
result.append(resource)
return result
def wait_io(self):
kernel: BaseKernel = current_loop()
readable, writable, _ = select.select(self._collect_readers(), self._collect_writers(),
[], kernel.get_closest_deadline())
for read_ready in readable:
kernel.reschedule(self.readers[read_ready])
for write_ready in writable:
kernel.reschedule(self.writers[write_ready])
def request_read(self, rsc: AsyncResource):
task = current_task()
self.readers[rsc] = task
def request_write(self, rsc: AsyncResource):
task = current_task()
self.writers[rsc] = task
def release(self, resource: AsyncResource):
self.readers.pop(resource, None)
self.writers.pop(resource, None)
def release_task(self, task: Task):
for resource in self.tasks[task]:
self.release(resource)

View File

@ -0,0 +1,35 @@
from structio.core.abc import SignalManager
from types import FrameType
from structio.util.ki import currently_protected
from structio.core.run import current_loop
import warnings
import signal
class SigIntManager(SignalManager):
"""
Handles Ctrl+C
"""
def __init__(self):
self.installed = False
def handle(self, sig: int, frame: FrameType):
loop = current_loop()
if currently_protected():
loop.signal_notify(sig, frame)
else:
raise KeyboardInterrupt()
def install(self):
if signal.getsignal(signal.SIGINT) != signal.default_int_handler:
warnings.warn(f"structio has detected a custom SIGINT handler and won't touch it: keep in mind"
f" this is likely to break KeyboardInterrupt delivery!")
return
signal.signal(signal.SIGINT, self.handle)
self.installed = True
def uninstall(self):
if self.installed:
signal.signal(signal.SIGINT, signal.default_int_handler)
self.installed = False

68
structio/core/run.py Normal file
View File

@ -0,0 +1,68 @@
import inspect
import functools
from threading import local
from structio.core.abc import BaseKernel, BaseDebugger, BaseClock, SignalManager, BaseIOManager
from structio.core.exceptions import StructIOException
from structio.core.task import Task
from typing import Callable, Any, Coroutine
_RUN = local()
def current_loop() -> BaseKernel:
try:
return _RUN.kernel
except AttributeError:
raise StructIOException("must be called from async context") from None
def current_task() -> Task:
return current_loop().current_task
def new_event_loop(kernel: BaseKernel):
"""
Initializes a new event loop using the
given kernel implementation. Cannot be
called from an asynchronous context
"""
try:
current_loop()
except StructIOException:
_RUN.kernel = kernel
else:
if not _RUN.kernel.done():
raise StructIOException("cannot be called from async context") from None
def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]],
kernel: type, io_manager: BaseIOManager, signal_managers: list[SignalManager],
clock: BaseClock, tools: list[BaseDebugger] | None = None,
restrict_ki_to_checkpoints: bool = False,
*args):
"""
Starts the event loop from a synchronous entry point. All
positional arguments are passed to the given coroutine
function. If you want to pass keyword arguments, consider
using functools.partial()
"""
if not issubclass(kernel, BaseKernel):
raise TypeError(f"kernel must be a subclass of structio.core.abc.BaseKernel!")
params = []
check = func
if isinstance(func, functools.partial):
check = func.func
if inspect.iscoroutine(check):
raise StructIOException(
"Looks like you tried to call structio.run(your_func(arg1, arg2, ...)), that is wrong!"
"\nWhat you wanna do, instead, is this: structio.run(your_func, arg1, arg2, ...)"
)
elif not inspect.iscoroutinefunction(check):
raise StructIOException("structio.run() requires an async function as its first argument!")
params.extend(args)
new_event_loop(kernel(clock=clock, restrict_ki_to_checkpoints=restrict_ki_to_checkpoints,
io_manager=io_manager, signal_managers=signal_managers,
tools=tools))
return current_loop().start(func, *args)

70
structio/core/syscalls.py Normal file
View File

@ -0,0 +1,70 @@
from types import coroutine
from typing import Any
@coroutine
def syscall(method: str, *args, **kwargs) -> Any | None:
"""
Lowest-level primitive to interact with the event loop:
calls a loop method with the provided arguments. This
function should not be used directly, but through abstraction
layers. All positional and keyword arguments are passed to
the method itself and its return value is provided once the
loop yields control back to us
:param method: The loop method to call
:type method: str
:returns: The result of the method call, if any
"""
result = yield method, args, kwargs
return result
async def sleep(amount):
"""
Puts the caller asleep for the given amount of
time which is, by default, measured in seconds,
but this is not enforced: if a custom clock
implementation is being used, the values passed
to this function may have a different meaning
"""
await syscall("sleep", amount)
async def suspend():
"""
Pauses the caller indefinitely
until it is rescheduled
"""
await syscall("suspend")
async def check_cancelled():
"""
Introduce a cancellation point, but
not a schedule point
"""
return await syscall("check_cancelled")
async def schedule_point():
"""
Introduce a schedule point, but not a
cancellation point
"""
return await syscall("schedule_point")
async def checkpoint():
"""
Introduce a cancellation point and a
schedule point
"""
await check_cancelled()
await schedule_point()

58
structio/core/task.py Normal file
View File

@ -0,0 +1,58 @@
from enum import Enum, auto
from dataclasses import dataclass, field
from typing import Coroutine, Any
class TaskState(Enum):
INIT: int = auto()
RUNNING: int = auto()
PAUSED: int = auto()
FINISHED: int = auto()
CRASHED: int = auto()
CANCELLED: int = auto()
IO: int = auto()
@dataclass
class Task:
"""
An asynchronous task wrapper
"""
# The task's name
name: str
# The underlying coroutine of this
# task
coroutine: Coroutine
# The task's pool
pool: "TaskPool"
# The state of the task
state: TaskState = field(default=TaskState.INIT)
# What error did the task raise, if any?
exc: BaseException | None = None
# The task's return value, if any
result: Any | None = None
# When did the task pause last time?
paused_when: Any = -1
# When is the task's next deadline?
next_deadline: Any = -1
# Is cancellation pending?
pending_cancellation: bool = False
def done(self):
"""
Returns whether the task is running
"""
return self.state in [
TaskState.CRASHED,
TaskState.FINISHED,
TaskState.CANCELLED
]
def __hash__(self):
"""
Implements hash(self)
"""
return self.coroutine.__hash__()

View File

View File

@ -0,0 +1,28 @@
import random
from timeit import default_timer
from structio.core.abc import BaseClock
class DefaultClock(BaseClock):
def __init__(self):
super().__init__()
# We add a large random offset to our timer value
# so users notice the problem if they try to compare
# them across different runs
self.offset: int = random.randint(100_000, 1_000_000)
def start(self):
pass
def setup(self):
pass
def teardown(self):
pass
def current_time(self) -> float:
return default_timer() + self.offset
def deadline(self, deadline):
return self.current_time() + deadline

164
structio/core/time/queue.py Normal file
View File

@ -0,0 +1,164 @@
from typing import Any
from structio.core.task import Task, TaskState
from structio.core.abc import BaseClock
from heapq import heappush, heappop, heapify
class TimeQueue:
"""
An abstraction layer over a heap queue based on time. This is where
paused tasks will be put when they are not running
:param clock: The same clock that was passed to the thread-local event loop.
It is important for the queue to be synchronized with the loop as this allows
the sleeping mechanism to work reliably
"""
def __init__(self, clock: BaseClock):
"""
Object constructor
"""
self.clock = clock
# The sequence float handles the race condition
# of two tasks with identical deadlines, acting
# as a tiebreaker
self.sequence = 0
self.container: list[tuple[float, int, Task, dict[str, Any]]] = []
def peek(self) -> Task:
"""
Returns the first task in the queue
"""
return self.container[0][2]
def __len__(self):
"""
Returns len(self)
"""
return len(self.container)
def __contains__(self, item: Task):
"""
Implements item in self. This method behaves
as if the queue only contained tasks and ignores
their timeouts and tiebreakers
"""
for i in self.container:
if i[2] == item:
return True
return False
def index(self, item: Task):
"""
Returns the index of the given item in the list
or -1 if it is not present
"""
for i, e in enumerate(self.container):
if e[2] == item:
return i
return -1
def discard(self, item: Task):
"""
Discards an item from the queue and
calls heapify(self.container) to keep
the heap invariant if an element is removed.
This method does nothing if the item is not
in the queue, but note that in this case the
operation would still take O(n) iterations
to complete
:param item: The item to be discarded
"""
idx = self.index(item)
if idx != -1:
self.container.pop(idx)
heapify(self.container)
def get_closest_deadline(self) -> float:
"""
Returns the closest deadline that is meant to expire
or raises IndexError if the queue is empty
"""
if not self:
raise IndexError("TimeQueue is empty")
return self.container[0][0]
def __iter__(self):
"""
Implements iter(self)
"""
return self
def __next__(self):
"""
Implements next(self)
"""
try:
return self.get()
except IndexError:
raise StopIteration from None
def __getitem__(self, item: int):
"""
Implements self[n]
"""
return self.container.__getitem__(item)
def __bool__(self):
"""
Implements bool(self)
"""
return bool(self.container)
def __repr__(self):
"""
Implements repr(self) and str(self)
"""
return f"TimeQueue({self.container}, clock={self.clock})"
def put(self, task: Task, delay: float, metadata: dict[str, Any] | None = None):
"""
Pushes a task onto the queue together with its
delay and optional metadata
:param task: The task that is meant to sleep
:type task: :class: Task
:param delay: The delay associated with the task
:type delay: float
:param metadata: A dictionary representing additional
task metadata. Defaults to None
:type metadata: dict[str, Any], optional
"""
time = self.clock.current_time()
task.paused_when = time
task.state = TaskState.PAUSED
task.next_deadline = task.paused_when + delay
heappush(self.container, (time + delay, self.sequence, task, metadata))
self.sequence += 1
def get(self) -> tuple[Task, dict[str, Any] | None]:
"""
Gets the first task that is meant to run along
with its metadata
:raises: IndexError if the queue is empty
"""
if not self.container:
raise IndexError("get from empty TimeQueue")
_, __, task, meta = heappop(self.container)
return task, meta

10
structio/core/tooling.py Normal file
View File

@ -0,0 +1,10 @@
from structio.core.abc import BaseDebugger
class SimpleDebugger(BaseDebugger):
def on_start(self):
print(">> Started")
def on_exit(self):
print(f"<< Stopped")

212
structio/sync.py Normal file
View File

@ -0,0 +1,212 @@
# Task synchronization primitives
from structio.core.syscalls import suspend
from structio.core.run import current_task, current_loop
from structio.core.abc import ChannelReader, ChannelWriter, Channel
from structio.util.ki import enable_ki_protection
from collections import deque
from typing import Any
class Event:
"""
A wrapper around a boolean value that can be waited
on asynchronously. The majority of structio's API is
designed on top of/around this class, as it constitutes
the simplest synchronization primitive there is
"""
def __init__(self):
"""
Public object constructor
"""
self._set = False
self.waiters = deque()
def is_set(self):
return self._set
@enable_ki_protection
async def wait(self):
"""
Wait until someone else calls set() on
this event. If the event has already been
set, this method returns immediately
"""
if self.is_set():
return
self.waiters.append(current_task())
await suspend() # We get re-scheduled by set()
def set(self):
"""
Sets
:return:
"""
if self.is_set():
raise RuntimeError("the event has already been set")
self._set = True
for waiter in self.waiters:
current_loop().reschedule(waiter)
self.waiters.clear()
class Queue:
"""
An asynchronous FIFO queue
"""
def __init__(self, maxsize: int | None = None):
"""
Object constructor
"""
self.maxsize = maxsize
# Stores event objects for tasks wanting to
# get items from the queue
self.getters = deque()
# Stores event objects for tasks wanting to
# put items on the queue
self.putters = deque()
self.container = deque()
def __len__(self):
"""
Returns the length of the queue
"""
return len(self.container)
def __repr__(self) -> str:
return f"{type(self).__name__}({f', '.join(map(str, self.container))})"
async def __aiter__(self):
"""
Implements the asynchronous iterator protocol
"""
return self
async def __anext__(self):
"""
Implements the asynchronous iterator protocol
"""
return await self.get()
@enable_ki_protection
async def put(self, item: Any):
"""
Pushes an element onto the queue. If the
queue is full, waits until there's
enough space for the queue
"""
if self.getters:
self.getters.popleft().set()
self.container.append(item)
elif len(self.container) < self.maxsize:
self.container.append(item)
else:
self.putters.append(Event())
await self.putters[-1].wait()
self.container.append(item)
@enable_ki_protection
async def get(self) -> Any:
"""
Pops an element off the queue. Blocks until
an element is put onto it again if the queue
is empty
"""
if self.putters:
self.putters.popleft().set()
if not self.container:
self.getters.append(Event())
await self.getters[-1].wait()
return self.container.popleft()
def clear(self):
"""
Clears the queue
"""
self.container.clear()
def reset(self):
"""
Resets the queue
"""
self.clear()
self.getters.clear()
self.putters.clear()
class MemorySendChannel(ChannelWriter):
"""
An in-memory one-way channel to send
data
"""
def __init__(self, buffer):
self.buffer = buffer
self._closed = False
async def send(self, value):
if self._closed:
raise IOError("cannot operate on a closed channel")
await self.buffer.put(value)
async def close(self):
if self._closed:
raise IOError("cannot operate on a closed channel")
self._closed = True
class MemoryReceiveChannel(ChannelReader):
"""
An in-memory one-way channel to read
data
"""
def __init__(self, buffer):
self.buffer = buffer
self._closed = False
async def receive(self):
if self._closed:
raise IOError("cannot operate on a closed channel")
return await self.buffer.get()
async def close(self):
if self._closed:
raise IOError("cannot operate on a closed channel")
self._closed = True
class MemoryChannel(Channel):
"""
An in-memory two-way channel between
tasks with optional buffering
"""
def __init__(self, buffer_size):
self._buffer = Queue(buffer_size)
self.reader = MemoryReceiveChannel(self._buffer)
self.writer = MemorySendChannel(self._buffer)
async def receive(self):
return self.reader.receive()
async def send(self, value):
return self.writer.send(value)
async def close(self):
await self.reader.close()
await self.writer.close()

View File

124
structio/util/ki.py Normal file
View File

@ -0,0 +1,124 @@
"""
aiosched: Yet another Python async scheduler
Copyright (C) 2022 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https:www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import sys
import inspect
from functools import wraps
from types import FrameType
# Special magic module half-stolen from Trio (thanks njsmith I love you)
# that makes Ctrl+C work. P.S.: Please Python, get your signals straight.
# Just a funny variable name that is not a valid
# identifier (but still a string so tools that hack
# into frames don't freak out when they look at the
# local variables) which will get injected silently
# into every frame to enable/disable the safeguards
# for Ctrl+C/KeyboardInterrupt
CTRLC_PROTECTION_ENABLED = "|yes-it-is|"
def critical_section(frame: FrameType) -> bool:
"""
Returns whether Ctrl+C protection is currently
enabled in the given frame or in any of its children.
Stolen from Trio
"""
while frame is not None:
if CTRLC_PROTECTION_ENABLED in frame.f_locals:
return frame.f_locals[CTRLC_PROTECTION_ENABLED]
if frame.f_code.co_name == "__del__":
return True
frame = frame.f_back
return True
def currently_protected() -> bool:
"""
Returns whether Ctrl+C protection is currently
enabled in the current context
"""
return critical_section(sys._getframe())
def legacy_isasyncgenfunction(obj):
return getattr(obj, "_async_gen_function", None) == id(obj)
def _ki_protection_decorator(enabled):
def decorator(fn):
# In some version of Python, isgeneratorfunction returns true for
# coroutine functions, so we have to check for coroutine functions
# first.
if inspect.iscoroutinefunction(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
# See the comment for regular generators below
coro = fn(*args, **kwargs)
coro.cr_frame.f_locals[CTRLC_PROTECTION_ENABLED] = enabled
return coro
return wrapper
elif inspect.isgeneratorfunction(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
# It's important that we inject this directly into the
# generator's locals, as opposed to setting it here and then
# doing 'yield from'. The reason is, if a generator is
# throw()n into, then it may magically pop to the top of the
# stack. And @contextmanager generators in particular are a
# case where we often want KI protection, and which are often
# thrown into! See:
# https://bugs.python.org/issue29590
gen = fn(*args, **kwargs)
gen.gi_frame.f_locals[CTRLC_PROTECTION_ENABLED] = enabled
return gen
return wrapper
elif inspect.isasyncgenfunction(fn) or legacy_isasyncgenfunction(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
# See the comment for regular generators above
agen = fn(*args, **kwargs)
agen.ag_frame.f_locals[CTRLC_PROTECTION_ENABLED] = enabled
return agen
return wrapper
else:
@wraps(fn)
def wrapper(*args, **kwargs):
locals()[CTRLC_PROTECTION_ENABLED] = enabled
return fn(*args, **kwargs)
return wrapper
return decorator
enable_ki_protection = _ki_protection_decorator(True)
enable_ki_protection.__name__ = "enable_ki_protection"
disable_ki_protection = _ki_protection_decorator(False)
disable_ki_protection.__name__ = "disable_ki_protection"

25
tests/events.py Normal file
View File

@ -0,0 +1,25 @@
import structio
async def child(ev: structio.Event, n):
print(f"[child] I'm alive! Waiting {n} seconds before setting the event")
await structio.sleep(n)
print("[child] Slept! Setting the event")
ev.set()
assert ev.is_set()
async def main(i):
print("[main] Parent is alive")
j = structio.clock()
async with structio.create_pool() as pool:
evt = structio.Event()
print("[main] Spawning child")
pool.spawn(child, evt, i)
print("[main] Child spawned, waiting on the event")
await evt.wait()
assert evt.is_set()
print(f"[main] Exited in {structio.clock() - j:.2f} seconds")
structio.run(main, 5)

32
tests/ki_test.py Normal file
View File

@ -0,0 +1,32 @@
import structio
async def child(n: int):
print(f"Going to sleep for {n} seconds!")
i = structio.clock()
try:
await structio.sleep(n)
except structio.Cancelled:
slept = structio.clock() - i
print(f"Oh no, I've been cancelled! (was gonna sleep {n - slept:.2f} more seconds)")
raise
print(f"Slept for {structio.clock() - i:.2f} seconds!")
async def main() -> int:
print("Parent is alive. Spawning children")
t = structio.clock()
try:
async with structio.create_pool() as pool:
pool.spawn(child, 5)
pool.spawn(child, 3)
pool.spawn(child, 8)
print(f"Children spawned, awaiting completion")
except KeyboardInterrupt:
print("Ctrl+C caught")
print(f"Children have completed in {structio.clock() - t:.2f} seconds")
return 0
assert structio.run(main) == 0
print("Execution complete")

47
tests/memory_channel.py Normal file
View File

@ -0,0 +1,47 @@
import structio
from typing import Any
async def reader(ch: structio.ChannelReader):
print("[reader] Reader is alive!")
async with ch:
while True:
print(f"[reader] Awaiting messages")
data = await ch.receive()
if not data:
break
print(f"[reader] Got: {data}")
# Simulates some work
await structio.sleep(1)
print("[reader] Done!")
async def writer(ch: structio.ChannelWriter, objects: list[Any]):
print("[writer] Writer is alive!")
async with ch:
for obj in objects:
print(f"[writer] Sending {obj!r}")
await ch.send(obj)
print(f"[writer] Sent {obj!r}")
# Let's make the writer twice as fast as the receiver
# to test backpressure :)
await structio.sleep(0.5)
await ch.send(None)
print("[writer] Done!")
async def main(objects: list[Any]):
print("[main] Parent is alive")
# We construct a new memory channel...
channel = structio.MemoryChannel(1)
async with structio.create_pool() as pool:
# ... and dispatch the two ends to different
# tasks. Isn't this neat?
pool.spawn(reader, channel.reader)
pool.spawn(writer, channel.writer, objects)
print("[main] Children spawned")
print("[main] Done!")
structio.run(main, [1, 2, 3, 4])

View File

@ -0,0 +1,44 @@
import structio
async def successful(name: str, n: int):
before = structio.clock()
print(f"[child {name}] Sleeping for {n} seconds")
await structio.sleep(n)
print(f"[child {name}] Done! Slept for {structio.clock() - before:.2f} seconds")
async def failing(name: str, n: int):
before = structio.clock()
print(f"[child {name}] Sleeping for {n} seconds")
await structio.sleep(n)
print(f"[child {name}] Done! Slept for {structio.clock() - before:.2f} seconds, raising now!")
raise TypeError("waa")
async def main(
children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]]
):
before = structio.clock()
try:
async with structio.create_pool() as p1:
print(f"[main] Spawning children in first context ({hex(id(p1))})")
for name, delay in children_outer:
p1.spawn(successful, name, delay)
print("[main] Children spawned")
async with structio.create_pool() as p2:
print(f"[main] Spawning children in second context ({hex(id(p2))})")
for name, delay in children_inner:
p2.spawn(failing, name, delay)
print("[main] Children spawned")
except TypeError:
print("[main] TypeError caught!")
print(f"[main] Children exited in {structio.clock() - before:.2f} seconds")
if __name__ == "__main__":
structio.run(
main,
[("first", 1), ("third", 3)],
[("second", 2), ("fourth", 4)],
)

View File

@ -0,0 +1,30 @@
import structio
from nested_pool_inner_raises import successful, failing
async def main(
children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]]
):
before = structio.clock()
try:
async with structio.create_pool() as p1:
print(f"[main] Spawning children in first context ({hex(id(p1))})")
for name, delay in children_outer:
p1.spawn(failing, name, delay)
print("[main] Children spawned")
async with structio.create_pool() as p2:
print(f"[main] Spawning children in second context ({hex(id(p2))})")
for name, delay in children_inner:
p2.spawn(successful, name, delay)
print("[main] Children spawned")
except TypeError:
print("[main] TypeError caught!")
print(f"[main] Children exited in {structio.clock() - before:.2f} seconds")
if __name__ == "__main__":
structio.run(
main,
[("second", 2), ("third", 3)],
[("first", 1), ("fourth", 4)],
)

40
tests/queue.py Normal file
View File

@ -0,0 +1,40 @@
import structio
async def producer(q: structio.Queue, n: int):
for i in range(n):
# This will wait until the
# queue is emptied by the
# consumer
await q.put(i)
print(f"Produced {i}")
await q.put(None)
print("Producer done")
async def consumer(q: structio.Queue):
while True:
# Hangs until there is
# something on the queue
item = await q.get()
if item is None:
print("Consumer done")
break
print(f"Consumed {item}")
# Simulates some work so the
# producer waits before putting
# the next value
await structio.sleep(1)
async def main(q: structio.Queue, n: int):
print("Starting consumer and producer")
async with structio.create_pool() as ctx:
ctx.spawn(producer, q, n)
ctx.spawn(consumer, q)
print("Bye!")
if __name__ == "__main__":
queue = structio.Queue(2) # Queue has size limit of 2
structio.run(main, queue, 5)

33
tests/sliding_deadline.py Normal file
View File

@ -0,0 +1,33 @@
import structio
async def main(n):
print(f"[main] Starting sliding timer with timeout {n}")
i = structio.clock()
with structio.skip_after(n - 1) as scope:
while n:
# This looks weird, but it allows us to
# handle floating point values (basically
# if n equals say, 7.5, then this loop will
# sleep 7.5 seconds instead of 8), which would
# otherwise cause this loop to run forever and
# the deadline to shift indefinitely into the
# future (because n would never reach zero, getting
# immediately negative instead)
shift = min(n, 1)
print(f"[main] Waiting {shift:.2f} second{'' if shift == 1 else 's'}")
await structio.sleep(shift)
print(f"[main] Shifting deadline")
# Updating the scope's timeout causes
# its deadline to shift accordingly!
scope.timeout += shift
n -= shift
print("[main] Deadline shifting complete")
# Should take about n seconds to run, because we shift
# the deadline of the cancellation n times and wait at most
# 1 second after every shift
print(f"[main] Exited in {structio.clock() - i:.2f} seconds")
structio.run(main, 7.5)

56
tests/timeouts.py Normal file
View File

@ -0,0 +1,56 @@
import structio
async def test_silent(i, j):
print(f"[test] Parent is alive, exiting after {i:.2f} seconds")
k = structio.clock()
with structio.skip_after(i) as scope:
print(f"[test] Sleeping for {j} seconds")
await structio.sleep(j)
print(f"[test] Finished in {structio.clock() - k:.2f} seconds (timed out: {scope.timed_out})")
async def test_loud(i, j):
print(f"[test] Parent is alive, exiting after {i:.2f} seconds")
k = structio.clock()
try:
with structio.with_timeout(i) as scope:
print(f"[test] Sleeping for {j} seconds")
await structio.sleep(j)
except TimeoutError:
print("[test] Timed out!")
print(f"[test] Finished in {structio.clock() - k:.2f} seconds")
async def deadlock():
await structio.Event().wait()
async def test_deadlock(i):
print(f"[test] Parent is alive, will exit in {i} seconds")
t = structio.clock()
with structio.skip_after(i):
print("[test] Entering deadlock")
await deadlock()
print(f"[test] Done in {structio.clock() - t:.2f} seconds")
async def test_nested(i):
print(f"[test] Parent is alive, will exit in {i} seconds")
t = structio.clock()
with structio.skip_after(i):
print("[test] Entered first scope")
with structio.skip_after(i * 2):
# Even though this scope's timeout is
# larger than its parent, structio will
# still cancel it when its containing
# scope expires
print("[test] Entered second scope")
await deadlock()
print(f"[test] Done in {structio.clock() - t:.2f} seconds")
structio.run(test_silent, 3, 5)
structio.run(test_loud, 3, 5)
structio.run(test_deadlock, 5)
structio.run(test_nested, 5)