Fixes to cancellation
This commit is contained in:
parent
0560298f0f
commit
f81071f3b2
|
@ -157,11 +157,11 @@ async def wait(task: Task) -> Any | None:
|
||||||
raise SchedulerError("a task cannot join itself")
|
raise SchedulerError("a task cannot join itself")
|
||||||
if current not in task.joiners:
|
if current not in task.joiners:
|
||||||
# Luckily we use a set, so this has O(1)
|
# Luckily we use a set, so this has O(1)
|
||||||
# complexity
|
# complexity on average
|
||||||
await join(task) # Waiting implies joining!
|
await join(task) # Waiting implies joining!
|
||||||
await syscall("wait", task)
|
await syscall("wait", task)
|
||||||
if task.exc and task.state != TaskState.CANCELLED and task.propagate:
|
if task.exc and task.state != TaskState.CANCELLED and task.propagate:
|
||||||
# Task raised an error that wasn't directly caused by a cancellation:
|
# The task raised an error that wasn't directly caused by a cancellation:
|
||||||
# raise it, but do so only the first time wait was called
|
# raise it, but do so only the first time wait was called
|
||||||
task.propagate = False
|
task.propagate = False
|
||||||
raise task.exc
|
raise task.exc
|
||||||
|
@ -184,7 +184,7 @@ async def cancel(task: Task, block: bool = False):
|
||||||
:type block: bool, optional
|
:type block: bool, optional
|
||||||
"""
|
"""
|
||||||
|
|
||||||
await syscall("cancel", task, block)
|
await syscall("cancel", task)
|
||||||
if block:
|
if block:
|
||||||
await wait(task)
|
await wait(task)
|
||||||
if not task.state == TaskState.CANCELLED:
|
if not task.state == TaskState.CANCELLED:
|
||||||
|
|
|
@ -422,7 +422,7 @@ class FIFOKernel:
|
||||||
self.handle_errors(partial(k.data.throw, exc), k.data)
|
self.handle_errors(partial(k.data.throw, exc), k.data)
|
||||||
self.reschedule_running()
|
self.reschedule_running()
|
||||||
|
|
||||||
def cancel(self, task: Task, block: bool = True):
|
def cancel(self, task: Task):
|
||||||
"""
|
"""
|
||||||
Attempts to cancel the given task or
|
Attempts to cancel the given task or
|
||||||
schedules cancellation for later if
|
schedules cancellation for later if
|
||||||
|
@ -434,8 +434,7 @@ class FIFOKernel:
|
||||||
task.pending_cancellation = True
|
task.pending_cancellation = True
|
||||||
self.io_release_task(task)
|
self.io_release_task(task)
|
||||||
self.paused.discard(task)
|
self.paused.discard(task)
|
||||||
if not block:
|
self.reschedule_running()
|
||||||
self.reschedule_running()
|
|
||||||
|
|
||||||
def handle_errors(self, func: Callable, task: Task | None = None):
|
def handle_errors(self, func: Callable, task: Task | None = None):
|
||||||
"""
|
"""
|
||||||
|
@ -510,8 +509,6 @@ class FIFOKernel:
|
||||||
self.paused.discard(task)
|
self.paused.discard(task)
|
||||||
self.io_release_task(task)
|
self.io_release_task(task)
|
||||||
self.run_ready.extend(task.joiners)
|
self.run_ready.extend(task.joiners)
|
||||||
if task is not self.current_task:
|
|
||||||
self.reschedule_running()
|
|
||||||
|
|
||||||
def join(self, task: Task):
|
def join(self, task: Task):
|
||||||
"""
|
"""
|
||||||
|
|
Reference in New Issue