minor fixes (issue still persists)

This commit is contained in:
nocturn9x 2020-03-20 17:53:30 +01:00
parent 5bf06031ac
commit c2f1d01dcf
1 changed files with 11 additions and 13 deletions

View File

@ -4,7 +4,7 @@ from collections import deque, defaultdict
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from heapq import heappush, heappop from heapq import heappush, heappop
import socket import socket
from .exceptions import GiambioError, AlreadyJoinedError, CancelledError from .exceptions import AlreadyJoinedError, CancelledError
import traceback import traceback
from timeit import default_timer from timeit import default_timer
from time import sleep as wait from time import sleep as wait
@ -50,7 +50,6 @@ class EventLoop:
self.running = None # This will always point to the currently running coroutine (Task object) self.running = None # This will always point to the currently running coroutine (Task object)
self.joined = defaultdict(list) # Tasks that want to join self.joined = defaultdict(list) # Tasks that want to join
self.clock = default_timer # Monotonic clock to keep track of elapsed time self.clock = default_timer # Monotonic clock to keep track of elapsed time
self.tree = set() # Keep track of coroutines to know their state
def loop(self): def loop(self):
"""Main event loop for giambio""" """Main event loop for giambio"""
@ -58,16 +57,16 @@ class EventLoop:
while True: while True:
if not self.selector.get_map() and not self.to_run: if not self.selector.get_map() and not self.to_run:
break break
while self.selector.get_map(): # If there are sockets ready, schedule their associated task while self.selector.get_map(): # If there are sockets ready, (re)schedule their associated task
timeout = 0.0 if self.to_run else None timeout = 0.0 if self.to_run else None
tasks = deque(self.selector.select(timeout)) tasks = deque(self.selector.select(timeout))
for key, _ in tasks: for key, _ in tasks:
self.to_run.append(key.data) # Socket ready? Schedule the task self.to_run.append(key.data) # Socket ready? Schedule the task
self.selector.unregister(key.fileobj) # Once scheduled, the task does not need to wait anymore self.selector.unregister(key.fileobj) # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now)
while self.to_run or self.paused: while self.to_run or self.paused:
if not self.to_run: if not self.to_run:
wait(max(0.0, self.paused[0][0] - self.clock())) # If there are no tasks ready, just do nothing wait(max(0.0, self.paused[0][0] - self.clock())) # If there are no tasks ready, just do nothing
while self.paused and self.paused[0][0] < self.clock(): # Rechedules task when their timer has elapsed while self.paused and self.paused[0][0] < self.clock(): # Reschedules task when their timer has elapsed
_, coro = heappop(self.paused) _, coro = heappop(self.paused)
self.to_run.append(coro) self.to_run.append(coro)
self.running = self.to_run.popleft() # Sets the currently running task self.running = self.to_run.popleft() # Sets the currently running task
@ -78,15 +77,14 @@ class EventLoop:
self.running.ret_value = e.args[0] if e.args else None # Saves the return value self.running.ret_value = e.args[0] if e.args else None # Saves the return value
self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
except CancelledError: except CancelledError:
self.tree.discard(self.running)
self.running.cancelled = True # Update the coroutine status self.running.cancelled = True # Update the coroutine status
raise raise
except Exception as has_raised: except Exception as has_raised:
if self.running.joined: self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
self.running.exception = has_raised # Errored? Save the exception if self.running.joined: # Let the join function handle the hassle of propagating the error
else: # If the task is not joined, the exception would disappear, but not in giambio self.running.exception = has_raised # Save the exception
else: # Let the exception propagate (I'm looking at you asyncIO ;))
raise raise
self.to_run.extend(self.joined.pop(self.running, ()))
except KeyboardInterrupt: except KeyboardInterrupt:
self.running.coroutine.throw(KeyboardInterrupt) self.running.coroutine.throw(KeyboardInterrupt)
@ -227,12 +225,12 @@ def join(task: Task):
if task.exception: if task.exception:
print("Traceback (most recent call last):") print("Traceback (most recent call last):")
traceback.print_tb(task.exception.__traceback__) traceback.print_tb(task.exception.__traceback__)
ename = type(task.exception).__name__ exception_name = type(task.exception).__name__
if str(task.exception): if str(task.exception):
print(f"{ename}: {task.exception}") print(f"{exception_name}: {task.exception}")
else: else:
print(task.exception) print(task.exception)
raise GiambioError from task.exception raise task.exception
return task.ret_val return task.ret_val