From c2f1d01dcf4c41414256ac77abe3680371c08038 Mon Sep 17 00:00:00 2001 From: nocturn9x Date: Fri, 20 Mar 2020 17:53:30 +0100 Subject: [PATCH] minor fixes (issue still persists) --- giambio/core.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/giambio/core.py b/giambio/core.py index 3b84842..9ae562a 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -4,7 +4,7 @@ from collections import deque, defaultdict from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE from heapq import heappush, heappop import socket -from .exceptions import GiambioError, AlreadyJoinedError, CancelledError +from .exceptions import AlreadyJoinedError, CancelledError import traceback from timeit import default_timer from time import sleep as wait @@ -50,7 +50,6 @@ class EventLoop: self.running = None # This will always point to the currently running coroutine (Task object) self.joined = defaultdict(list) # Tasks that want to join self.clock = default_timer # Monotonic clock to keep track of elapsed time - self.tree = set() # Keep track of coroutines to know their state def loop(self): """Main event loop for giambio""" @@ -58,16 +57,16 @@ class EventLoop: while True: if not self.selector.get_map() and not self.to_run: break - while self.selector.get_map(): # If there are sockets ready, schedule their associated task + while self.selector.get_map(): # If there are sockets ready, (re)schedule their associated task timeout = 0.0 if self.to_run else None tasks = deque(self.selector.select(timeout)) for key, _ in tasks: self.to_run.append(key.data) # Socket ready? Schedule the task - self.selector.unregister(key.fileobj) # Once scheduled, the task does not need to wait anymore + self.selector.unregister(key.fileobj) # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now) while self.to_run or self.paused: if not self.to_run: wait(max(0.0, self.paused[0][0] - self.clock())) # If there are no tasks ready, just do nothing - while self.paused and self.paused[0][0] < self.clock(): # Rechedules task when their timer has elapsed + while self.paused and self.paused[0][0] < self.clock(): # Reschedules task when their timer has elapsed _, coro = heappop(self.paused) self.to_run.append(coro) self.running = self.to_run.popleft() # Sets the currently running task @@ -78,15 +77,14 @@ class EventLoop: self.running.ret_value = e.args[0] if e.args else None # Saves the return value self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task except CancelledError: - self.tree.discard(self.running) self.running.cancelled = True # Update the coroutine status raise except Exception as has_raised: - if self.running.joined: - self.running.exception = has_raised # Errored? Save the exception - else: # If the task is not joined, the exception would disappear, but not in giambio + self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task + if self.running.joined: # Let the join function handle the hassle of propagating the error + self.running.exception = has_raised # Save the exception + else: # Let the exception propagate (I'm looking at you asyncIO ;)) raise - self.to_run.extend(self.joined.pop(self.running, ())) except KeyboardInterrupt: self.running.coroutine.throw(KeyboardInterrupt) @@ -227,12 +225,12 @@ def join(task: Task): if task.exception: print("Traceback (most recent call last):") traceback.print_tb(task.exception.__traceback__) - ename = type(task.exception).__name__ + exception_name = type(task.exception).__name__ if str(task.exception): - print(f"{ename}: {task.exception}") + print(f"{exception_name}: {task.exception}") else: print(task.exception) - raise GiambioError from task.exception + raise task.exception return task.ret_val