This commit is contained in:
Mattia Giambirtone 2023-05-18 22:05:43 +02:00
parent bb52655582
commit 2459a7ea00
Signed by: nocturn9x
GPG Key ID: 8270F9F467971E59
3 changed files with 11 additions and 25 deletions

View File

@ -44,9 +44,6 @@ class TaskScope:
""" """
current_loop().cancel_scope(self) current_loop().cancel_scope(self)
# We also cancel our owner when this
# method is called explicitly
current_loop().cancel_task(self.owner)
def get_actual_timeout(self): def get_actual_timeout(self):
""" """
@ -127,16 +124,11 @@ class TaskPool:
else: else:
await suspend() await suspend()
except Cancelled as e: except Cancelled as e:
if e.scope is not self.scope: self.error = e
self.error = e
self.scope.cancelled = True self.scope.cancelled = True
except (Exception, KeyboardInterrupt) as e: except (Exception, KeyboardInterrupt) as e:
self.error = e self.error = e
# We don't use self.scope.cancel() because self.scope.cancel()
# that raises a Cancelled exception in the
# task owning the scope, and we're already
# handling an error here!
current_loop().cancel_scope(self.scope)
finally: finally:
current_loop().close_pool(self) current_loop().close_pool(self)
self.scope.__exit__(exc_type, exc_val, exc_tb) self.scope.__exit__(exc_type, exc_val, exc_tb)

View File

@ -40,6 +40,7 @@ class FIFOKernel(BaseKernel):
self.current_scope = self.current_pool.scope self.current_scope = self.current_pool.scope
self.current_scope.shielded = False self.current_scope.shielded = False
self.scopes.append(self.current_scope) self.scopes.append(self.current_scope)
self._closing = False
def get_closest_deadline(self): def get_closest_deadline(self):
return min([self.current_scope.get_actual_timeout(), self.paused.get_closest_deadline()]) return min([self.current_scope.get_actual_timeout(), self.paused.get_closest_deadline()])
@ -135,6 +136,9 @@ class FIFOKernel(BaseKernel):
self.handle_errors(partial(task.coroutine.throw, err), task) self.handle_errors(partial(task.coroutine.throw, err), task)
def reschedule(self, task: Task): def reschedule(self, task: Task):
if task.done():
return
#print(__import__("inspect").stack()[1].function)
self.run_queue.append(task) self.run_queue.append(task)
def check_cancelled(self): def check_cancelled(self):
@ -277,15 +281,6 @@ class FIFOKernel(BaseKernel):
self.event("on_task_exit", task) self.event("on_task_exit", task)
self.io_manager.release_task(task) self.io_manager.release_task(task)
def close(self, force: bool = False):
super().close(force)
if force:
# Every task is in a scope deeper
# than our internal pool, so canceling
# it will cancel *everything* we are
# executing
self.pool.scope.cancel()
def on_error(self, task: Task): def on_error(self, task: Task):
""" """
The given task raised an exception The given task raised an exception
@ -297,8 +292,6 @@ class FIFOKernel(BaseKernel):
self.throw(task.pool.scope.owner, task.exc) self.throw(task.pool.scope.owner, task.exc)
task.waiters.clear() task.waiters.clear()
self.release(task) self.release(task)
if task.done() and task is self.entry_point:
self.close(force=True)
def on_cancel(self, task: Task): def on_cancel(self, task: Task):
""" """
@ -309,10 +302,9 @@ class FIFOKernel(BaseKernel):
for waiter in task.waiters: for waiter in task.waiters:
self.reschedule(waiter) self.reschedule(waiter)
task.waiters.clear() task.waiters.clear()
if task.pool.done():
task.pool.scope.cancelled = True
self.reschedule(task.pool.entry_point)
self.release(task) self.release(task)
if task.pool.done():
self.reschedule(task.pool.entry_point)
def init_scope(self, scope: TaskScope): def init_scope(self, scope: TaskScope):
scope.owner = self.current_task scope.owner = self.current_task
@ -342,6 +334,8 @@ class FIFOKernel(BaseKernel):
self.cancel_scope(inner) self.cancel_scope(inner)
for task in scope.tasks: for task in scope.tasks:
self.cancel_task(task) self.cancel_task(task)
if scope.done():
self.reschedule(scope.owner)
def init_pool(self, pool: TaskPool): def init_pool(self, pool: TaskPool):
pool.outer = self.current_pool pool.outer = self.current_pool

View File

@ -46,5 +46,5 @@ async def main_async_thread(i):
print(f"[main] Exited in {structio.clock() - j:.2f} seconds") print(f"[main] Exited in {structio.clock() - j:.2f} seconds")
#structio.run(main, 5) structio.run(main, 5)
structio.run(main_async_thread, 5) structio.run(main_async_thread, 5)