From 0b1c5d75e7c9ddccff64c85dc3e63dc5382a06d1 Mon Sep 17 00:00:00 2001 From: nocturn9x Date: Fri, 13 Nov 2020 10:44:47 +0100 Subject: [PATCH] Major library refactoring, made event loops thread-local (no need to carry scheduler objects around anymore) and fixed some bugs related to I/O. Made events simpler as they would overlap with a future implementation of channels --- giambio/__init__.py | 7 +++-- giambio/_core.py | 72 +++++++++++++++++++++++++-------------------- giambio/_layers.py | 29 ++++++++++++------ giambio/_run.py | 44 +++++++++++++++++++++++---- giambio/_traps.py | 22 +++++++++----- giambio/socket.py | 6 ++-- tests/count.py | 15 ++++++---- tests/events.py | 35 ++++++++++++---------- tests/join.py | 26 ++++++++++++++++ tests/server.py | 25 +++++++++++----- 10 files changed, 192 insertions(+), 89 deletions(-) create mode 100644 tests/join.py diff --git a/giambio/__init__.py b/giambio/__init__.py index 529da7e..90b9d0b 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -16,16 +16,19 @@ limitations under the License. __author__ = "Nocturn9x aka Isgiambyy" __version__ = (1, 0, 0) -from ._core import AsyncScheduler +from ._run import run, spawn, clock, wrap_socket from .exceptions import GiambioError, AlreadyJoinedError, CancelledError from ._traps import sleep from ._layers import Event __all__ = [ - "AsyncScheduler", "GiambioError", "AlreadyJoinedError", "CancelledError", "sleep", "Event", + "run", + "spawn", + "clock", + "wrap_socket" ] diff --git a/giambio/_core.py b/giambio/_core.py index 746179e..ba166e9 100644 --- a/giambio/_core.py +++ b/giambio/_core.py @@ -22,7 +22,7 @@ import socket from .exceptions import AlreadyJoinedError, CancelledError, ResourceBusy, GiambioError from timeit import default_timer from time import sleep as wait -from .socket import AsyncSocket, WantWrite +from .socket import AsyncSocket, WantWrite, WantRead from ._layers import Task, TimeQueue from socket import SOL_SOCKET, SO_ERROR from ._traps import want_read, want_write @@ -52,11 +52,11 @@ class AsyncScheduler: default_timer # Monotonic clock to keep track of elapsed time reliably ) self.paused = TimeQueue(self.clock) # Tasks that are asleep - self.events = {} # All Event objects + self.events = set() # All Event objects self._event_waiting = defaultdict(list) # Coroutines waiting on event objects self.sequence = 0 - def run(self): + def _run(self): """ Starts the loop and 'listens' for events until there are either ready or asleep tasks, then exit. This behavior kinda reflects a kernel, as coroutines can request @@ -84,28 +84,37 @@ class AsyncScheduler: if self.current_task.status == "cancel": # Deferred cancellation self.current_task.cancelled = True self.current_task.throw(CancelledError(self.current_task)) - method, *args = self.current_task.run( - self.current_task._notify - ) # Run a single step with the calculation (and awake event-waiting tasks if any) + method, *args = self.current_task.run() # Run a single step with the calculation self.current_task.status = "run" getattr(self, f"_{method}")( *args ) # Sneaky method call, thanks to David Beazley for this ;) if self._event_waiting: - self.check_events() + self._check_events() except CancelledError as cancelled: self.tasks.remove(cancelled.args[0]) # Remove the dead task self.tasks.append(self.current_task) except StopIteration as e: # Coroutine ends self.current_task.result = e.args[0] if e.args else None self.current_task.finished = True - self.reschedule_parent(self.current_task) + self._reschedule_parent() except BaseException as error: # Coroutine raised self.current_task.exc = error - self.reschedule_parent(self.current_task) + self._reschedule_parent() self._join(self.current_task) - def check_events(self): + def clock(self): + """ + Returns the current clock time for the event loop. + Useful to keep track of elapsed time in the terms of + the scheduler itself + :return: whatever self.clock returns + :rtype: + """ + + return self.clock() + + def _check_events(self): """ Checks for ready or expired events and triggers them """ @@ -113,8 +122,6 @@ class AsyncScheduler: for event, tasks in self._event_waiting.copy().items(): if event._set: event.event_caught = True - for task in tasks: - task._notify = event._notify self.tasks.extend(tasks + [event.notifier]) self._event_waiting.pop(event) @@ -147,40 +154,41 @@ class AsyncScheduler: for key, _ in io_ready: self.tasks.append(key.data) # Socket ready? Schedule the task - def create_task(self, coro: types.coroutine): + def spawn(self, func: types.FunctionType, *args): """ Spawns a child task """ - task = Task(coro) + task = Task(func(*args)) self.tasks.append(task) return task - def schedule_task(self, coro: types.coroutine, n: int): + def spawn_after(self, func: types.FunctionType, n: int, *args): """ Schedules a task for execution after n seconds """ - task = Task(coro) + task = Task(func(*args)) self.paused.put(task, n) return task - def start(self, coro: types.coroutine): + def start(self, func: types.FunctionType, *args): """ Starts the event loop using a coroutine as an entry point. """ - entry = self.create_task(coro) - self.run() + entry = self.spawn(func, *args) + self._run() self._join(entry) return entry - def reschedule_parent(self, coro): + def _reschedule_parent(self): """ - Reschedules the parent task + Reschedules the parent task of the + currently running task, if any """ - parent = self.joined.pop(coro, None) + parent = self.joined.pop(self.current_task, None) if parent: self.tasks.append(parent) return parent @@ -200,7 +208,7 @@ class AsyncScheduler: self.current_task._last_io = "READ", sock try: self.selector.register(sock, EVENT_READ, self.current_task) - except KeyError: + except KeyError: # The socket is already registered doing something else raise ResourceBusy("The given resource is busy!") from None def _want_write(self, sock: socket.socket): @@ -228,10 +236,10 @@ class AsyncScheduler: parent task """ - if child.cancelled: # Task was cancelled and is therefore dead - self.tasks.append(self.current_task) + if child.cancelled or child.finished: # Task was cancelled or has finished executing and is therefore dead + self._reschedule_parent() elif child.exc: # Task raised an error, propagate it! - self.reschedule_parent(child) + self._reschedule_parent() raise child.exc elif child.finished: self.tasks.append(self.current_task) # Task has already finished @@ -254,27 +262,26 @@ class AsyncScheduler: else: self.tasks.append(self.current_task) - def _event_set(self, event, value): + def _event_set(self, event): """ Sets an event """ event.notifier = self.current_task event._set = True - event._notify = value - self.events[event] = value + self.events.add(event) def _event_wait(self, event): """ Waits for an event """ - if self.events.get(event, None): + if event in self.events: event.waiting -= 1 if event.waiting <= 0: - return self.events.pop(event) + return self.events.remove(event) else: - return self.events[event] + return else: self._event_waiting[event].append(self.current_task) @@ -334,6 +341,7 @@ class AsyncScheduler: """ await want_write(sock) + self.selector.unregister(sock) return sock.close() async def _connect_sock(self, sock: socket.socket, addr: tuple): diff --git a/giambio/_layers.py b/giambio/_layers.py index f6a503c..53aab99 100644 --- a/giambio/_layers.py +++ b/giambio/_layers.py @@ -32,7 +32,6 @@ class Task: self.finished = False self.status = "init" # This is useful for cancellation self._last_io = None - self._notify = None def run(self, what=None): """Simple abstraction layer over the coroutines ``send`` method""" @@ -61,24 +60,25 @@ class Task: class Event: - """A class designed similarly to threading.Event, but with more features""" + """A class designed similarly to threading.Event""" def __init__(self): """Object constructor""" self._set = False - self._notify = None self.event_caught = False self.timeout = None self.waiting = 0 - async def set(self, value=True): - """Sets the event, optionally taking a value. This can be used - to control tasks' flow by 'sending' commands back and fort""" + async def set(self): + """ + Sets the event, waking up all tasks that called + pause() on this event + """ if self._set: raise GiambioError("The event has already been set") - await event_set(self, value) + await event_set(self) async def pause(self): """Waits until the event is set and returns a value""" @@ -88,8 +88,10 @@ class Event: class TimeQueue: - """An abstraction layer over a heap queue based on time. This is where - sleeping tasks will be put when they are asleep""" + """ + An abstraction layer over a heap queue based on time. This is where + sleeping tasks will be put when they are not running + """ def __init__(self, clock): self.clock = clock @@ -112,8 +114,17 @@ class TimeQueue: return f"TimeQueue({self.container}, clock={self.clock})" def put(self, item, amount): + """ + Pushes an item onto the queue with its unique + time amount and ID + """ + heappush(self.container, (self.clock() + amount, self.sequence, item)) self.sequence += 1 def get(self): + """ + Gets the first task that is meant to run + """ + return heappop(self.container)[2] diff --git a/giambio/_run.py b/giambio/_run.py index 44c6fa1..6b6b4e9 100644 --- a/giambio/_run.py +++ b/giambio/_run.py @@ -14,11 +14,45 @@ See the License for the specific language governing permissions and limitations under the License. """ -from ._core import AsyncScheduler -from types import coroutine +import threading +from ._layers import Task +from .socket import AsyncSocket +from types import FunctionType +import socket -def run(coro: coroutine): - """Shorthand for giambio.AsyncScheduler().start(coro)""" +thread_local = threading.local() - ... # How to do it? (Share objects between coroutines etc) + +def run(func: FunctionType, *args) -> Task: + """ + Starts the event loop from a synchronous entry point + """ + + return thread_local.loop.start(func, *args) + + +def clock(): + """ + Returns the current clock time of the thread-local event + loop + """ + + return thread_local.loop.clock() + + +def spawn(func: FunctionType, *args): + """ + Spawns a child task in the current event + loop + """ + + return thread_local.loop.spawn(func, *args) + + +def wrap_socket(sock: socket.socket) -> AsyncSocket: + """ + Wraps a synchronous socket into a giambio.socket.AsyncSocket + """ + + return thread_local.loop.wrap_socket(sock) diff --git a/giambio/_traps.py b/giambio/_traps.py index b58d461..aa15a7b 100644 --- a/giambio/_traps.py +++ b/giambio/_traps.py @@ -56,13 +56,16 @@ def join(task): @types.coroutine def cancel(task): - """'Tells' the scheduler that the passed task must be cancelled + """ + 'Tells' the scheduler that the passed task must be cancelled The concept of cancellation here is tricky, because there is no real way to 'stop' a running task if not by raising an exception inside it and just ignore whatever the task - returns (and also hoping that the task won't cause damage when exiting abruptly). + returns (and also hoping that the task won't cause collateral damage when exiting abruptly). It is highly recommended that when you write a coroutine you take into account that it might - be cancelled at any time + be cancelled at any time. Please note, though, that ignoring a giambio.exceptions.CancelledError + exception *will* break your code, so if you really wanna do that be sure to re-raise + it when done! """ yield "cancel", task @@ -71,7 +74,8 @@ def cancel(task): @types.coroutine def want_read(sock: socket.socket): - """'Tells' the event loop that there is some coroutine that wants to read from the given socket + """ + 'Tells' the event loop that there is some coroutine that wants to read from the given socket :param sock: The socket to perform the operation on :type sock: class: socket.socket @@ -82,7 +86,8 @@ def want_read(sock: socket.socket): @types.coroutine def want_write(sock: socket.socket): - """'Tells' the event loop that there is some coroutine that wants to write on the given socket + """ + 'Tells' the event loop that there is some coroutine that wants to write on the given socket :param sock: The socket to perform the operation on :type sock: class: socket.socket @@ -92,18 +97,19 @@ def want_write(sock: socket.socket): @types.coroutine -def event_set(event, value): +def event_set(event): """Communicates to the loop that the given event object must be set. This is important as the loop constantly checks for active events to deliver them """ - yield "event_set", event, value + yield "event_set", event @types.coroutine def event_wait(event): - """Notifies the event loop that the current task has to wait + """ + Notifies the event loop that the current task has to wait for the event to trigger """ diff --git a/giambio/socket.py b/giambio/socket.py index 890f129..fee93a9 100644 --- a/giambio/socket.py +++ b/giambio/socket.py @@ -81,12 +81,10 @@ class AsyncSocket(object): await self.loop._connect_sock(self.sock, addr) async def __aenter__(self): - await sleep(0) return self.sock.__enter__() - async def __aexit__(self, *args): - await sleep(0) - return self.sock.__exit__(*args) + async def __aexit__(self, *_): + await self.close() def __repr__(self): return f"giambio.socket.AsyncSocket({self.sock}, {self.loop})" diff --git a/tests/count.py b/tests/count.py index 00e4e93..dcdef79 100644 --- a/tests/count.py +++ b/tests/count.py @@ -1,12 +1,16 @@ import giambio +# A test for cancellation + + async def countdown(n: int): while n > 0: print(f"Down {n}") n -= 1 await giambio.sleep(1) print("Countdown over") + # raise Exception("oh no man") return 0 @@ -21,8 +25,8 @@ async def countup(stop: int, step: int = 1): async def main(): - cdown = scheduler.create_task(countdown(10)) - cup = scheduler.create_task(countup(5, 2)) + cdown = giambio.spawn(countdown, 10) + cup = giambio.spawn(countup, 5, 2) print("Counters started, awaiting completion") await giambio.sleep(2) print("Slept 2 seconds, killing countup") @@ -36,8 +40,7 @@ async def main(): if __name__ == "__main__": - scheduler = giambio.AsyncScheduler() try: - scheduler.start(main()) - except Exception: - print("bruh") + giambio.run(main) + except Exception as e: + print(f"Exception caught! -> {type(e).__name__}: {e}") diff --git a/tests/events.py b/tests/events.py index 7772fb7..914574c 100644 --- a/tests/events.py +++ b/tests/events.py @@ -1,30 +1,35 @@ import giambio -async def child(notifier: giambio.Event, reply: giambio.Event, pause: int): - print("[child] Child is alive! Going to sleep until notified") - notification = await notifier.pause() - print(f"[child] Parent said: '{notification}', replying in {pause} seconds") +# A test for events + + +async def child(notifier: giambio.Event, pause: int): + print("[child] Child is alive! Going to wait until notified") + start_total = giambio.clock() + await notifier.pause() + end_pause = giambio.clock() - start_total + print(f"[child] Parent set the event, exiting in {pause} seconds") + start_sleep = giambio.clock() - start_total await giambio.sleep(pause) - print("[child] Replying to parent") - await reply.set("Hi daddy!") + end_sleep = giambio.clock() - start_sleep + end_total = giambio.clock() - start_total + print(f"[child] Done! Slept for {end_total} seconds total ({end_pause} paused, {end_sleep} sleeping), nice nap!") async def parent(pause: int = 1): event = giambio.Event() - reply = giambio.Event() print("[parent] Spawning child task") - task = scheduler.create_task(child(event, reply, pause)) + task = giambio.spawn(child, event, pause + 2) + start = giambio.clock() print(f"[parent] Sleeping {pause} second(s) before setting the event") await giambio.sleep(pause) - await event.set("Hi, my child") - print("[parent] Event set, awaiting reply") - reply = await reply.pause() - print(f"[parent] Child replied: '{reply}'") + await event.set() + print("[parent] Event set, awaiting child") await task.join() - print("[parent] Child exited") + end = giambio.clock() - start + print(f"[parent] Child exited in {end} seconds") if __name__ == "__main__": - scheduler = giambio.AsyncScheduler() - scheduler.start(parent(5)) + giambio.run(parent, 3) diff --git a/tests/join.py b/tests/join.py new file mode 100644 index 0000000..611bfb0 --- /dev/null +++ b/tests/join.py @@ -0,0 +1,26 @@ +import giambio + + +# A test to see if tasks are properly joined + + +async def child(sleep: int): + start = giambio.clock() + print(f"[child] Gonna sleep for {sleep} seconds!") + await giambio.sleep(sleep) + end = giambio.clock() - start + print(f"[child] I woke up! Slept for {end} seconds") + + +async def main(): + print("[parent] Spawning child") + task = giambio.spawn(child, 5) + start = giambio.clock() + print("[parent] Child spawned, awaiting completion") + await task.join() + end = giambio.clock() - start + print(f"[parent] Child exited in {end} seconds") + + +if __name__ == "__main__": + giambio.run(main) diff --git a/tests/server.py b/tests/server.py index 8cfc064..d921a81 100644 --- a/tests/server.py +++ b/tests/server.py @@ -1,9 +1,11 @@ import giambio +import traceback from giambio.socket import AsyncSocket import socket import logging +import sys -sched = giambio.AsyncScheduler() +# A test to check for asynchronous I/O logging.basicConfig( level=20, format="[%(levelname)s] %(asctime)s %(message)s", datefmt="%d/%m/%Y %p" @@ -15,21 +17,26 @@ async def server(address: tuple): sock.bind(address) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.listen(5) - asock = sched.wrap_socket(sock) + asock = giambio.wrap_socket(sock) # We make the socket an async socket logging.info(f"Echo server serving asynchronously at {address}") while True: conn, addr = await asock.accept() logging.info(f"{addr} connected") - task = sched.create_task(echo_handler(conn, addr)) + task = giambio.spawn(echo_handler, conn, addr) + # await task.join() # TODO: Joining I/O tasks seems broken async def echo_handler(sock: AsyncSocket, addr: tuple): async with sock: - await sock.send_all(b"Welcome to the server pal!\n") + await sock.send_all(b"Welcome to the server pal, feel free to send me something!\n") while True: - data = await sock.receive(1000) + await sock.send_all(b"-> ") + data = await sock.receive(1024) if not data: break + elif data == b"raise\n": + await sock.send_all(b"I'm dead dude\n") + raise TypeError("Oh, no, I'm gonna die!") to_send_back = data data = data.decode("utf-8").encode("unicode_escape") logging.info(f"Got: '{data.decode('utf-8')}' from {addr}") @@ -40,6 +47,8 @@ async def echo_handler(sock: AsyncSocket, addr: tuple): if __name__ == "__main__": try: - sched.start(server(("", 25001))) - except KeyboardInterrupt: # Exceptions propagate! - print("Exiting...") + giambio.run(server, ("", 1501)) + except BaseException as error: # Exceptions propagate! + print(f"Exiting due to a {type(error).__name__}: '{error}'", end=" ") + print("traceback below (or above, or in the middle, idk async is weird)") + traceback.print_exception(*sys.exc_info())