From 2459a7ea00a77a32e2eb24cde6b1762b9a288dc9 Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Thu, 18 May 2023 22:05:43 +0200 Subject: [PATCH] Bugfixes --- structio/core/context.py | 12 ++---------- structio/core/kernels/fifo.py | 22 ++++++++-------------- tests/events.py | 2 +- 3 files changed, 11 insertions(+), 25 deletions(-) diff --git a/structio/core/context.py b/structio/core/context.py index 215aafc..856f19b 100644 --- a/structio/core/context.py +++ b/structio/core/context.py @@ -44,9 +44,6 @@ class TaskScope: """ current_loop().cancel_scope(self) - # We also cancel our owner when this - # method is called explicitly - current_loop().cancel_task(self.owner) def get_actual_timeout(self): """ @@ -127,16 +124,11 @@ class TaskPool: else: await suspend() except Cancelled as e: - if e.scope is not self.scope: - self.error = e + self.error = e self.scope.cancelled = True except (Exception, KeyboardInterrupt) as e: self.error = e - # We don't use self.scope.cancel() because - # that raises a Cancelled exception in the - # task owning the scope, and we're already - # handling an error here! - current_loop().cancel_scope(self.scope) + self.scope.cancel() finally: current_loop().close_pool(self) self.scope.__exit__(exc_type, exc_val, exc_tb) diff --git a/structio/core/kernels/fifo.py b/structio/core/kernels/fifo.py index d34e02c..9b02002 100644 --- a/structio/core/kernels/fifo.py +++ b/structio/core/kernels/fifo.py @@ -40,6 +40,7 @@ class FIFOKernel(BaseKernel): self.current_scope = self.current_pool.scope self.current_scope.shielded = False self.scopes.append(self.current_scope) + self._closing = False def get_closest_deadline(self): return min([self.current_scope.get_actual_timeout(), self.paused.get_closest_deadline()]) @@ -135,6 +136,9 @@ class FIFOKernel(BaseKernel): self.handle_errors(partial(task.coroutine.throw, err), task) def reschedule(self, task: Task): + if task.done(): + return + #print(__import__("inspect").stack()[1].function) self.run_queue.append(task) def check_cancelled(self): @@ -277,15 +281,6 @@ class FIFOKernel(BaseKernel): self.event("on_task_exit", task) self.io_manager.release_task(task) - def close(self, force: bool = False): - super().close(force) - if force: - # Every task is in a scope deeper - # than our internal pool, so canceling - # it will cancel *everything* we are - # executing - self.pool.scope.cancel() - def on_error(self, task: Task): """ The given task raised an exception @@ -297,8 +292,6 @@ class FIFOKernel(BaseKernel): self.throw(task.pool.scope.owner, task.exc) task.waiters.clear() self.release(task) - if task.done() and task is self.entry_point: - self.close(force=True) def on_cancel(self, task: Task): """ @@ -309,10 +302,9 @@ class FIFOKernel(BaseKernel): for waiter in task.waiters: self.reschedule(waiter) task.waiters.clear() - if task.pool.done(): - task.pool.scope.cancelled = True - self.reschedule(task.pool.entry_point) self.release(task) + if task.pool.done(): + self.reschedule(task.pool.entry_point) def init_scope(self, scope: TaskScope): scope.owner = self.current_task @@ -342,6 +334,8 @@ class FIFOKernel(BaseKernel): self.cancel_scope(inner) for task in scope.tasks: self.cancel_task(task) + if scope.done(): + self.reschedule(scope.owner) def init_pool(self, pool: TaskPool): pool.outer = self.current_pool diff --git a/tests/events.py b/tests/events.py index 4b1cbd0..370f67a 100644 --- a/tests/events.py +++ b/tests/events.py @@ -46,5 +46,5 @@ async def main_async_thread(i): print(f"[main] Exited in {structio.clock() - j:.2f} seconds") -#structio.run(main, 5) +structio.run(main, 5) structio.run(main_async_thread, 5)