Fix more bugs. Proxy example now working

This commit is contained in:
Mattia Giambirtone 2024-03-25 19:28:23 +01:00
parent 90e182c28b
commit b8cdeef41c
4 changed files with 53 additions and 23 deletions

View File

@ -196,8 +196,8 @@ class DefaultKernel(BaseKernel):
elif self._sigint_handled and not critical_section(
self.current_task.coroutine.cr_frame
):
self._sigint_handled = False
runner = partial(self.current_task.coroutine.throw, KeyboardInterrupt())
self._raise_ki(self.current_task)
return
self.event("before_task_step", self.current_task)
self.current_task.state = TaskState.RUNNING
self.current_task.paused_when = 0
@ -235,7 +235,7 @@ class DefaultKernel(BaseKernel):
def check_cancelled(self, schedule: bool = True):
if self._sigint_handled:
self.throw(self.entry_point, KeyboardInterrupt())
self._raise_ki()
elif self.current_task.pending_cancellation:
self.current_task: Task
self.cancel_task(self.current_task)
@ -294,9 +294,38 @@ class DefaultKernel(BaseKernel):
)
self.reschedule(task)
def _pick_ki_task(self):
"""
Attempts to find a suitable task to
throw KeyboardInterrupt into
"""
if self.policy.has_next_task():
return self.policy.get_next_task()
if self.policy.has_paused_task():
return self.policy.get_paused_task()
if not self.entry_point.done():
return self.entry_point
raise StructIOException("unable to find a task to throw KeyboardInterrupt into")
def _raise_ki(self, task: Task | None = None):
"""
Raises a KeyboardInterrupt exception into a
task: If one is passed explicitly, the exception
is thrown there, otherwise a suitable task is
awakened and thrown into
"""
self._sigint_handled = False
self.throw(task or self._pick_ki_task(), KeyboardInterrupt())
def _tick(self):
"""
Runs a single event loop tick
"""
if self._sigint_handled and not self.restrict_ki_to_checkpoints:
self.throw(self.entry_point, KeyboardInterrupt())
self._raise_ki()
self.wakeup()
self.check_scopes()
self.step()
@ -445,12 +474,11 @@ class DefaultKernel(BaseKernel):
self.close(force=True)
raise task.exc from StructIOException(f"system task {task} crashed")
scope = task.scope
# self._reschedule_scope_tree(scope)
self._reschedule_scope_tree(scope)
self.release(task)
self.cancel_scope(scope)
if task is not scope.owner:
self.throw(scope.owner, task.exc)
# self.reschedule(scope.owner)
def on_cancel(self, task: Task):
"""
@ -459,7 +487,7 @@ class DefaultKernel(BaseKernel):
"""
assert task.state == TaskState.CANCELLED
# self._reschedule_scope_tree(task.scope)
self._reschedule_scope_tree(task.scope)
self.event("after_cancel", task)
self.release(task)
@ -495,26 +523,26 @@ class DefaultKernel(BaseKernel):
# every time said task tries to call any
# event loop primitive
task.pending_cancellation = True
if task.scope.done():
task.scope.cancelled = True
def cancel_scope(self, scope: TaskScope):
scope.attempted_cancel = True
# We can't just immediately cancel the
# current task because this method is
# called synchronously by TaskScope.cancel(),
# so there is nowhere to throw an exception
# to via throw()
if self.current_task in scope.tasks and self.current_task is not scope.owner:
self.current_task.pending_cancellation = True
for child in filter(lambda c: not c.shielded, scope.children):
self.cancel_scope(child)
for task in scope.tasks:
if task is self.current_task:
continue
self.cancel_task(task)
if scope.owner is not self.entry_point:
self.cancel_task(scope.owner)
if scope.done():
scope.cancelled = True
# We can't just immediately cancel the
# current task because this method is
# called synchronously by TaskScope.cancel(),
# so there is nowhere to throw an exception
# to: instead, we delay the cancellation until
# the next tick
if self.current_task in scope.tasks:
self.current_task.pending_cancellation = True
self.reschedule(self.current_task)
else:
self.cancel_task(task)
def init_pool(self, pool: TaskPool):
pool.outer = self.current_pool

View File

@ -50,6 +50,8 @@ class FIFOPolicy(SchedulingPolicy):
return self.paused.get()[0]
def schedule(self, task: Task, front: bool = False):
if self.is_scheduled(task):
return
task.state = TaskState.READY
if front:
self.run_queue.append(task)

View File

@ -27,4 +27,4 @@ async def main():
print("all done!")
structio.run(main, tools=[structio.util.debug.SimpleDebugger()])
structio.run(main)

View File

@ -63,6 +63,6 @@ async def main_child(x: float):
# Should take about 5 seconds
#structio.run(main_simple, 5, 2, 2)
#structio.run(main_nested, 5, 2, 2)
structio.run(main_simple, 5, 2, 2)
structio.run(main_nested, 5, 2, 2)
structio.run(main_child, 2)