diff --git a/README.md b/README.md index c05af4b..d7d0259 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,18 @@ # aiosched: Yet another Python async scheduler -TODO \ No newline at end of file +## Reasoning + +Why make yet another async library? Well, there's a few reasons really: +- I'm bored and love reinventing the wheel +- My [first attempt](https://git.nocturn9x.space/nocturn9x/giambio) at writing a modern, fully-featured async scheduler +helped me understand a lot of things about how such a library should be designed, so I'm starting from scratch to apply +what I've learned and hopefully not make the same mistakes as last time +- Did I mention that I'm bored? + +## Disclaimer + +Everything is very much a work in progress for now, which is reflected by the versioning scheme I'm using (0.0.x): API +changes may (and likely will) occur at any time even between commits, at least until I get to something simple, stable and +most importantly easy to reason about both while reading the library's code and while using it. Once the API is deemed +stable and everything is well tested, I'll start releasing 1.x.y versions of the library (probably under a different name, +as that's a work in progress too). \ No newline at end of file diff --git a/aiosched/__init__.py b/aiosched/__init__.py index f8bbda2..be6fb89 100644 --- a/aiosched/__init__.py +++ b/aiosched/__init__.py @@ -1,7 +1,7 @@ """ aiosched: Yet another Python async scheduler -Copyright (C) 2020 nocturn9x +Copyright (C) 2022 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -15,9 +15,22 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -from aiosched.runtime import run, get_event_loop, new_event_loop, clock +from aiosched.runtime import run, get_event_loop, new_event_loop, clock, with_context from aiosched.internals.syscalls import spawn, wait, sleep, cancel import aiosched.task import aiosched.errors +import aiosched.context -__all__ = ["run", "get_event_loop", "new_event_loop", "spawn", "wait", "sleep", "task", "errors", "cancel"] + +__all__ = [ + "run", + "get_event_loop", + "new_event_loop", + "spawn", + "wait", + "sleep", + "task", + "errors", + "cancel", + "with_context", +] diff --git a/aiosched/context.py b/aiosched/context.py new file mode 100644 index 0000000..56b758e --- /dev/null +++ b/aiosched/context.py @@ -0,0 +1,114 @@ +""" +aiosched: Yet another Python async scheduler + +Copyright (C) 2022 nocturn9x + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https:www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from aiosched.task import Task +from aiosched.internals.syscalls import spawn, wait, cancel +from typing import Any, Coroutine, Callable + + +class TaskContext(Task): + """ + An asynchronous task context that automatically waits + for all tasks spawned within it. A TaskContext object + behaves like a task and is handled as a single unit + inside the event loop + """ + + name: str = "" + + def __init__(self) -> None: + """ + Object constructor + """ + + # All the tasks that belong to this context. This + # includes any inner contexts contained within this + # one + self.tasks: list[Task | "TaskContext"] = [] + self.name = object.__repr__(self) + # Whether we have been cancelled or not + self.cancelled: bool = False + + async def spawn( + self, func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs + ) -> Task: + """ + Spawns a child task + """ + + task = await spawn(func, *args, **kwargs) + self.tasks.append(task) + return task + + async def __aenter__(self): + """ + Implements the asynchronous context manager interface + """ + + return self + + async def __aexit__(self, exc_type: Exception, exc: Exception, tb): + """ + Implements the asynchronous context manager interface, waiting + for all the tasks spawned inside the context + """ + + for task in self.tasks: + # This forces the interpreter to stop at the + # end of the block and wait for all + # children to exit + try: + await wait(task) + self.tasks.remove(task) + except BaseException: + self.tasks.remove(task) + await self.cancel() + raise + + async def cancel(self): + """ + Cancels the entire context, iterating over all + of its tasks and cancelling them + """ + + for task in self.tasks: + await cancel(task) + self.cancelled = True + self.tasks = [] + + def done(self) -> bool: + """ + Returns True if all the tasks inside the + context have exited, False otherwise + """ + + return all([task.done() for task in self.tasks]) + + def __del__(self): + """ + Context destructor + """ + + for task in self.tasks: + task.__del__() + + def __repr__(self): + """ + Implements repr(self) + """ + + return object.__repr__(self) diff --git a/aiosched/errors.py b/aiosched/errors.py index d04412e..06ba2fb 100644 --- a/aiosched/errors.py +++ b/aiosched/errors.py @@ -1,7 +1,7 @@ """ aiosched: Yet another Python async scheduler -Copyright (C) 2020 nocturn9x +Copyright (C) 2022 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -44,6 +44,13 @@ class ResourceClosed(SchedulerError): """ +class ResourceBroken(SchedulerError): + """ + Raised when I/O is attempted on a broken + resource + """ + + class TimedOutError(SchedulerError): """ This is raised if a timeout expires diff --git a/aiosched/internals/__init__.py b/aiosched/internals/__init__.py index bd705b6..3594d6e 100644 --- a/aiosched/internals/__init__.py +++ b/aiosched/internals/__init__.py @@ -1,7 +1,7 @@ """ aiosched: Yet another Python async scheduler -Copyright (C) 2020 nocturn9x +Copyright (C) 2022 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/aiosched/internals/queues.py b/aiosched/internals/queues.py index 158d25e..9cb7190 100644 --- a/aiosched/internals/queues.py +++ b/aiosched/internals/queues.py @@ -1,7 +1,7 @@ """ aiosched: Yet another Python async scheduler -Copyright (C) 2020 nocturn9x +Copyright (C) 2022 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/aiosched/internals/syscalls.py b/aiosched/internals/syscalls.py index 05d45a0..6cbbb40 100644 --- a/aiosched/internals/syscalls.py +++ b/aiosched/internals/syscalls.py @@ -1,7 +1,7 @@ """ aiosched: Yet another Python async scheduler -Copyright (C) 2020 nocturn9x +Copyright (C) 2022 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/aiosched/kernel.py b/aiosched/kernel.py index 484b061..cef91eb 100644 --- a/aiosched/kernel.py +++ b/aiosched/kernel.py @@ -1,7 +1,7 @@ """ aiosched: Yet another Python async scheduler -Copyright (C) 2020 nocturn9x +Copyright (C) 2022 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -23,7 +23,7 @@ from timeit import default_timer from aiosched.internals.queues import TimeQueue from aiosched.util.debugging import BaseDebugger from typing import Callable, Any, Coroutine -from aiosched.errors import InternalError, ResourceBusy, Cancelled, ResourceClosed +from aiosched.errors import InternalError, ResourceBusy, Cancelled, ResourceClosed, ResourceBroken from selectors import DefaultSelector, BaseSelector @@ -227,7 +227,9 @@ class FIFOKernel: else: # Run a single step with the calculation (i.e. until a yield # somewhere) - method, args, kwargs = self.current_task.run(self.data.get(self.current_task)) + method, args, kwargs = self.current_task.run( + self.data.get(self.current_task) + ) self.data.pop(self.current_task, None) if not hasattr(self, method) and not callable(getattr(self, method)): # This if block is meant to be triggered by other async @@ -277,21 +279,20 @@ class FIFOKernel: # Otherwise, while there are tasks ready to run, we run them! self.handle_task_run(self.run_task_step) - def start(self, func: Callable[..., Coroutine[Any, Any, Any]], *args, loop: bool = True) -> Any: + def start( + self, func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs + ) -> Any: """ - Starts the event loop from a synchronous context. If the loop parameter - is false, the event loop will not start listening for events - automatically and the dispatching is on the users' shoulders + Starts the event loop from a synchronous context """ - entry_point = Task(func.__name__ or str(func), func(*args)) + entry_point = Task(func.__name__ or str(func), func(*args, **kwargs)) self.run_ready.append(entry_point) self.debugger.on_start() - if loop: - try: - self.run() - finally: - self.debugger.on_exit() + try: + self.run() + finally: + self.debugger.on_exit() if entry_point.exc: raise entry_point.exc return entry_point.result @@ -316,10 +317,11 @@ class FIFOKernel: if task in key.data: key.data.remove(task) if not key.data: + self.notify_closing(key.fileobj, broken=True) self.selector.unregister(key.fileobj) task.last_io = () - def notify_closing(self, stream): + def notify_closing(self, stream, broken: bool = False): """ Notifies paused tasks that a stream is about to be closed. The stream @@ -327,12 +329,18 @@ class FIFOKernel: closed by the caller """ + if not broken: + exc = ResourceClosed("stream has been closed") + else: + exc = ResourceBroken("stream might be corrupted") for k in filter( lambda o: o.fileobj == stream, dict(self.selector.get_map()).values(), ): for task in k.data: - self.handle_task_run(partial(task.throw, ResourceClosed("stream has been closed")), task) + self.handle_task_run( + partial(task.throw, exc), task + ) def cancel(self, task: Task): """ @@ -341,14 +349,12 @@ class FIFOKernel: """ self.reschedule_running() - if task.done(): - return match task.state: case TaskState.IO: self.io_release_task(task) case TaskState.PAUSED: self.paused.discard(task) - case TaskState.INIT: + case TaskState.INIT, TaskState.CANCELLED, TaskState.CRASHED: return self.handle_task_run(partial(task.throw, Cancelled(task)), task) if task.state == TaskState.CANCELLED: diff --git a/aiosched/runtime.py b/aiosched/runtime.py index 3779b52..9e70cb2 100644 --- a/aiosched/runtime.py +++ b/aiosched/runtime.py @@ -1,7 +1,7 @@ """ aiosched: Yet another Python async scheduler -Copyright (C) 2020 nocturn9x +Copyright (C) 2022 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ from aiosched.kernel import FIFOKernel from aiosched.errors import SchedulerError from aiosched.util.debugging import BaseDebugger from typing import Coroutine, Callable, Any +from aiosched.context import TaskContext local_storage = local() @@ -61,7 +62,11 @@ def new_event_loop(clock_function: Callable, debugger: BaseDebugger | None = Non local_storage.loop = FIFOKernel(clock_function, debugger) -def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]], debugger: BaseDebugger | None = None, *args, **kwargs): +def run( + func: Callable[[Any, Any], Coroutine[Any, Any, Any]], + *args, + **kwargs +): """ Starts the event loop from a synchronous entry point """ @@ -73,8 +78,23 @@ def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]], debugger: BaseDebu ) elif not inspect.iscoroutinefunction(func): raise SchedulerError("aiosched.run() requires an async function as parameter!") - new_event_loop(kwargs.get("clock", default_timer), debugger) - get_event_loop().start(func, *args) + clock_function = default_timer + debugger = None + if "clock_function" in kwargs: + clock_function = kwargs.pop("clock_function") + if "debugger" in kwargs: + debugger = kwargs.pop("debugger") + new_event_loop(clock_function, debugger) + get_event_loop().start(func, *args, **kwargs) + + +def with_context() -> TaskContext: + """ + Creates and returns a new TaskContext + object + """ + + return TaskContext() def clock() -> float: diff --git a/aiosched/task.py b/aiosched/task.py index 219acf6..b1e4c2c 100644 --- a/aiosched/task.py +++ b/aiosched/task.py @@ -1,7 +1,7 @@ """ aiosched: Yet another Python async scheduler -Copyright (C) 2020 nocturn9x +Copyright (C) 2022 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/tests/cancel.py b/tests/cancel.py index ba94ec0..8eace86 100644 --- a/tests/cancel.py +++ b/tests/cancel.py @@ -10,7 +10,7 @@ async def child(name: str, n: int): await aiosched.sleep(n) except aiosched.errors.Cancelled: print(f"[child {name}] Oh no, I've been cancelled!") - raise # We re-raise, or things break + raise # We re-raise, or things break print(f"[child {name}] Done! Slept for {aiosched.clock() - before:.2f} seconds") @@ -32,5 +32,5 @@ async def main(children: list[tuple[str, int]]): print(f"[main] Child #{i + 1} has exited") print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds") - -aiosched.run(main, None, [("first", 1), ("second", 2), ("third", 3)]) +if __name__ == "__main__": + aiosched.run(main, [("first", 1), ("second", 2), ("third", 3)]) diff --git a/tests/catch.py b/tests/catch.py index 91b9dd8..1b9c1df 100644 --- a/tests/catch.py +++ b/tests/catch.py @@ -22,4 +22,5 @@ async def main(n: int): print(f"[main] Child exited in {aiosched.clock() - before:.2f} seconds") -aiosched.run(main, None, 5) +if __name__ == "__main__": + aiosched.run(main, 5) diff --git a/tests/context_catch.py b/tests/context_catch.py new file mode 100644 index 0000000..465bb44 --- /dev/null +++ b/tests/context_catch.py @@ -0,0 +1,20 @@ +import aiosched +from catch import child +from debugger import Debugger + + +async def main(children: list[tuple[str, int]]): + try: + async with aiosched.with_context() as ctx: + print("[main] Spawning children") + for name, delay in children: + await ctx.spawn(child, name, delay) + print(f"[main] Spawned {len(ctx.tasks)} children") + before = aiosched.clock() + except BaseException as err: + print(f"[main] Child raised an exception -> {type(err).__name__}: {err}") + print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds") + + +if __name__ == "__main__": + aiosched.run(main, [("first", 1), ("second", 2), ("third", 3)], debugger=None) diff --git a/tests/context_wait.py b/tests/context_wait.py new file mode 100644 index 0000000..05cdff3 --- /dev/null +++ b/tests/context_wait.py @@ -0,0 +1,17 @@ +import aiosched +from wait import child +from debugger import Debugger + + +async def main(children: list[tuple[str, int]]): + print("[main] Spawning children") + async with aiosched.with_context() as ctx: + for name, delay in children: + await ctx.spawn(child, name, delay) + print(f"[main] Spawned {len(ctx.tasks)} children") + before = aiosched.clock() + print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds") + + +if __name__ == "__main__": + aiosched.run(main, [("first", 1), ("second", 2), ("third", 3)], debugger=None) diff --git a/tests/debugger.py b/tests/debugger.py index 57c7c81..ab33212 100644 --- a/tests/debugger.py +++ b/tests/debugger.py @@ -13,7 +13,9 @@ class Debugger(BaseDebugger): print("## Finished running") def on_task_schedule(self, task, delay: int): - print(f">> A task named '{task.name}' was scheduled to run in {delay:.2f} seconds") + print( + f">> A task named '{task.name}' was scheduled to run in {delay:.2f} seconds" + ) def on_task_spawn(self, task): print(f">> A task named '{task.name}' was spawned") @@ -48,4 +50,4 @@ class Debugger(BaseDebugger): print(f"// Cancelled '{task.name}'") def on_exception_raised(self, task, exc): - print(f"== '{task.name}' raised {repr(exc)}") \ No newline at end of file + print(f"== '{task.name}' raised {repr(exc)}") diff --git a/tests/wait.py b/tests/wait.py index e2e84e5..fe43559 100644 --- a/tests/wait.py +++ b/tests/wait.py @@ -24,4 +24,5 @@ async def main(children: list[tuple[str, int]]): print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds") -aiosched.run(main, Debugger(), [("first", 1), ("second", 2), ("third", 3)]) +if __name__ == "__main__": + aiosched.run(main, [("first", 1), ("second", 2), ("third", 3)])