From 497ef45307601b315f036ae34e1e219a3a0f00a5 Mon Sep 17 00:00:00 2001 From: nocturn9x Date: Mon, 16 Nov 2020 23:06:54 +0100 Subject: [PATCH] Fixed events + Added some TODOs --- giambio/__init__.py | 6 ++-- giambio/context.py | 7 ++-- giambio/core.py | 79 ++++++++++++++++++++----------------------- giambio/exceptions.py | 41 +++++++++++----------- giambio/objects.py | 20 ++++++----- giambio/run.py | 2 ++ giambio/socket.py | 8 ++--- giambio/traps.py | 18 +++++----- tests/events.py | 22 ++++++------ 9 files changed, 101 insertions(+), 102 deletions(-) diff --git a/giambio/__init__.py b/giambio/__init__.py index 45f118c..8a33e3d 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -18,16 +18,14 @@ __author__ = "Nocturn9x aka Isgiambyy" __version__ = (1, 0, 0) -from .exceptions import GiambioError, AlreadyJoinedError, CancelledError +from . import exceptions from .traps import sleep, current_task from .objects import Event from .run import run, clock, wrap_socket, create_pool, get_event_loop, new_event_loop __all__ = [ - "GiambioError", - "AlreadyJoinedError", - "CancelledError", + "exceptions", "sleep", "Event", "run", diff --git a/giambio/context.py b/giambio/context.py index b3c7df3..a4f7635 100644 --- a/giambio/context.py +++ b/giambio/context.py @@ -1,4 +1,6 @@ """ +Higher-level context managers for async pools + Copyright (C) 2020 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); @@ -61,6 +63,7 @@ class TaskManager: for task in self.tasks: try: await task.join() - except BaseException: + except BaseException as e: + self.tasks.remove(task) for to_cancel in self.tasks: - await to_cancel.cancel() \ No newline at end of file + await to_cancel.cancel() diff --git a/giambio/core.py b/giambio/core.py index b12392b..d2deb1b 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -1,4 +1,6 @@ """ +The main runtime environment for giambio + Copyright (C) 2020 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); @@ -22,18 +24,15 @@ from timeit import default_timer from .objects import Task, TimeQueue from socket import SOL_SOCKET, SO_ERROR from .traps import want_read, want_write -from collections import defaultdict, deque +from collections import deque from .socket import AsyncSocket, WantWrite, WantRead from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE from .exceptions import ( - AlreadyJoinedError, CancelledError, ResourceBusy, - GiambioError ) -# The main runtime environment for giambio class AsyncScheduler: """ @@ -62,8 +61,6 @@ class AsyncScheduler: self.paused = TimeQueue(self.clock) # All active Event objects self.events = set() - # Coroutines waiting on event objects - self.event_waiting = defaultdict(list) # Data to send back to a trap self.to_send = None # Have we ever ran? @@ -74,7 +71,10 @@ class AsyncScheduler: Returns True if there is work to do """ - if self.selector.get_map() or any([self.paused, self.tasks, self.event_waiting]): + if self.selector.get_map() or any([self.paused, + self.tasks, + self.events + ]): return False return True @@ -83,6 +83,7 @@ class AsyncScheduler: Shuts down the event loop """ + # TODO: See if other teardown is required (massive join()?) self.selector.close() def run(self): @@ -106,9 +107,9 @@ class AsyncScheduler: if self.selector.get_map(): # The next step is checking for I/O self.check_io() - if self.event_waiting: + if self.events: # Try to awake event-waiting tasks - self.trigger_events() + self.check_events() # While there are tasks to run while self.tasks: # Sets the currently running task @@ -132,6 +133,7 @@ class AsyncScheduler: self.current_task.status = "cancelled" self.current_task.cancelled = True self.current_task.cancel_pending = False + self.join() # TODO: Investigate if a call to join() is needed except StopIteration as ret: # Coroutine ends self.current_task.status = "end" @@ -150,8 +152,8 @@ class AsyncScheduler: as tasks are independent """ + # TODO: Do we need anything else? self.current_task.throw(CancelledError) - self.current_task.coroutine.close() def get_running(self): """ @@ -161,16 +163,17 @@ class AsyncScheduler: self.tasks.append(self.current_task) self.to_send = self.current_task - def trigger_events(self): + def check_events(self): """ Checks for ready or expired events and triggers them """ - for event, tasks in self.event_waiting.copy().items(): + for event in self.events.copy(): if event.set: event.event_caught = True - self.tasks.extend(tasks + [event.notifier]) - self.event_waiting.pop(event) + event.waiters + self.tasks.extend(event.waiters) + self.events.remove(event) def awake_sleeping(self): """ @@ -213,15 +216,6 @@ class AsyncScheduler: if entry.exc: raise entry.exc from None - def reschedule_parent(self): - """ - Reschedules the parent task of the - currently running task, if any - """ - - if parent := self.current_task.parent: - self.tasks.append(parent) - def reschedule_joinee(self): """ Reschedules the joinee(s) task of the @@ -238,11 +232,12 @@ class AsyncScheduler: child = self.current_task child.joined = True + if child.parent: + child.waiters.append(child.parent) if child.finished: self.reschedule_joinee() - self.reschedule_parent() elif child.exc: - raise child.exc + ... # TODO: Handle exceptions def sleep(self, seconds: int or float): """ @@ -258,7 +253,8 @@ class AsyncScheduler: # TODO: More generic I/O rather than just sockets def want_read(self, sock: socket.socket): """ - Handler for the 'want_read' event, registers the socket inside the selector to perform I/0 multiplexing + Handler for the 'want_read' event, registers the socket inside the + selector to perform I/0 multiplexing """ self.current_task.status = "I/O" @@ -277,7 +273,8 @@ class AsyncScheduler: def want_write(self, sock: socket.socket): """ - Handler for the 'want_write' event, registers the socket inside the selector to perform I/0 multiplexing + Handler for the 'want_write' event, registers the socket inside the + selector to perform I/0 multiplexing """ self.current_task.status = "I/O" @@ -286,7 +283,7 @@ class AsyncScheduler: # Socket is already scheduled! return else: - # modify() causes issues + # TODO: Inspect why modify() causes issues self.selector.unregister(sock) self.current_task.last_io = "WRITE", sock try: @@ -299,27 +296,23 @@ class AsyncScheduler: Sets an event """ - event.notifier = self.current_task - event.set = True self.events.add(event) + event.waiters.append(self.current_task) + event.set = True + self.reschedule_joinee() def event_wait(self, event): """ - Waits for an event + Pauses the current task on an event """ - if event in self.events: - event.waiting -= 1 - if event.waiting <= 0: - return self.events.remove(event) - else: - return - else: - self.event_waiting[event].append(self.current_task) + event.waiters.append(self.current_task) + def cancel(self): """ Handler for the 'cancel' event, schedules the task to be cancelled later + or does so straight away if it is safe to do so """ if self.current_task.status in ("I/O", "sleep"): @@ -337,8 +330,8 @@ class AsyncScheduler: async def read_sock(self, sock: socket.socket, buffer: int): """ - Reads from a socket asynchronously, waiting until the resource is available and returning up to buffer bytes - from the socket + Reads from a socket asynchronously, waiting until the resource is + available and returning up to buffer bytes from the socket """ try: @@ -349,8 +342,8 @@ class AsyncScheduler: async def accept_sock(self, sock: socket.socket): """ - Accepts a socket connection asynchronously, waiting until the resource is available and returning the - result of the accept() call + Accepts a socket connection asynchronously, waiting until the resource + is available and returning the result of the accept() call """ try: diff --git a/giambio/exceptions.py b/giambio/exceptions.py index 8fa63c9..5e70eb6 100644 --- a/giambio/exceptions.py +++ b/giambio/exceptions.py @@ -1,4 +1,6 @@ """ +Exceptions for giambio + Copyright (C) 2020 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); @@ -16,36 +18,35 @@ limitations under the License. class GiambioError(Exception): - """Base class for gaimbio exceptions""" + """ + Base class for giambio exceptions + """ - pass - - -class AlreadyJoinedError(GiambioError): - pass + ... class CancelledError(BaseException): - """Exception raised by the giambio._layers.Task.cancel() method""" + """ + Exception raised by the giambio.objects.Task.cancel() method + to terminate a child task. This should NOT be catched, or + at least it should be re-raised and never ignored + """ - def __repr__(self): - return "giambio.exceptions.CancelledError" + ... class ResourceBusy(GiambioError): - """Exception that is raised when a resource is accessed by more than - one task at a time""" + """ + Exception that is raised when a resource is accessed by more than + one task at a time + """ - pass - - -class BrokenPipeError(GiambioError): - """Wrapper around the broken pipe socket.error""" - - pass + ... class ResourceClosed(GiambioError): - """Raised when I/O is attempted on a closed fd""" + """ + Raised when I/O is attempted on a closed resource + """ - pass + ... diff --git a/giambio/objects.py b/giambio/objects.py index c4ef000..2b95d1f 100644 --- a/giambio/objects.py +++ b/giambio/objects.py @@ -1,4 +1,6 @@ """ +Various object wrappers and abstraction layers + Copyright (C) 2020 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); @@ -61,15 +63,17 @@ class Task: Joins the task """ - return await join(self) + res = await join(self) + if self.exc: + raise self.exc + return res async def cancel(self): """ Cancels the task """ - - if not self.exc and not self.cancelled and not self.finished: - await cancel(self) + + await cancel(self) def __del__(self): self.coroutine.close() @@ -86,11 +90,10 @@ class Event: """ self.set = False + self.waiters = [] self.event_caught = False - self.timeout = None - self.waiting = 0 - async def activate(self): + async def trigger(self): """ Sets the event, waking up all tasks that called pause() on us @@ -100,12 +103,11 @@ class Event: raise GiambioError("The event has already been set") await event_set(self) - async def pause(self): + async def wait(self): """ Waits until the event is set """ - self.waiting += 1 await event_wait(self) diff --git a/giambio/run.py b/giambio/run.py index c8d9ced..f4c64fe 100644 --- a/giambio/run.py +++ b/giambio/run.py @@ -1,4 +1,6 @@ """ +Helper methods and public API + Copyright (C) 2020 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/giambio/socket.py b/giambio/socket.py index cbc2768..6bd47f5 100644 --- a/giambio/socket.py +++ b/giambio/socket.py @@ -1,5 +1,4 @@ """ - Basic abstraction layer for giambio asynchronous sockets Copyright (C) 2020 nocturn9x @@ -22,6 +21,7 @@ import socket from .exceptions import ResourceClosed from .traps import sleep + # Stolen from curio try: from ssl import SSLWantReadError, SSLWantWriteError @@ -34,18 +34,18 @@ except ImportError: class AsyncSocket(object): """ - Abstraction layer for asynchronous sockets + Abstraction layer for asynchronous TCP sockets """ def __init__(self, sock: socket.socket, loop): self.sock = sock - self.sock.setblocking(False) self.loop = loop self._closed = False + self.sock.setblocking(False) async def receive(self, max_size: int): """ - Receives up to max_size from a socket asynchronously + Receives up to max_size bytes from a socket asynchronously """ if self._closed: diff --git a/giambio/traps.py b/giambio/traps.py index 0cd33cc..4d5adaa 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -1,4 +1,10 @@ """ +Implementation for all giambio traps, which are hooks +into the event loop and allow it to switch tasks. +These coroutines are the one and only way to interact +with the event loop from the user's perspective, and +the entire library is based on them + Copyright (C) 2020 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,12 +20,6 @@ See the License for the specific language governing permissions and limitations under the License. """ -# Implementation for all giambio traps, which are hooks -# into the event loop and allow it to switch tasks -# These coroutines are the one and only way to interact -# with the event loop from the user's perspective, and -# the entire library is based on these traps - import types @@ -95,7 +95,7 @@ async def cancel(task): async def want_read(stream): """ - Notifies the event loop that a task that wants to read from the given + Notifies the event loop that a task wants to read from the given resource :param stream: The resource that needs to be read @@ -106,7 +106,7 @@ async def want_read(stream): async def want_write(stream): """ - Notifies the event loop that a task that wants to read from the given + Notifies the event loop that a task wants to write on the given resource :param stream: The resource that needs to be written @@ -128,7 +128,7 @@ async def event_set(event): async def event_wait(event): """ Notifies the event loop that the current task has to wait - for given event to trigger + for the given event to trigger """ await create_trap("event_wait", event) diff --git a/tests/events.py b/tests/events.py index 0e8cabd..c36db9a 100644 --- a/tests/events.py +++ b/tests/events.py @@ -4,10 +4,10 @@ import giambio # A test for events -async def child(notifier: giambio.Event, pause: int): +async def child(ev: giambio.Event, pause: int): print("[child] Child is alive! Going to wait until notified") start_total = giambio.clock() - await notifier.pause() + await ev.wait() end_pause = giambio.clock() - start_total print(f"[child] Parent set the event, exiting in {pause} seconds") start_sleep = giambio.clock() @@ -18,15 +18,15 @@ async def child(notifier: giambio.Event, pause: int): async def parent(pause: int = 1): - event = giambio.Event() - print("[parent] Spawning child task") - task = giambio.spawn(child, event, pause + 2) - start = giambio.clock() - print(f"[parent] Sleeping {pause} second(s) before setting the event") - await giambio.sleep(pause) - await event.set() - print("[parent] Event set, awaiting child") - await task.join() + async with giambio.create_pool() as pool: + event = giambio.Event() + print("[parent] Spawning child task") + pool.spawn(child, event, pause + 2) + start = giambio.clock() + print(f"[parent] Sleeping {pause} second(s) before setting the event") + await giambio.sleep(pause) + await event.trigger() + print("[parent] Event set, awaiting child") end = giambio.clock() - start print(f"[parent] Child exited in {end} seconds")