From 15d0a0674ff680f21aa4a8a8772a2ff203796faf Mon Sep 17 00:00:00 2001 From: nocturn9x Date: Tue, 16 May 2023 11:59:01 +0200 Subject: [PATCH] Added a proper timeout exception and enhance some tests --- structio/__init__.py | 5 +++-- structio/core/context.py | 2 +- structio/core/exceptions.py | 15 ++++++++++++++- structio/core/kernels/fifo.py | 19 ++++++++++++------- tests/nested_pool_inner_raises.py | 29 +++++++++++++++++++++++++++++ tests/self_cancel.py | 23 ++++++++++++++++++++--- tests/timeouts.py | 2 +- 7 files changed, 80 insertions(+), 15 deletions(-) diff --git a/structio/__init__.py b/structio/__init__.py index 86c1310..9244789 100644 --- a/structio/__init__.py +++ b/structio/__init__.py @@ -6,7 +6,7 @@ from structio.core.managers.signals.sigint import SigIntManager from structio.core.time.clock import DefaultClock from structio.core.syscalls import sleep from structio.core.context import TaskPool, TaskScope -from structio.core.exceptions import Cancelled +from structio.core.exceptions import Cancelled, TimedOut from structio.sync import Event, Queue, MemoryChannel, Semaphore from structio.core.abc import Channel, Stream, ChannelReader, ChannelWriter @@ -80,5 +80,6 @@ __all__ = ["run", "Stream", "ChannelReader", "ChannelWriter", - "Semaphore" + "Semaphore", + "TimedOut" ] diff --git a/structio/core/context.py b/structio/core/context.py index 806f5ba..4d00d9a 100644 --- a/structio/core/context.py +++ b/structio/core/context.py @@ -63,7 +63,7 @@ class TaskScope: def __exit__(self, exc_type: type, exc_val: BaseException, exc_tb): current_loop().close_scope(self) - if isinstance(exc_val, TimeoutError) and self.timed_out: + if isinstance(exc_val, structio.TimedOut) and exc_val.scope is self: return self.silent return False diff --git a/structio/core/exceptions.py b/structio/core/exceptions.py index 08c6199..75f9146 100644 --- a/structio/core/exceptions.py +++ b/structio/core/exceptions.py @@ -10,4 +10,17 @@ class Cancelled(BaseException): # ignore cancellations """ A cancellation exception - """ \ No newline at end of file + """ + + scope: "TaskScope" + + +class TimedOut(StructIOException): + """ + Raised when a task scope times out. + The scope attribute can be used to + know which scope originally timed + out + """ + + scope: "TaskScope" diff --git a/structio/core/kernels/fifo.py b/structio/core/kernels/fifo.py index 872d261..8df9efa 100644 --- a/structio/core/kernels/fifo.py +++ b/structio/core/kernels/fifo.py @@ -1,13 +1,12 @@ import traceback import warnings from types import FrameType - from structio.core.abc import BaseKernel, BaseClock, BaseDebugger, BaseIOManager, SignalManager from structio.core.context import TaskPool, TaskScope from structio.core.task import Task, TaskState from structio.util.ki import CTRLC_PROTECTION_ENABLED from structio.core.time.queue import TimeQueue -from structio.core.exceptions import StructIOException, Cancelled +from structio.core.exceptions import StructIOException, Cancelled, TimedOut from collections import deque from typing import Callable, Coroutine, Any from functools import partial @@ -141,8 +140,7 @@ class FIFOKernel(BaseKernel): def check_cancelled(self): if self.current_task.pending_cancellation: - self.throw(self.current_task, Cancelled()) - + self.cancel_task(self.current_task) def sleep(self, amount): """ @@ -169,7 +167,9 @@ class FIFOKernel(BaseKernel): if scope.get_actual_timeout() <= self.clock.current_time(): scope.timed_out = True scope.cancel() - self.throw(scope.owner, TimeoutError("timed out")) + error = TimedOut("timed out") + error.scope = scope + self.throw(scope.owner, error) def wakeup(self): while self.paused and self.paused.peek().next_deadline <= self.clock.current_time(): @@ -276,6 +276,7 @@ class FIFOKernel(BaseKernel): """ self.event("on_exception_raised", task, task.exc) + task.pool.scope.cancel() current = task.pool.scope while current and current is not self.pool.scope: # Unroll nested task scopes until one of @@ -294,7 +295,10 @@ class FIFOKernel(BaseKernel): # proceed! break current = current.outer + self.throw(task.pool.scope.owner, task.exc) self.release(task) + self.current_scope = task.pool.scope.outer + self.current_pool = task.pool.outer def on_cancel(self, task: Task): """ @@ -312,7 +316,6 @@ class FIFOKernel(BaseKernel): self.scopes.append(scope) def close_scope(self, scope: TaskScope): - assert scope == self.current_scope self.current_scope = scope.outer self.reschedule(scope.owner) self.scopes.pop() @@ -320,7 +323,9 @@ class FIFOKernel(BaseKernel): def cancel_task(self, task: Task): if task.done(): return - self.throw(task, Cancelled()) + err = Cancelled() + err.scope = task.pool.scope + self.throw(task, err) if task.state != TaskState.CANCELLED: task.pending_cancellation = True diff --git a/tests/nested_pool_inner_raises.py b/tests/nested_pool_inner_raises.py index 530e8bb..dab250f 100644 --- a/tests/nested_pool_inner_raises.py +++ b/tests/nested_pool_inner_raises.py @@ -36,9 +36,38 @@ async def main( print(f"[main] Children exited in {structio.clock() - before:.2f} seconds") +async def main_nested( + children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]] +): + before = structio.clock() + try: + async with structio.create_pool() as p1: + print(f"[main] Spawning children in first context ({hex(id(p1))})") + for name, delay in children_outer: + p1.spawn(successful, name, delay) + print("[main] Children spawned") + async with structio.create_pool() as p2: + print(f"[main] Spawning children in second context ({hex(id(p2))})") + for name, delay in children_outer: + p2.spawn(successful, name, delay) + print("[main] Children spawned") + async with structio.create_pool() as p3: + print(f"[main] Spawning children in third context ({hex(id(p3))})") + for name, delay in children_inner: + p3.spawn(failing, name, delay) + print("[main] Children spawned") + except TypeError: + print("[main] TypeError caught!") + print(f"[main] Children exited in {structio.clock() - before:.2f} seconds") + if __name__ == "__main__": structio.run( main, [("first", 1), ("third", 3)], [("second", 2), ("fourth", 4)], ) + structio.run( + main_nested, + [("first", 1), ("third", 3)], + [("second", 2), ("fourth", 4)], + ) diff --git a/tests/self_cancel.py b/tests/self_cancel.py index be5dab1..b1e814a 100644 --- a/tests/self_cancel.py +++ b/tests/self_cancel.py @@ -12,8 +12,8 @@ async def sleeper(n): print("[sleeper] Woke up!") -async def main(n, o, p): - print(f"[main] Parent is alive, spawning {n} children sleeping {o} seconds each") +async def main_simple(n, o, p): + print(f"[main] Parent is alive, spawning {o} children sleeping {n} seconds each") t = structio.clock() async with structio.create_pool() as pool: for i in range(o): @@ -25,4 +25,21 @@ async def main(n, o, p): print(f"[main] Parent exited in {structio.clock() - t:.2f} seconds") -structio.run(main, 5, 2, 2) +async def main_nested(n, o, p): + print(f"[main] Parent is alive, spawning {o} children in two contexts sleeping {n} seconds each") + t = structio.clock() + async with structio.create_pool() as p1: + for i in range(o): + p1.spawn(sleeper, n) + async with structio.create_pool() as p2: + for i in range(o): + p2.spawn(sleeper, n) + print(f"[main] Children spawned, sleeping {p} seconds before cancelling") + await structio.sleep(p) + # Note that cancellations propagate to all inner task scopes! + p1.scope.cancel() + print(f"[main] Parent exited in {structio.clock() - t:.2f} seconds") + + +structio.run(main_simple, 5, 2, 2) +structio.run(main_nested, 5, 2, 2) diff --git a/tests/timeouts.py b/tests/timeouts.py index 0d784a6..16bbcb8 100644 --- a/tests/timeouts.py +++ b/tests/timeouts.py @@ -17,7 +17,7 @@ async def test_loud(i, j): with structio.with_timeout(i) as scope: print(f"[test] Sleeping for {j} seconds") await structio.sleep(j) - except TimeoutError: + except structio.TimedOut: print("[test] Timed out!") print(f"[test] Finished in {structio.clock() - k:.2f} seconds")