From d2a20a14fcd8f625fd53f44f99049ddcf23df793 Mon Sep 17 00:00:00 2001 From: nocturn9x Date: Sat, 11 Jul 2020 08:57:12 +0000 Subject: [PATCH] stuff --- giambio/_core.py | 75 ++++++++++++++++++++++++---------------------- giambio/_layers.py | 17 +++++++---- giambio/_traps.py | 5 ++-- tests/count.py | 1 + tests/events.py | 36 ++++++++++------------ 5 files changed, 69 insertions(+), 65 deletions(-) diff --git a/giambio/_core.py b/giambio/_core.py index 60187eb..c0e2a4b 100644 --- a/giambio/_core.py +++ b/giambio/_core.py @@ -16,13 +16,13 @@ limitations under the License. # Import libraries and internal resources import types -from collections import deque +from collections import deque, defaultdict from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE import socket -from .exceptions import AlreadyJoinedError, CancelledError, ResourceBusy +from .exceptions import AlreadyJoinedError, CancelledError, ResourceBusy, GiambioError from timeit import default_timer from time import sleep as wait -from .socket import AsyncSocket, WantWrite, WantRead +from .socket import AsyncSocket, WantWrite from ._layers import Task, TimeQueue from socket import SOL_SOCKET, SO_ERROR from ._traps import want_read, want_write @@ -49,7 +49,7 @@ class AsyncScheduler: self.clock = default_timer # Monotonic clock to keep track of elapsed time reliably self.paused = TimeQueue(self.clock) # Tasks that are asleep self.events = {} # All Event objects - self.event_waiting = {} # Coroutines waiting on event objects + self.event_waiting = defaultdict(list) # Coroutines waiting on event objects self.sequence = 0 def run(self): @@ -81,7 +81,7 @@ class AsyncScheduler: try: if self.current_task.status == "cancel": # Deferred cancellation self.current_task.cancelled = True - self.current_task.throw(CancelledError) + self.current_task.throw(CancelledError(self.current_task)) method, *args = self.current_task.run(self.current_task._notify) # Run a single step with the calculation (and awake event-waiting tasks if any) self.current_task.status = "run" getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;) @@ -89,6 +89,7 @@ class AsyncScheduler: self.check_events() except CancelledError as cancelled: self.tasks.remove(cancelled.args[0]) # Remove the dead task + self.tasks.append(self.current_task) except StopIteration as e: # Coroutine ends self.current_task.result = e.args[0] if e.args else None self.current_task.finished = True @@ -101,18 +102,12 @@ class AsyncScheduler: def check_events(self): """Checks for ready or expired events and triggers them""" - for event, (timeout, _, task) in self.event_waiting.copy().items(): - if timeout and self.clock() > timeout: - event._timeout_expired = True - event._notify = task._notify = None - self.tasks.append(task) - self.tasks.append(event.notifier) - self.event_waiting.pop(event) - elif event._set: + for event, tasks in self.event_waiting.copy().items(): + if event._set: event.event_caught = True - task._notify = event._notify - self.tasks.append(task) - self.tasks.append(event.notifier) + for task in tasks: + task._notify = event._notify + self.tasks.extend(tasks + [event.notifier]) self.event_waiting.pop(event) def check_sleeping(self): @@ -139,19 +134,33 @@ class AsyncScheduler: self.tasks.append(task) return task + def schedule_task(self, coro: types.coroutine, n: int): + """Schedules a task for execution after n seconds""" + + task = Task(coro) + self.paused.put(task, n) + return task + def start(self, coro: types.coroutine): """Starts the event loop using a coroutine as an entry point. """ - self.create_task(coro) - self.run() + entry = self.create_task(coro) + crashed = False + try: + self.run() + except BaseException as exc: + entry.exc = exc + crashed = True + if crashed: + raise GiambioError("Event loop crashed!") from entry.exc + return entry def reschedule_parent(self, coro): """Reschedules the parent task""" parent = self.joined.pop(coro, None) if parent: - assert parent not in self.tasks self.tasks.append(parent) return parent @@ -197,13 +206,10 @@ class AsyncScheduler: coroutine returns or, if an exception gets raised, the exception will get propagated inside the parent task""" - if child.finished: - self.tasks.append(self.current_task) + if child not in self.joined: + self.joined[child] = self.current_task else: - if child not in self.joined: - self.joined[child] = self.current_task - else: - raise AlreadyJoinedError("Joining the same task multiple times is not allowed!") + raise AlreadyJoinedError("Joining the same task multiple times is not allowed!") def sleep(self, seconds: int or float): """Puts the caller to sleep for a given amount of seconds""" @@ -222,19 +228,17 @@ class AsyncScheduler: event._notify = value self.events[event] = value - def event_wait(self, event, timeout): + def event_wait(self, event): """Waits for an event""" - self.sequence += 1 - if timeout: - timeout = self.clock() + timeout - else: - timeout = 0 if self.events.get(event, None): - return self.events.pop(event) + event.waiting -= 1 + if event.waiting <= 0: + return self.events.pop(event) + else: + return self.events[event] else: - self.event_waiting[event] = timeout, self.sequence, self.current_task - self.event_waiting = dict(sorted(self.event_waiting.items())) + self.event_waiting[event].append(self.current_task) def cancel(self, task): """Handler for the 'cancel' event, throws CancelledError inside a coroutine @@ -245,8 +249,7 @@ class AsyncScheduler: task.cancelled = True task.throw(CancelledError(task)) elif task.status == "run": - task.status = "cancel" - self.reschedule_parent() + task.status = "cancel" # Cancellation is deferred def wrap_socket(self, sock): """Wraps a standard socket into an AsyncSocket object""" diff --git a/giambio/_layers.py b/giambio/_layers.py index ef8c84f..2acbbcc 100644 --- a/giambio/_layers.py +++ b/giambio/_layers.py @@ -17,6 +17,8 @@ limitations under the License. import types from ._traps import join, cancel, event_set, event_wait from heapq import heappop, heappush +from .exceptions import GiambioError + class Task: @@ -61,26 +63,29 @@ class Task: class Event: """A class designed similarly to threading.Event, but with more features""" - def __init__(self, loop): + def __init__(self): """Object constructor""" self._set = False self._notify = None - self.notifier = loop.current_task - self._timeout_expired = False self.event_caught = False self.timeout = None + self.waiting = 0 - async def set(self, value=None): + async def set(self, value=True): """Sets the event, optionally taking a value. This can be used to control tasks' flow by 'sending' commands back and fort""" + if self._set: + raise GiambioError("The event has already been set") await event_set(self, value) - async def pause(self, timeout=0): + async def pause(self): """Waits until the event is set and returns a value""" - return await event_wait(self, timeout) + self.waiting += 1 + return await event_wait(self) + class TimeQueue: """An abstraction layer over a heap queue based on time. This is where diff --git a/giambio/_traps.py b/giambio/_traps.py index 4ed9b31..74fe49a 100644 --- a/giambio/_traps.py +++ b/giambio/_traps.py @@ -48,7 +48,6 @@ def join(task): """ yield "join", task - return task.result @types.coroutine @@ -95,7 +94,7 @@ def event_set(event, value): @types.coroutine -def event_wait(event, timeout: int): +def event_wait(event): - msg = yield "event_wait", event, timeout + msg = yield "event_wait", event return msg diff --git a/tests/count.py b/tests/count.py index 8b7c90d..c12dc2f 100644 --- a/tests/count.py +++ b/tests/count.py @@ -25,6 +25,7 @@ async def main(): await giambio.sleep(2) print("Slept 2 seconds, killing countup") await cup.cancel() ## DOES NOT WORK!!! + print("Countup cancelled") await cup.join() await cdown.join() print("Task execution complete") diff --git a/tests/events.py b/tests/events.py index a739415..7772fb7 100644 --- a/tests/events.py +++ b/tests/events.py @@ -1,34 +1,30 @@ import giambio -async def child(notifier: giambio.Event, timeout: int): - print("[child] Child is alive!") - if timeout: - print(f"[child] Waiting for events for up to {timeout} seconds") - else: - print("[child] Waiting for events") - notification = await notifier.pause(timeout=timeout) - if not notifier.event_caught: - print("[child] Parent was too slow!") - else: - print(f"[child] Parent said: {notification}") +async def child(notifier: giambio.Event, reply: giambio.Event, pause: int): + print("[child] Child is alive! Going to sleep until notified") + notification = await notifier.pause() + print(f"[child] Parent said: '{notification}', replying in {pause} seconds") + await giambio.sleep(pause) + print("[child] Replying to parent") + await reply.set("Hi daddy!") -async def parent(pause: int = 1, child_timeout: int = 0): - event = giambio.Event(scheduler) + +async def parent(pause: int = 1): + event = giambio.Event() + reply = giambio.Event() print("[parent] Spawning child task") - task = scheduler.create_task(child(event, child_timeout)) + task = scheduler.create_task(child(event, reply, pause)) print(f"[parent] Sleeping {pause} second(s) before setting the event") await giambio.sleep(pause) - print("[parent] Event set") await event.set("Hi, my child") - if not event.event_caught: - print("[parent] Event not delivered, the timeout has expired") - else: - print("[parent] Event delivered") + print("[parent] Event set, awaiting reply") + reply = await reply.pause() + print(f"[parent] Child replied: '{reply}'") await task.join() print("[parent] Child exited") if __name__ == "__main__": scheduler = giambio.AsyncScheduler() - scheduler.start(parent(4, 5)) + scheduler.start(parent(5))