From 10c1b33e201cb91e52b1aa881e3abe27f0ea7e7a Mon Sep 17 00:00:00 2001 From: nocturn9x Date: Mon, 16 Nov 2020 21:49:13 +0100 Subject: [PATCH] join() partially fixed --- giambio/context.py | 7 +++---- giambio/core.py | 48 ++++++++++++++++++++-------------------------- giambio/objects.py | 12 +++++------- giambio/traps.py | 4 ++-- tests/count.py | 4 ++-- 5 files changed, 33 insertions(+), 42 deletions(-) diff --git a/giambio/context.py b/giambio/context.py index 8ff98aa..b3c7df3 100644 --- a/giambio/context.py +++ b/giambio/context.py @@ -61,7 +61,6 @@ class TaskManager: for task in self.tasks: try: await task.join() - except BaseException as task_error: - for dead in self.tasks: - await dead.cancel() - raise task.exc + except BaseException: + for to_cancel in self.tasks: + await to_cancel.cancel() \ No newline at end of file diff --git a/giambio/core.py b/giambio/core.py index dfb5790..b12392b 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -129,19 +129,19 @@ class AsyncScheduler: # Sneaky method call, thanks to David Beazley for this ;) getattr(self, method)(*args) except CancelledError: - self.current_task.status = "end" + self.current_task.status = "cancelled" self.current_task.cancelled = True self.current_task.cancel_pending = False - self.join(self.current_task, self.current_task.parent) except StopIteration as ret: # Coroutine ends self.current_task.status = "end" self.current_task.result = ret.value self.current_task.finished = True + self.join() except BaseException as err: self.current_task.exc = err self.current_task.status = "crashed" - self.join(self.current_task, self.current_task.parent) + self.join() def do_cancel(self): """ @@ -151,6 +151,7 @@ class AsyncScheduler: """ self.current_task.throw(CancelledError) + self.current_task.coroutine.close() def get_running(self): """ @@ -218,40 +219,30 @@ class AsyncScheduler: currently running task, if any """ - parent = self.current_task.parent - if parent: + if parent := self.current_task.parent: self.tasks.append(parent) - return parent def reschedule_joinee(self): """ - Reschedules the joinee task of the + Reschedules the joinee(s) task of the currently running task, if any """ self.tasks.extend(self.current_task.waiters) - def join(self, child: types.coroutine, parent): + def join(self): """ Handler for the 'join' event, does some magic to tell the scheduler - to wait in the given parent until the current coroutine ends + to wait until the current coroutine ends """ + child = self.current_task child.joined = True - if parent: - print("p") - child.waiters.append(parent) - if child.cancelled or child.exc: - print("f") - # Task was cancelled or has errored - if child.parent: - self.tasks.append(child.parent) - self.tasks.extend(child.waiters) - elif child.finished: - print("finish") -# if parent: - # self.tasks.append(parent) - self.tasks.extend(child.waiters) + if child.finished: + self.reschedule_joinee() + self.reschedule_parent() + elif child.exc: + raise child.exc def sleep(self, seconds: int or float): """ @@ -326,12 +317,16 @@ class AsyncScheduler: else: self.event_waiting[event].append(self.current_task) - def cancel(self, task): + def cancel(self): """ - Handler for the 'cancel' event, sets the task to be cancelled later + Handler for the 'cancel' event, schedules the task to be cancelled later """ - task.cancel_pending = True # Cancellation is deferred + if self.current_task.status in ("I/O", "sleep"): + # We cancel right away + self.do_cancel() + else: + self.current_task.cancel_pending = True # Cancellation is deferred def wrap_socket(self, sock): """ @@ -384,7 +379,6 @@ class AsyncScheduler: await want_write(sock) self.selector.unregister(sock) - sock.setblocking(False) return sock.close() async def connect_sock(self, sock: socket.socket, addr: tuple): diff --git a/giambio/objects.py b/giambio/objects.py index 28c9de9..c4ef000 100644 --- a/giambio/objects.py +++ b/giambio/objects.py @@ -61,17 +61,15 @@ class Task: Joins the task """ - res = await join(self) - if self.exc: - raise self.exc - return res + return await join(self) async def cancel(self): """ Cancels the task """ - - await cancel(self) + + if not self.exc and not self.cancelled and not self.finished: + await cancel(self) def __del__(self): self.coroutine.close() @@ -92,7 +90,7 @@ class Event: self.timeout = None self.waiting = 0 - async def set(self): + async def activate(self): """ Sets the event, waking up all tasks that called pause() on us diff --git a/giambio/traps.py b/giambio/traps.py index cb7f05d..0cd33cc 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -73,7 +73,7 @@ async def join(task): :type task: class: Task """ - return await create_trap("join", task, await current_task()) + return await create_trap("join") async def cancel(task): @@ -89,7 +89,7 @@ async def cancel(task): code, so if you really wanna do that be sure to re-raise it when done! """ - await create_trap("cancel", task) + await create_trap("cancel") assert task.cancelled, f"Coroutine ignored CancelledError" diff --git a/tests/count.py b/tests/count.py index 83f61c3..c216e02 100644 --- a/tests/count.py +++ b/tests/count.py @@ -27,8 +27,8 @@ async def countup(stop: int, step: int = 1): async def main(): try: async with giambio.create_pool() as pool: - pool.spawn(countdown, 10) - pool.spawn(countup, 5, 2) + pool.spawn(countdown, 5) + pool.spawn(countup, 5, 1) except Exception as e: print(f"Got -> {type(e).__name__}: {e}") print("Task execution complete")