From c2bb63149b1ddf425412719a4fcb833cf576b82b Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Wed, 19 Oct 2022 11:54:32 +0200 Subject: [PATCH] Added Event class and reformatted with black --- aiosched/__init__.py | 3 +- aiosched/context.py | 14 ++++++- aiosched/errors.py | 4 +- aiosched/internals/syscalls.py | 8 ++++ aiosched/kernel.py | 45 +++++++++++++++----- aiosched/runtime.py | 6 +-- aiosched/sync.py | 65 +++++++++++++++++++++++++++++ tests/cancel.py | 1 + tests/context_silent_catch.py | 4 +- tests/context_wait.py | 2 +- tests/events.py | 35 ++++++++++++++++ tests/nested_context_catch_inner.py | 11 ++++- tests/nested_context_catch_outer.py | 11 ++++- tests/nested_context_wait.py | 11 ++++- 14 files changed, 190 insertions(+), 30 deletions(-) create mode 100644 aiosched/sync.py create mode 100644 tests/events.py diff --git a/aiosched/__init__.py b/aiosched/__init__.py index be6fb89..f6db51a 100644 --- a/aiosched/__init__.py +++ b/aiosched/__init__.py @@ -20,7 +20,7 @@ from aiosched.internals.syscalls import spawn, wait, sleep, cancel import aiosched.task import aiosched.errors import aiosched.context - +from aiosched.sync import Event __all__ = [ "run", @@ -33,4 +33,5 @@ __all__ = [ "errors", "cancel", "with_context", + "Event", ] diff --git a/aiosched/context.py b/aiosched/context.py index 15d4b93..17d1a8a 100644 --- a/aiosched/context.py +++ b/aiosched/context.py @@ -17,7 +17,14 @@ limitations under the License. """ from aiosched.task import Task from aiosched.errors import Cancelled -from aiosched.internals.syscalls import spawn, wait, cancel, set_context, close_context, join +from aiosched.internals.syscalls import ( + spawn, + wait, + cancel, + set_context, + close_context, + join, +) from typing import Any, Coroutine, Callable @@ -139,7 +146,10 @@ class TaskContext(Task): continue if not task.done(): return False - if not isinstance(self.entry_point, TaskContext) and not self.entry_point.done(): + if ( + not isinstance(self.entry_point, TaskContext) + and not self.entry_point.done() + ): return False if self.inner: return self.inner.done() diff --git a/aiosched/errors.py b/aiosched/errors.py index 5e9a23b..98baa18 100644 --- a/aiosched/errors.py +++ b/aiosched/errors.py @@ -96,9 +96,7 @@ class ErrorStack(SchedulerError): tracebacks = "" for i, err in enumerate(self.errors): if i not in (1, len(self.errors)): - tracebacks += ( - f"\n{''.join(traceback.format_exception(type(err), err, err.__traceback__))}\n{'-' * 32}\n" - ) + tracebacks += f"\n{''.join(traceback.format_exception(type(err), err, err.__traceback__))}\n{'-' * 32}\n" else: tracebacks += f"\n{''.join(traceback.format_exception(type(err), err, err.__traceback__))}" return f"Multiple errors occurred:\n{tracebacks}" diff --git a/aiosched/internals/syscalls.py b/aiosched/internals/syscalls.py index 5642d5c..356aeeb 100644 --- a/aiosched/internals/syscalls.py +++ b/aiosched/internals/syscalls.py @@ -43,6 +43,14 @@ def syscall(method: str, *args, **kwargs) -> Any | None: return result +async def schedule(task: Task): + """ + Reschedules a task that had been + previously suspended + """ + + await syscall("schedule", task) + async def spawn(func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs) -> Task: """ Spawns a task from a coroutine and returns it. The coroutine diff --git a/aiosched/kernel.py b/aiosched/kernel.py index 5a27d97..d174bb9 100644 --- a/aiosched/kernel.py +++ b/aiosched/kernel.py @@ -23,7 +23,13 @@ from timeit import default_timer from aiosched.internals.queue import TimeQueue from aiosched.util.debugging import BaseDebugger from typing import Callable, Any, Coroutine -from aiosched.errors import InternalError, ResourceBusy, Cancelled, ResourceClosed, ResourceBroken +from aiosched.errors import ( + InternalError, + ResourceBusy, + Cancelled, + ResourceClosed, + ResourceBroken, +) from aiosched.context import TaskContext from selectors import DefaultSelector, BaseSelector @@ -118,7 +124,9 @@ class FIFOKernel: """ if not self.done() and not force: - self.current_task.throw(InternalError("cannot shut down a running event loop")) + self.current_task.throw( + InternalError("cannot shut down a running event loop") + ) for task in self.all(): self.cancel(task) @@ -185,6 +193,15 @@ class FIFOKernel: self.run_ready.append(self.current_task) + def schedule(self, task: Task): + """ + Schedules a task that was previously + suspended + """ + + self.run_ready.append(task) + self.reschedule_running() + def suspend(self): """ Suspends execution of the current task. This is basically @@ -213,7 +230,7 @@ class FIFOKernel: # We need to make sure we don't try to execute # exited tasks that are on the running queue if not self.run_ready: - return # No more tasks to run! + return # No more tasks to run! self.current_task = self.run_ready.popleft() self.debugger.before_task_step(self.current_task) # Some debugging and internal chatter here @@ -226,15 +243,19 @@ class FIFOKernel: else: # Run a single step with the calculation (i.e. until a yield # somewhere) - method, args, kwargs = self.current_task.run(self.data.pop(self.current_task, None)) + method, args, kwargs = self.current_task.run( + self.data.pop(self.current_task, None) + ) if not hasattr(self, method) or not callable(getattr(self, method)): # This if block is meant to be triggered by other async # libraries, which most likely have different trap names and behaviors # compared to us. If you get this exception, and you're 100% sure you're # not mixing async primitives from other libraries, then it's a bug! - self.current_task.throw(InternalError( - "Uh oh! Something very bad just happened, did you try to mix primitives from other async libraries?" - )) + self.current_task.throw( + InternalError( + "Uh oh! Something very bad just happened, did you try to mix primitives from other async libraries?" + ) + ) # Sneaky method call, thanks to David Beazley for this ;) getattr(self, method)(*args, **kwargs) self.debugger.after_task_step(self.current_task) @@ -289,7 +310,11 @@ class FIFOKernel: self.run() finally: self.debugger.on_exit() - if self.entry_point.exc and self.entry_point.context is None and self.entry_point.propagate: + if ( + self.entry_point.exc + and self.entry_point.context is None + and self.entry_point.propagate + ): # Contexts already manage exceptions for us, # no need to raise it manually raise self.entry_point.exc @@ -336,9 +361,7 @@ class FIFOKernel: dict(self.selector.get_map()).values(), ): for task in k.data: - self.handle_task_run( - partial(task.throw, exc), task - ) + self.handle_task_run(partial(task.throw, exc), task) def cancel(self, task: Task): """ diff --git a/aiosched/runtime.py b/aiosched/runtime.py index d40d7c4..de821d2 100644 --- a/aiosched/runtime.py +++ b/aiosched/runtime.py @@ -62,11 +62,7 @@ def new_event_loop(clock_function: Callable, debugger: BaseDebugger | None = Non local_storage.loop = FIFOKernel(clock_function, debugger) -def run( - func: Callable[[Any, Any], Coroutine[Any, Any, Any]], - *args, - **kwargs -): +def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args, **kwargs): """ Starts the event loop from a synchronous entry point """ diff --git a/aiosched/sync.py b/aiosched/sync.py new file mode 100644 index 0000000..a24d7a9 --- /dev/null +++ b/aiosched/sync.py @@ -0,0 +1,65 @@ +""" +aiosched: Yet another Python async scheduler + +Copyright (C) 2022 nocturn9x + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https:www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from typing import Any +from aiosched.errors import SchedulerError +from aiosched.internals.syscalls import ( + suspend, + schedule, + current_task, +) + + +class Event: + """ + An asynchronous, non thread-safe event + """ + + def __init__(self): + """ + Object constructor + """ + + self.set = False + self.waiters = set() + + def reset(self): + """ + Resets the event's state + """ + + self.__init__() + + async def trigger(self): + """ + Sets the event, waking up all tasks that called + wait() on it + """ + + if self.set: + raise SchedulerError("The event has already been set") + self.set = True + for waiter in self.waiters: + await schedule(waiter) + + async def wait(self) -> Any: + """ + Waits until the event is set + """ + + self.waiters.add(await current_task()) + await suspend() # We get unsuspended by trigger() diff --git a/tests/cancel.py b/tests/cancel.py index 8eace86..0e1254e 100644 --- a/tests/cancel.py +++ b/tests/cancel.py @@ -32,5 +32,6 @@ async def main(children: list[tuple[str, int]]): print(f"[main] Child #{i + 1} has exited") print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds") + if __name__ == "__main__": aiosched.run(main, [("first", 1), ("second", 2), ("third", 3)]) diff --git a/tests/context_silent_catch.py b/tests/context_silent_catch.py index 2afac99..e5f4e03 100644 --- a/tests/context_silent_catch.py +++ b/tests/context_silent_catch.py @@ -11,7 +11,9 @@ async def main(children: list[tuple[str, int]]): print("[main] Children spawned") before = aiosched.clock() if ctx.exc: - print(f"[main] Child raised an exception -> {type(ctx.exc).__name__}: {ctx.exc}") + print( + f"[main] Child raised an exception -> {type(ctx.exc).__name__}: {ctx.exc}" + ) print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds") diff --git a/tests/context_wait.py b/tests/context_wait.py index 8f9cac6..c9b3d01 100644 --- a/tests/context_wait.py +++ b/tests/context_wait.py @@ -14,4 +14,4 @@ async def main(children: list[tuple[str, int]]): if __name__ == "__main__": - aiosched.run(main, [("first", 1), ("second", 2), ("third", 3)], debugger=None) \ No newline at end of file + aiosched.run(main, [("first", 1), ("second", 2), ("third", 3)], debugger=None) diff --git a/tests/events.py b/tests/events.py new file mode 100644 index 0000000..90ec023 --- /dev/null +++ b/tests/events.py @@ -0,0 +1,35 @@ +from debugger import Debugger +import aiosched + + +async def child(ev: aiosched.Event, pause: int): + print("[child] Child is alive! Going to wait until notified") + start_total = aiosched.clock() + await ev.wait() + end_pause = aiosched.clock() - start_total + print(f"[child] Parent set the event with, exiting in {pause} seconds") + start_sleep = aiosched.clock() + await aiosched.sleep(pause) + end_sleep = aiosched.clock() - start_sleep + end_total = aiosched.clock() - start_total + print( + f"[child] Done! Slept for {end_total:.2f} seconds total ({end_pause:.2f} waiting, {end_sleep:.2f} sleeping), nice nap!" + ) + + +async def parent(pause: int = 1): + async with aiosched.with_context() as ctx: + event = aiosched.Event() + print("[parent] Spawning child task") + await ctx.spawn(child, event, pause + 2) + start = aiosched.clock() + print(f"[parent] Sleeping {pause} second(s) before setting the event") + await aiosched.sleep(pause) + await event.trigger() + print("[parent] Event set, awaiting child completion") + end = aiosched.clock() - start + print(f"[parent] Child exited in {end:.2f} seconds") + + +if __name__ == "__main__": + aiosched.run(parent, 3, debugger=None) diff --git a/tests/nested_context_catch_inner.py b/tests/nested_context_catch_inner.py index 2317702..f9bbf95 100644 --- a/tests/nested_context_catch_inner.py +++ b/tests/nested_context_catch_inner.py @@ -4,7 +4,9 @@ from wait import child as successful from debugger import Debugger -async def main(children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]]): +async def main( + children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]] +): before = aiosched.clock() async with aiosched.with_context() as ctx: print("[main] Spawning children in first context") @@ -23,4 +25,9 @@ async def main(children_outer: list[tuple[str, int]], children_inner: list[tuple if __name__ == "__main__": - aiosched.run(main, [("first", 1), ("second", 2)], [("third", 3), ("fourth", 4)], debugger=None) + aiosched.run( + main, + [("first", 1), ("second", 2)], + [("third", 3), ("fourth", 4)], + debugger=None, + ) diff --git a/tests/nested_context_catch_outer.py b/tests/nested_context_catch_outer.py index cd96614..effa85b 100644 --- a/tests/nested_context_catch_outer.py +++ b/tests/nested_context_catch_outer.py @@ -4,7 +4,9 @@ from debugger import Debugger # TODO: This crashes 1 second later than it should be -async def main(children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]]): +async def main( + children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]] +): try: async with aiosched.with_context() as ctx: before = aiosched.clock() @@ -23,4 +25,9 @@ async def main(children_outer: list[tuple[str, int]], children_inner: list[tuple if __name__ == "__main__": - aiosched.run(main, [("first", 1), ("second", 2)], [("third", 3), ("fourth", 4)], debugger=None) + aiosched.run( + main, + [("first", 1), ("second", 2)], + [("third", 3), ("fourth", 4)], + debugger=None, + ) diff --git a/tests/nested_context_wait.py b/tests/nested_context_wait.py index 0ac9b24..fdad03d 100644 --- a/tests/nested_context_wait.py +++ b/tests/nested_context_wait.py @@ -3,7 +3,9 @@ from wait import child from debugger import Debugger -async def main(children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]]): +async def main( + children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]] +): async with aiosched.with_context() as ctx: before = aiosched.clock() print("[main] Spawning children in first context") @@ -19,4 +21,9 @@ async def main(children_outer: list[tuple[str, int]], children_inner: list[tuple if __name__ == "__main__": - aiosched.run(main, [("first", 1), ("second", 2)], [("third", 3), ("fourth", 4)], debugger=None) + aiosched.run( + main, + [("first", 1), ("second", 2)], + [("third", 3), ("fourth", 4)], + debugger=None, + )