From 7b4051f3b90d59ef5832ed5996c2fe2dc74f49c6 Mon Sep 17 00:00:00 2001 From: nocturn9x Date: Sat, 14 Nov 2020 10:42:46 +0100 Subject: [PATCH] Starting to work on async pools --- giambio/__init__.py | 6 +-- giambio/_core.py | 122 ++++++++++++++++-------------------------- giambio/_layers.py | 11 +++- giambio/_managers.py | 70 ++++++++++++++++++++++++ giambio/_run.py | 33 ++++++------ giambio/_traps.py | 1 + giambio/exceptions.py | 2 +- tests/count.py | 53 +++++++++--------- tests/server.py | 25 +++++---- 9 files changed, 190 insertions(+), 133 deletions(-) create mode 100644 giambio/_managers.py diff --git a/giambio/__init__.py b/giambio/__init__.py index 90b9d0b..921c37c 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -16,7 +16,7 @@ limitations under the License. __author__ = "Nocturn9x aka Isgiambyy" __version__ = (1, 0, 0) -from ._run import run, spawn, clock, wrap_socket +from ._run import run, clock, wrap_socket, create_pool from .exceptions import GiambioError, AlreadyJoinedError, CancelledError from ._traps import sleep from ._layers import Event @@ -28,7 +28,7 @@ __all__ = [ "sleep", "Event", "run", - "spawn", "clock", - "wrap_socket" + "wrap_socket", + "create_pool" ] diff --git a/giambio/_core.py b/giambio/_core.py index ba166e9..37e3698 100644 --- a/giambio/_core.py +++ b/giambio/_core.py @@ -16,7 +16,7 @@ limitations under the License. # Import libraries and internal resources import types -from collections import deque, defaultdict +from collections import defaultdict from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE import socket from .exceptions import AlreadyJoinedError, CancelledError, ResourceBusy, GiambioError @@ -42,9 +42,10 @@ class AsyncScheduler: def __init__(self): """Object constructor""" - self.tasks = deque() # Tasks that are ready to run + self.tasks = [] # Tasks that are ready to run self.selector = DefaultSelector() # Selector object to perform I/O multiplexing self.current_task = None # This will always point to the currently running coroutine (Task object) + self.catch = True self.joined = ( {} ) # Maps child tasks that need to be joined their respective parent task @@ -53,7 +54,7 @@ class AsyncScheduler: ) self.paused = TimeQueue(self.clock) # Tasks that are asleep self.events = set() # All Event objects - self._event_waiting = defaultdict(list) # Coroutines waiting on event objects + self.event_waiting = defaultdict(list) # Coroutines waiting on event objects self.sequence = 0 def _run(self): @@ -67,32 +68,31 @@ class AsyncScheduler: while True: try: if not self.selector.get_map() and not any( - [self.paused, self.tasks, self._event_waiting] + [self.paused, self.tasks, self.event_waiting] ): # If there is nothing to do, just exit break - if not self.tasks: - if ( - self.paused - ): # If there are no actively running tasks, we try to schedule the asleep ones + elif not self.tasks: + if self.paused: + # If there are no actively running tasks, we try to schedule the asleep ones self._check_sleeping() - if self.selector.get_map(): - self._check_io() # The next step is checking for I/O + if self.selector.get_map(): + self._check_io() # The next step is checking for I/O + if self.event_waiting: + # Try to awake event-waiting tasks + self._check_events() while self.tasks: # While there are tasks to run - self.current_task = ( - self.tasks.popleft() - ) # Sets the currently running task + self.current_task = self.tasks.pop(0) + # Sets the currently running task if self.current_task.status == "cancel": # Deferred cancellation self.current_task.cancelled = True self.current_task.throw(CancelledError(self.current_task)) method, *args = self.current_task.run() # Run a single step with the calculation self.current_task.status = "run" - getattr(self, f"_{method}")( - *args - ) # Sneaky method call, thanks to David Beazley for this ;) - if self._event_waiting: - self._check_events() + getattr(self, f"_{method}")(*args) + # Sneaky method call, thanks to David Beazley for this ;) except CancelledError as cancelled: - self.tasks.remove(cancelled.args[0]) # Remove the dead task + if cancelled.args[0] in self.tasks: + self.tasks.remove(cancelled.args[0]) # Remove the dead task self.tasks.append(self.current_task) except StopIteration as e: # Coroutine ends self.current_task.result = e.args[0] if e.args else None @@ -100,42 +100,33 @@ class AsyncScheduler: self._reschedule_parent() except BaseException as error: # Coroutine raised self.current_task.exc = error - self._reschedule_parent() - self._join(self.current_task) - - def clock(self): - """ - Returns the current clock time for the event loop. - Useful to keep track of elapsed time in the terms of - the scheduler itself - :return: whatever self.clock returns - :rtype: - """ - - return self.clock() + if self.catch: + self._reschedule_parent() + self._join(self.current_task) + else: + if not isinstance(error, RuntimeError): + raise def _check_events(self): """ Checks for ready or expired events and triggers them """ - for event, tasks in self._event_waiting.copy().items(): + for event, tasks in self.event_waiting.copy().items(): if event._set: event.event_caught = True self.tasks.extend(tasks + [event.notifier]) - self._event_waiting.pop(event) + self.event_waiting.pop(event) def _check_sleeping(self): """ Checks and reschedules sleeping tasks """ - wait( - max(0.0, self.paused[0][0] - self.clock()) - ) # Sleep until the closest deadline in order not to waste CPU cycles - while ( - self.paused[0][0] < self.clock() - ): # Reschedules tasks when their deadline has elapsed + wait(max(0.0, self.paused[0][0] - self.clock())) + # Sleep until the closest deadline in order not to waste CPU cycles + while self.paused[0][0] < self.clock(): + # Reschedules tasks when their deadline has elapsed self.tasks.append(self.paused.get()) if not self.paused: break @@ -145,41 +136,22 @@ class AsyncScheduler: Checks and schedules task to perform I/O """ - timeout = ( - 0.0 if self.tasks else None - ) # If there are no tasks ready wait indefinitely - io_ready = self.selector.select( - timeout - ) # Get sockets that are ready and schedule their tasks + timeout = 0.0 if self.tasks else None + # If there are no tasks ready wait indefinitely + io_ready = self.selector.select(timeout) + # Get sockets that are ready and schedule their tasks for key, _ in io_ready: - self.tasks.append(key.data) # Socket ready? Schedule the task - - def spawn(self, func: types.FunctionType, *args): - """ - Spawns a child task - """ - - task = Task(func(*args)) - self.tasks.append(task) - return task - - def spawn_after(self, func: types.FunctionType, n: int, *args): - """ - Schedules a task for execution after n seconds - """ - - task = Task(func(*args)) - self.paused.put(task, n) - return task + self.tasks.append(key.data) # Resource ready? Schedule its task def start(self, func: types.FunctionType, *args): """ - Starts the event loop using a coroutine as an entry point. + Starts the event loop from a sync context """ - entry = self.spawn(func, *args) - self._run() + entry = Task(func(*args)) + self.tasks.append(entry) self._join(entry) + self._run() return entry def _reschedule_parent(self): @@ -236,12 +208,9 @@ class AsyncScheduler: parent task """ - if child.cancelled or child.finished: # Task was cancelled or has finished executing and is therefore dead + if child.cancelled or child.exc: # Task was cancelled or has errored self._reschedule_parent() - elif child.exc: # Task raised an error, propagate it! - self._reschedule_parent() - raise child.exc - elif child.finished: + elif child.finished: # Task finished running self.tasks.append(self.current_task) # Task has already finished else: if child not in self.joined: @@ -283,7 +252,7 @@ class AsyncScheduler: else: return else: - self._event_waiting[event].append(self.current_task) + self.event_waiting[event].append(self.current_task) def _cancel(self, task): """ @@ -292,9 +261,8 @@ class AsyncScheduler: are independent """ - if ( - task.status in ("sleep", "I/O") and not task.cancelled - ): # It is safe to cancel a task while blocking + if task.status in ("sleep", "I/O") and not task.cancelled: + # It is safe to cancel a task while blocking task.cancelled = True task.throw(CancelledError(task)) elif task.status == "run": diff --git a/giambio/_layers.py b/giambio/_layers.py index 53aab99..68a5561 100644 --- a/giambio/_layers.py +++ b/giambio/_layers.py @@ -46,12 +46,21 @@ class Task: async def join(self): """Joins the task""" - return await join(self) + if self.cancelled and not self.exc: + return None + if self.exc: + raise self.exc + res = await join(self) + if self.exc: + raise self.exc + return res + async def cancel(self): """Cancels the task""" await cancel(self) + assert self.cancelled, "Task ignored cancellation" def __repr__(self): """Implements repr(self)""" diff --git a/giambio/_managers.py b/giambio/_managers.py new file mode 100644 index 0000000..e7a666f --- /dev/null +++ b/giambio/_managers.py @@ -0,0 +1,70 @@ +""" +Copyright (C) 2020 nocturn9x + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from ._core import AsyncScheduler +from ._layers import Task +import types + + +class TaskManager: + """ + An asynchronous context manager for giambio + """ + + def __init__(self, loop: AsyncScheduler) -> None: + """ + Object constructor + """ + + self.loop = loop + + def spawn(self, func: types.FunctionType, *args): + """ + Spawns a child task + """ + + task = Task(func(*args)) + self.loop.tasks.append(task) + return task + + def spawn_after(self, func: types.FunctionType, n: int, *args): + """ + Schedules a task for execution after n seconds + """ + + assert n >= 0, "The time delay can't be negative" + task = Task(func(*args)) + self.loop.paused.put(task, n) + return task + + async def __aenter__(self): + self.loop.catch = True # Restore event loop's status + return self + + async def __aexit__(self, exc_type, exc, tb): + for task in self.loop.tasks: + try: + await task.join() + except BaseException as e: + for task in self.loop.tasks: + await task.cancel() + for _, __, task in self.loop.paused: + await task.cancel() + for tasks in self.loop.event_waiting.values(): + for task in tasks: + await task.cancel() + self.loop.catch = False + raise e diff --git a/giambio/_run.py b/giambio/_run.py index f91e6d3..2511a13 100644 --- a/giambio/_run.py +++ b/giambio/_run.py @@ -17,6 +17,7 @@ limitations under the License. import threading from ._core import AsyncScheduler from ._layers import Task +from ._managers import TaskManager from .socket import AsyncSocket from types import FunctionType, CoroutineType, GeneratorType import socket @@ -33,7 +34,9 @@ def run(func: FunctionType, *args) -> Task: if isinstance(func, (CoroutineType, GeneratorType)): raise RuntimeError("Looks like you tried to call giambio.run(your_func(arg1, arg2, ...)), that is wrong!" "\nWhat you wanna do, instead, is this: giambio.run(your_func, arg1, arg2, ...)") - if not hasattr(thread_local, "loop"): + try: + return thread_local.loop.start(func, *args) + except AttributeError: thread_local.loop = AsyncScheduler() return thread_local.loop.start(func, *args) @@ -47,25 +50,21 @@ def clock(): return thread_local.loop.clock() -def spawn(func: FunctionType, *args): - """ - Spawns a child task in the current event - loop - """ - - if isinstance(func, (CoroutineType, GeneratorType)): - raise RuntimeError("Looks like you tried to call giambio.spawn(your_func(arg1, arg2, ...)), that is wrong!" - "\nWhat you wanna do, instead, is this: giambio.spawn(your_func, arg1, arg2, ...)") - try: - return thread_local.loop.spawn(func, *args) - except AttributeError: - raise RuntimeError("It appears that giambio is not running, did you call giambio.spawn(...)" - " outside of an async context?") from None - - def wrap_socket(sock: socket.socket) -> AsyncSocket: """ Wraps a synchronous socket into a giambio.socket.AsyncSocket """ return thread_local.loop.wrap_socket(sock) + + +def create_pool(): + """ + Creates an async pool + """ + + try: + return TaskManager(thread_local.loop) + except AttributeError: + raise RuntimeError("It appears that giambio is not running, did you call giambio.async_pool()" + " outside of an async context?") from None diff --git a/giambio/_traps.py b/giambio/_traps.py index aa15a7b..bda1c37 100644 --- a/giambio/_traps.py +++ b/giambio/_traps.py @@ -39,6 +39,7 @@ def sleep(seconds: int): :type seconds: int """ + assert seconds >= 0, "The time delay can't be negative" yield "sleep", seconds diff --git a/giambio/exceptions.py b/giambio/exceptions.py index e9e6580..8fa63c9 100644 --- a/giambio/exceptions.py +++ b/giambio/exceptions.py @@ -26,7 +26,7 @@ class AlreadyJoinedError(GiambioError): class CancelledError(BaseException): - """Exception raised as a result of the giambio.core.cancel() method""" + """Exception raised by the giambio._layers.Task.cancel() method""" def __repr__(self): return "giambio.exceptions.CancelledError" diff --git a/tests/count.py b/tests/count.py index dcdef79..d761e33 100644 --- a/tests/count.py +++ b/tests/count.py @@ -1,7 +1,7 @@ import giambio -# A test for cancellation +# A test for context managers async def countdown(n: int): @@ -9,38 +9,43 @@ async def countdown(n: int): print(f"Down {n}") n -= 1 await giambio.sleep(1) + # raise Exception("oh no man") # Uncomment to test propagation print("Countdown over") - # raise Exception("oh no man") return 0 async def countup(stop: int, step: int = 1): - x = 0 - while x < stop: - print(f"Up {x}") - x += 1 - await giambio.sleep(step) - print("Countup over") - return 1 - + try: + x = 0 + while x < stop: + print(f"Up {x}") + x += 1 + await giambio.sleep(step) + print("Countup over") + return 1 + except giambio.exceptions.CancelledError: + print("I'm not gonna die!!") + raise BaseException(2) async def main(): - cdown = giambio.spawn(countdown, 10) - cup = giambio.spawn(countup, 5, 2) - print("Counters started, awaiting completion") - await giambio.sleep(2) - print("Slept 2 seconds, killing countup") - await cup.cancel() -# raise TypeError("bruh") - print("Countup cancelled") - up = await cup.join() - down = await cdown.join() - print(f"Countup returned: {up}\nCountdown returned: {down}") - print("Task execution complete") + try: + print("Creating an async pool") + async with giambio.create_pool() as pool: + print("Starting counters") + pool.spawn(countdown, 10) + t = pool.spawn(countup, 5, 2) + await giambio.sleep(2) + await t.cancel() + print("Task execution complete") + except Exception as e: + print(f"Caught this bad boy in here, propagating it -> {type(e).__name__}: {e}") + raise if __name__ == "__main__": + print("Starting event loop") try: giambio.run(main) - except Exception as e: - print(f"Exception caught! -> {type(e).__name__}: {e}") + except BaseException as e: + print(f"Exception caught from main event loop!! -> {type(e).__name__}: {e}") + print("Event loop done") diff --git a/tests/server.py b/tests/server.py index d921a81..74251c3 100644 --- a/tests/server.py +++ b/tests/server.py @@ -1,10 +1,10 @@ import giambio -import traceback from giambio.socket import AsyncSocket import socket import logging import sys + # A test to check for asynchronous I/O logging.basicConfig( @@ -20,10 +20,13 @@ async def server(address: tuple): asock = giambio.wrap_socket(sock) # We make the socket an async socket logging.info(f"Echo server serving asynchronously at {address}") while True: - conn, addr = await asock.accept() - logging.info(f"{addr} connected") - task = giambio.spawn(echo_handler, conn, addr) - # await task.join() # TODO: Joining I/O tasks seems broken + try: + async with giambio.async_pool() as pool: + conn, addr = await asock.accept() + logging.info(f"{addr} connected") + pool.spawn(echo_handler, conn, addr) + except TypeError: + print("Looks like we have a naughty boy here!") async def echo_handler(sock: AsyncSocket, addr: tuple): @@ -46,9 +49,11 @@ async def echo_handler(sock: AsyncSocket, addr: tuple): if __name__ == "__main__": + if len(sys.argv) > 1: + port = int(sys.argv[1]) + else: + port = 1500 try: - giambio.run(server, ("", 1501)) - except BaseException as error: # Exceptions propagate! - print(f"Exiting due to a {type(error).__name__}: '{error}'", end=" ") - print("traceback below (or above, or in the middle, idk async is weird)") - traceback.print_exception(*sys.exc_info()) + giambio.run(server, ("", port)) + except (Exception, KeyboardInterrupt) as error: # Exceptions propagate! + print(f"Exiting due to a {type(error).__name__}: '{error}'")