Added a proper timeout exception and enhance some tests

This commit is contained in:
Mattia Giambirtone 2023-05-16 11:59:01 +02:00
parent f7dedeeb6c
commit 15d0a0674f
Signed by: nocturn9x
GPG Key ID: 8270F9F467971E59
7 changed files with 80 additions and 15 deletions

View File

@ -6,7 +6,7 @@ from structio.core.managers.signals.sigint import SigIntManager
from structio.core.time.clock import DefaultClock from structio.core.time.clock import DefaultClock
from structio.core.syscalls import sleep from structio.core.syscalls import sleep
from structio.core.context import TaskPool, TaskScope from structio.core.context import TaskPool, TaskScope
from structio.core.exceptions import Cancelled from structio.core.exceptions import Cancelled, TimedOut
from structio.sync import Event, Queue, MemoryChannel, Semaphore from structio.sync import Event, Queue, MemoryChannel, Semaphore
from structio.core.abc import Channel, Stream, ChannelReader, ChannelWriter from structio.core.abc import Channel, Stream, ChannelReader, ChannelWriter
@ -80,5 +80,6 @@ __all__ = ["run",
"Stream", "Stream",
"ChannelReader", "ChannelReader",
"ChannelWriter", "ChannelWriter",
"Semaphore" "Semaphore",
"TimedOut"
] ]

View File

@ -63,7 +63,7 @@ class TaskScope:
def __exit__(self, exc_type: type, exc_val: BaseException, exc_tb): def __exit__(self, exc_type: type, exc_val: BaseException, exc_tb):
current_loop().close_scope(self) current_loop().close_scope(self)
if isinstance(exc_val, TimeoutError) and self.timed_out: if isinstance(exc_val, structio.TimedOut) and exc_val.scope is self:
return self.silent return self.silent
return False return False

View File

@ -10,4 +10,17 @@ class Cancelled(BaseException):
# ignore cancellations # ignore cancellations
""" """
A cancellation exception A cancellation exception
""" """
scope: "TaskScope"
class TimedOut(StructIOException):
"""
Raised when a task scope times out.
The scope attribute can be used to
know which scope originally timed
out
"""
scope: "TaskScope"

View File

@ -1,13 +1,12 @@
import traceback import traceback
import warnings import warnings
from types import FrameType from types import FrameType
from structio.core.abc import BaseKernel, BaseClock, BaseDebugger, BaseIOManager, SignalManager from structio.core.abc import BaseKernel, BaseClock, BaseDebugger, BaseIOManager, SignalManager
from structio.core.context import TaskPool, TaskScope from structio.core.context import TaskPool, TaskScope
from structio.core.task import Task, TaskState from structio.core.task import Task, TaskState
from structio.util.ki import CTRLC_PROTECTION_ENABLED from structio.util.ki import CTRLC_PROTECTION_ENABLED
from structio.core.time.queue import TimeQueue from structio.core.time.queue import TimeQueue
from structio.core.exceptions import StructIOException, Cancelled from structio.core.exceptions import StructIOException, Cancelled, TimedOut
from collections import deque from collections import deque
from typing import Callable, Coroutine, Any from typing import Callable, Coroutine, Any
from functools import partial from functools import partial
@ -141,8 +140,7 @@ class FIFOKernel(BaseKernel):
def check_cancelled(self): def check_cancelled(self):
if self.current_task.pending_cancellation: if self.current_task.pending_cancellation:
self.throw(self.current_task, Cancelled()) self.cancel_task(self.current_task)
def sleep(self, amount): def sleep(self, amount):
""" """
@ -169,7 +167,9 @@ class FIFOKernel(BaseKernel):
if scope.get_actual_timeout() <= self.clock.current_time(): if scope.get_actual_timeout() <= self.clock.current_time():
scope.timed_out = True scope.timed_out = True
scope.cancel() scope.cancel()
self.throw(scope.owner, TimeoutError("timed out")) error = TimedOut("timed out")
error.scope = scope
self.throw(scope.owner, error)
def wakeup(self): def wakeup(self):
while self.paused and self.paused.peek().next_deadline <= self.clock.current_time(): while self.paused and self.paused.peek().next_deadline <= self.clock.current_time():
@ -276,6 +276,7 @@ class FIFOKernel(BaseKernel):
""" """
self.event("on_exception_raised", task, task.exc) self.event("on_exception_raised", task, task.exc)
task.pool.scope.cancel()
current = task.pool.scope current = task.pool.scope
while current and current is not self.pool.scope: while current and current is not self.pool.scope:
# Unroll nested task scopes until one of # Unroll nested task scopes until one of
@ -294,7 +295,10 @@ class FIFOKernel(BaseKernel):
# proceed! # proceed!
break break
current = current.outer current = current.outer
self.throw(task.pool.scope.owner, task.exc)
self.release(task) self.release(task)
self.current_scope = task.pool.scope.outer
self.current_pool = task.pool.outer
def on_cancel(self, task: Task): def on_cancel(self, task: Task):
""" """
@ -312,7 +316,6 @@ class FIFOKernel(BaseKernel):
self.scopes.append(scope) self.scopes.append(scope)
def close_scope(self, scope: TaskScope): def close_scope(self, scope: TaskScope):
assert scope == self.current_scope
self.current_scope = scope.outer self.current_scope = scope.outer
self.reschedule(scope.owner) self.reschedule(scope.owner)
self.scopes.pop() self.scopes.pop()
@ -320,7 +323,9 @@ class FIFOKernel(BaseKernel):
def cancel_task(self, task: Task): def cancel_task(self, task: Task):
if task.done(): if task.done():
return return
self.throw(task, Cancelled()) err = Cancelled()
err.scope = task.pool.scope
self.throw(task, err)
if task.state != TaskState.CANCELLED: if task.state != TaskState.CANCELLED:
task.pending_cancellation = True task.pending_cancellation = True

