Initial work on task contexts

This commit is contained in:
Nocturn9x 2022-10-18 17:26:58 +02:00
parent 989981b746
commit f3ff5fcec5
16 changed files with 254 additions and 38 deletions

View File

@ -1,3 +1,18 @@
# aiosched: Yet another Python async scheduler # aiosched: Yet another Python async scheduler
TODO ## Reasoning
Why make yet another async library? Well, there's a few reasons really:
- I'm bored and love reinventing the wheel
- My [first attempt](https://git.nocturn9x.space/nocturn9x/giambio) at writing a modern, fully-featured async scheduler
helped me understand a lot of things about how such a library should be designed, so I'm starting from scratch to apply
what I've learned and hopefully not make the same mistakes as last time
- Did I mention that I'm bored?
## Disclaimer
Everything is very much a work in progress for now, which is reflected by the versioning scheme I'm using (0.0.x): API
changes may (and likely will) occur at any time even between commits, at least until I get to something simple, stable and
most importantly easy to reason about both while reading the library's code and while using it. Once the API is deemed
stable and everything is well tested, I'll start releasing 1.x.y versions of the library (probably under a different name,
as that's a work in progress too).

View File

@ -1,7 +1,7 @@
""" """
aiosched: Yet another Python async scheduler aiosched: Yet another Python async scheduler
Copyright (C) 2020 nocturn9x Copyright (C) 2022 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
@ -15,9 +15,22 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
from aiosched.runtime import run, get_event_loop, new_event_loop, clock from aiosched.runtime import run, get_event_loop, new_event_loop, clock, with_context
from aiosched.internals.syscalls import spawn, wait, sleep, cancel from aiosched.internals.syscalls import spawn, wait, sleep, cancel
import aiosched.task import aiosched.task
import aiosched.errors import aiosched.errors
import aiosched.context
__all__ = ["run", "get_event_loop", "new_event_loop", "spawn", "wait", "sleep", "task", "errors", "cancel"]
__all__ = [
"run",
"get_event_loop",
"new_event_loop",
"spawn",
"wait",
"sleep",
"task",
"errors",
"cancel",
"with_context",
]

114
aiosched/context.py Normal file
View File

@ -0,0 +1,114 @@
"""
aiosched: Yet another Python async scheduler
Copyright (C) 2022 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https:www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from aiosched.task import Task
from aiosched.internals.syscalls import spawn, wait, cancel
from typing import Any, Coroutine, Callable
class TaskContext(Task):
"""
An asynchronous task context that automatically waits
for all tasks spawned within it. A TaskContext object
behaves like a task and is handled as a single unit
inside the event loop
"""
name: str = ""
def __init__(self) -> None:
"""
Object constructor
"""
# All the tasks that belong to this context. This
# includes any inner contexts contained within this
# one
self.tasks: list[Task | "TaskContext"] = []
self.name = object.__repr__(self)
# Whether we have been cancelled or not
self.cancelled: bool = False
async def spawn(
self, func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs
) -> Task:
"""
Spawns a child task
"""
task = await spawn(func, *args, **kwargs)
self.tasks.append(task)
return task
async def __aenter__(self):
"""
Implements the asynchronous context manager interface
"""
return self
async def __aexit__(self, exc_type: Exception, exc: Exception, tb):
"""
Implements the asynchronous context manager interface, waiting
for all the tasks spawned inside the context
"""
for task in self.tasks:
# This forces the interpreter to stop at the
# end of the block and wait for all
# children to exit
try:
await wait(task)
self.tasks.remove(task)
except BaseException:
self.tasks.remove(task)
await self.cancel()
raise
async def cancel(self):
"""
Cancels the entire context, iterating over all
of its tasks and cancelling them
"""
for task in self.tasks:
await cancel(task)
self.cancelled = True
self.tasks = []
def done(self) -> bool:
"""
Returns True if all the tasks inside the
context have exited, False otherwise
"""
return all([task.done() for task in self.tasks])
def __del__(self):
"""
Context destructor
"""
for task in self.tasks:
task.__del__()
def __repr__(self):
"""
Implements repr(self)
"""
return object.__repr__(self)

View File

@ -1,7 +1,7 @@
""" """
aiosched: Yet another Python async scheduler aiosched: Yet another Python async scheduler
Copyright (C) 2020 nocturn9x Copyright (C) 2022 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
@ -44,6 +44,13 @@ class ResourceClosed(SchedulerError):
""" """
class ResourceBroken(SchedulerError):
"""
Raised when I/O is attempted on a broken
resource
"""
class TimedOutError(SchedulerError): class TimedOutError(SchedulerError):
""" """
This is raised if a timeout expires This is raised if a timeout expires

View File

@ -1,7 +1,7 @@
""" """
aiosched: Yet another Python async scheduler aiosched: Yet another Python async scheduler
Copyright (C) 2020 nocturn9x Copyright (C) 2022 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.

View File

@ -1,7 +1,7 @@
""" """
aiosched: Yet another Python async scheduler aiosched: Yet another Python async scheduler
Copyright (C) 2020 nocturn9x Copyright (C) 2022 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.

View File

@ -1,7 +1,7 @@
""" """
aiosched: Yet another Python async scheduler aiosched: Yet another Python async scheduler
Copyright (C) 2020 nocturn9x Copyright (C) 2022 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.

View File

@ -1,7 +1,7 @@
""" """
aiosched: Yet another Python async scheduler aiosched: Yet another Python async scheduler
Copyright (C) 2020 nocturn9x Copyright (C) 2022 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
@ -23,7 +23,7 @@ from timeit import default_timer
from aiosched.internals.queues import TimeQueue from aiosched.internals.queues import TimeQueue
from aiosched.util.debugging import BaseDebugger from aiosched.util.debugging import BaseDebugger
from typing import Callable, Any, Coroutine from typing import Callable, Any, Coroutine
from aiosched.errors import InternalError, ResourceBusy, Cancelled, ResourceClosed from aiosched.errors import InternalError, ResourceBusy, Cancelled, ResourceClosed, ResourceBroken
from selectors import DefaultSelector, BaseSelector from selectors import DefaultSelector, BaseSelector
@ -227,7 +227,9 @@ class FIFOKernel:
else: else:
# Run a single step with the calculation (i.e. until a yield # Run a single step with the calculation (i.e. until a yield
# somewhere) # somewhere)
method, args, kwargs = self.current_task.run(self.data.get(self.current_task)) method, args, kwargs = self.current_task.run(
self.data.get(self.current_task)
)
self.data.pop(self.current_task, None) self.data.pop(self.current_task, None)
if not hasattr(self, method) and not callable(getattr(self, method)): if not hasattr(self, method) and not callable(getattr(self, method)):
# This if block is meant to be triggered by other async # This if block is meant to be triggered by other async
@ -277,21 +279,20 @@ class FIFOKernel:
# Otherwise, while there are tasks ready to run, we run them! # Otherwise, while there are tasks ready to run, we run them!
self.handle_task_run(self.run_task_step) self.handle_task_run(self.run_task_step)
def start(self, func: Callable[..., Coroutine[Any, Any, Any]], *args, loop: bool = True) -> Any: def start(
self, func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs
) -> Any:
""" """
Starts the event loop from a synchronous context. If the loop parameter Starts the event loop from a synchronous context
is false, the event loop will not start listening for events
automatically and the dispatching is on the users' shoulders
""" """
entry_point = Task(func.__name__ or str(func), func(*args)) entry_point = Task(func.__name__ or str(func), func(*args, **kwargs))
self.run_ready.append(entry_point) self.run_ready.append(entry_point)
self.debugger.on_start() self.debugger.on_start()
if loop: try:
try: self.run()
self.run() finally:
finally: self.debugger.on_exit()
self.debugger.on_exit()
if entry_point.exc: if entry_point.exc:
raise entry_point.exc raise entry_point.exc
return entry_point.result return entry_point.result
@ -316,10 +317,11 @@ class FIFOKernel:
if task in key.data: if task in key.data:
key.data.remove(task) key.data.remove(task)
if not key.data: if not key.data:
self.notify_closing(key.fileobj, broken=True)
self.selector.unregister(key.fileobj) self.selector.unregister(key.fileobj)
task.last_io = () task.last_io = ()
def notify_closing(self, stream): def notify_closing(self, stream, broken: bool = False):
""" """
Notifies paused tasks that a stream Notifies paused tasks that a stream
is about to be closed. The stream is about to be closed. The stream
@ -327,12 +329,18 @@ class FIFOKernel:
closed by the caller closed by the caller
""" """
if not broken:
exc = ResourceClosed("stream has been closed")
else:
exc = ResourceBroken("stream might be corrupted")
for k in filter( for k in filter(
lambda o: o.fileobj == stream, lambda o: o.fileobj == stream,
dict(self.selector.get_map()).values(), dict(self.selector.get_map()).values(),
): ):
for task in k.data: for task in k.data:
self.handle_task_run(partial(task.throw, ResourceClosed("stream has been closed")), task) self.handle_task_run(
partial(task.throw, exc), task
)
def cancel(self, task: Task): def cancel(self, task: Task):
""" """
@ -341,14 +349,12 @@ class FIFOKernel:
""" """
self.reschedule_running() self.reschedule_running()
if task.done():
return
match task.state: match task.state:
case TaskState.IO: case TaskState.IO:
self.io_release_task(task) self.io_release_task(task)
case TaskState.PAUSED: case TaskState.PAUSED:
self.paused.discard(task) self.paused.discard(task)
case TaskState.INIT: case TaskState.INIT, TaskState.CANCELLED, TaskState.CRASHED:
return return
self.handle_task_run(partial(task.throw, Cancelled(task)), task) self.handle_task_run(partial(task.throw, Cancelled(task)), task)
if task.state == TaskState.CANCELLED: if task.state == TaskState.CANCELLED:

View File

@ -1,7 +1,7 @@
""" """
aiosched: Yet another Python async scheduler aiosched: Yet another Python async scheduler
Copyright (C) 2020 nocturn9x Copyright (C) 2022 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
@ -23,6 +23,7 @@ from aiosched.kernel import FIFOKernel
from aiosched.errors import SchedulerError from aiosched.errors import SchedulerError
from aiosched.util.debugging import BaseDebugger from aiosched.util.debugging import BaseDebugger
from typing import Coroutine, Callable, Any from typing import Coroutine, Callable, Any
from aiosched.context import TaskContext
local_storage = local() local_storage = local()
@ -61,7 +62,11 @@ def new_event_loop(clock_function: Callable, debugger: BaseDebugger | None = Non
local_storage.loop = FIFOKernel(clock_function, debugger) local_storage.loop = FIFOKernel(clock_function, debugger)
def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]], debugger: BaseDebugger | None = None, *args, **kwargs): def run(
func: Callable[[Any, Any], Coroutine[Any, Any, Any]],
*args,
**kwargs
):
""" """
Starts the event loop from a synchronous entry point Starts the event loop from a synchronous entry point
""" """
@ -73,8 +78,23 @@ def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]], debugger: BaseDebu
) )
elif not inspect.iscoroutinefunction(func): elif not inspect.iscoroutinefunction(func):
raise SchedulerError("aiosched.run() requires an async function as parameter!") raise SchedulerError("aiosched.run() requires an async function as parameter!")
new_event_loop(kwargs.get("clock", default_timer), debugger) clock_function = default_timer
get_event_loop().start(func, *args) debugger = None
if "clock_function" in kwargs:
clock_function = kwargs.pop("clock_function")
if "debugger" in kwargs:
debugger = kwargs.pop("debugger")
new_event_loop(clock_function, debugger)
get_event_loop().start(func, *args, **kwargs)
def with_context() -> TaskContext:
"""
Creates and returns a new TaskContext
object
"""
return TaskContext()
def clock() -> float: def clock() -> float:

View File

@ -1,7 +1,7 @@
""" """
aiosched: Yet another Python async scheduler aiosched: Yet another Python async scheduler
Copyright (C) 2020 nocturn9x Copyright (C) 2022 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.

View File

@ -10,7 +10,7 @@ async def child(name: str, n: int):
await aiosched.sleep(n) await aiosched.sleep(n)
except aiosched.errors.Cancelled: except aiosched.errors.Cancelled:
print(f"[child {name}] Oh no, I've been cancelled!") print(f"[child {name}] Oh no, I've been cancelled!")
raise # We re-raise, or things break raise # We re-raise, or things break
print(f"[child {name}] Done! Slept for {aiosched.clock() - before:.2f} seconds") print(f"[child {name}] Done! Slept for {aiosched.clock() - before:.2f} seconds")
@ -32,5 +32,5 @@ async def main(children: list[tuple[str, int]]):
print(f"[main] Child #{i + 1} has exited") print(f"[main] Child #{i + 1} has exited")
print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds") print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds")
if __name__ == "__main__":
aiosched.run(main, None, [("first", 1), ("second", 2), ("third", 3)]) aiosched.run(main, [("first", 1), ("second", 2), ("third", 3)])

View File

@ -22,4 +22,5 @@ async def main(n: int):
print(f"[main] Child exited in {aiosched.clock() - before:.2f} seconds") print(f"[main] Child exited in {aiosched.clock() - before:.2f} seconds")
aiosched.run(main, None, 5) if __name__ == "__main__":
aiosched.run(main, 5)

20
tests/context_catch.py Normal file
View File

@ -0,0 +1,20 @@
import aiosched
from catch import child
from debugger import Debugger
async def main(children: list[tuple[str, int]]):
try:
async with aiosched.with_context() as ctx:
print("[main] Spawning children")
for name, delay in children:
await ctx.spawn(child, name, delay)
print(f"[main] Spawned {len(ctx.tasks)} children")
before = aiosched.clock()
except BaseException as err:
print(f"[main] Child raised an exception -> {type(err).__name__}: {err}")
print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds")
if __name__ == "__main__":
aiosched.run(main, [("first", 1), ("second", 2), ("third", 3)], debugger=None)

17
tests/context_wait.py Normal file
View File

@ -0,0 +1,17 @@
import aiosched
from wait import child
from debugger import Debugger
async def main(children: list[tuple[str, int]]):
print("[main] Spawning children")
async with aiosched.with_context() as ctx:
for name, delay in children:
await ctx.spawn(child, name, delay)
print(f"[main] Spawned {len(ctx.tasks)} children")
before = aiosched.clock()
print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds")
if __name__ == "__main__":
aiosched.run(main, [("first", 1), ("second", 2), ("third", 3)], debugger=None)

View File

@ -13,7 +13,9 @@ class Debugger(BaseDebugger):
print("## Finished running") print("## Finished running")
def on_task_schedule(self, task, delay: int): def on_task_schedule(self, task, delay: int):
print(f">> A task named '{task.name}' was scheduled to run in {delay:.2f} seconds") print(
f">> A task named '{task.name}' was scheduled to run in {delay:.2f} seconds"
)
def on_task_spawn(self, task): def on_task_spawn(self, task):
print(f">> A task named '{task.name}' was spawned") print(f">> A task named '{task.name}' was spawned")
@ -48,4 +50,4 @@ class Debugger(BaseDebugger):
print(f"// Cancelled '{task.name}'") print(f"// Cancelled '{task.name}'")
def on_exception_raised(self, task, exc): def on_exception_raised(self, task, exc):
print(f"== '{task.name}' raised {repr(exc)}") print(f"== '{task.name}' raised {repr(exc)}")

View File

@ -24,4 +24,5 @@ async def main(children: list[tuple[str, int]]):
print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds") print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds")
aiosched.run(main, Debugger(), [("first", 1), ("second", 2), ("third", 3)]) if __name__ == "__main__":
aiosched.run(main, [("first", 1), ("second", 2), ("third", 3)])