View File

@ -36,9 +36,38 @@ async def main(
print(f"[main] Children exited in {structio.clock() - before:.2f} seconds") print(f"[main] Children exited in {structio.clock() - before:.2f} seconds")
async def main_nested(
children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]]
):
before = structio.clock()
try:
async with structio.create_pool() as p1:
print(f"[main] Spawning children in first context ({hex(id(p1))})")
for name, delay in children_outer:
p1.spawn(successful, name, delay)
print("[main] Children spawned")
async with structio.create_pool() as p2:
print(f"[main] Spawning children in second context ({hex(id(p2))})")
for name, delay in children_outer:
p2.spawn(successful, name, delay)
print("[main] Children spawned")
async with structio.create_pool() as p3:
print(f"[main] Spawning children in third context ({hex(id(p3))})")
for name, delay in children_inner:
p3.spawn(failing, name, delay)
print("[main] Children spawned")
except TypeError:
print("[main] TypeError caught!")
print(f"[main] Children exited in {structio.clock() - before:.2f} seconds")
if __name__ == "__main__": if __name__ == "__main__":
structio.run( structio.run(
main, main,
[("first", 1), ("third", 3)], [("first", 1), ("third", 3)],
[("second", 2), ("fourth", 4)], [("second", 2), ("fourth", 4)],
) )
structio.run(
main_nested,
[("first", 1), ("third", 3)],
[("second", 2), ("fourth", 4)],
)

View File

@ -12,8 +12,8 @@ async def sleeper(n):
print("[sleeper] Woke up!") print("[sleeper] Woke up!")
async def main(n, o, p): async def main_simple(n, o, p):
print(f"[main] Parent is alive, spawning {n} children sleeping {o} seconds each") print(f"[main] Parent is alive, spawning {o} children sleeping {n} seconds each")
t = structio.clock() t = structio.clock()
async with structio.create_pool() as pool: async with structio.create_pool() as pool:
for i in range(o): for i in range(o):
@ -25,4 +25,21 @@ async def main(n, o, p):
print(f"[main] Parent exited in {structio.clock() - t:.2f} seconds") print(f"[main] Parent exited in {structio.clock() - t:.2f} seconds")
structio.run(main, 5, 2, 2) async def main_nested(n, o, p):
print(f"[main] Parent is alive, spawning {o} children in two contexts sleeping {n} seconds each")
t = structio.clock()
async with structio.create_pool() as p1:
for i in range(o):
p1.spawn(sleeper, n)
async with structio.create_pool() as p2:
for i in range(o):
p2.spawn(sleeper, n)
print(f"[main] Children spawned, sleeping {p} seconds before cancelling")
await structio.sleep(p)
# Note that cancellations propagate to all inner task scopes!
p1.scope.cancel()
print(f"[main] Parent exited in {structio.clock() - t:.2f} seconds")
structio.run(main_simple, 5, 2, 2)
structio.run(main_nested, 5, 2, 2)

View File

@ -17,7 +17,7 @@ async def test_loud(i, j):
with structio.with_timeout(i) as scope: with structio.with_timeout(i) as scope:
print(f"[test] Sleeping for {j} seconds") print(f"[test] Sleeping for {j} seconds")
await structio.sleep(j) await structio.sleep(j)
except TimeoutError: except structio.TimedOut:
print("[test] Timed out!") print("[test] Timed out!")
print(f"[test] Finished in {structio.clock() - k:.2f} seconds") print(f"[test] Finished in {structio.clock() - k:.2f} seconds")