diff --git a/giambio/__init__.py b/giambio/__init__.py index 40da333..a8b5ba0 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -3,5 +3,6 @@ __version__ = (0, 0, 1) from ._core import AsyncScheduler from .exceptions import GiambioError, AlreadyJoinedError, CancelledError from ._traps import sleep +from ._managers import TaskManager __all__ = ["AsyncScheduler", "GiambioError", "AlreadyJoinedError", "CancelledError", "TaskManager", "sleep"] diff --git a/giambio/_core.py b/giambio/_core.py index 6b2feac..c460ebf 100644 --- a/giambio/_core.py +++ b/giambio/_core.py @@ -1,5 +1,5 @@ import types -from collections import deque, defaultdict +from collections import deque from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE from heapq import heappush, heappop import socket @@ -24,7 +24,7 @@ class AsyncScheduler: self.paused = [] # Sleeping tasks self.selector = DefaultSelector() # Selector object to perform I/O multiplexing self.running = None # This will always point to the currently running coroutine (Task object) - self.joined = defaultdict(list) # Tasks that want to join + self.joined = {} # Tasks that want to join self.clock = default_timer # Monotonic clock to keep track of elapsed time self.sequence = 0 # To avoid TypeError in the (unlikely) event of two task with the same deadline we use a unique and incremental integer pushed to the queue together with the deadline and the function itself @@ -52,9 +52,16 @@ class AsyncScheduler: method, *args = self.running.run() getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;) except StopIteration as e: - self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task + e = e.args[0] if e.args else None + self.running.result = e + self.running.finished = True + self.reschedule_parent() except CancelledError: - ... # What to do here? + self.running.cancelled = True + self.reschedule_parent() + except Exception as error: + self.running.exc = error + self.reschedule_parent() def start(self, coro: types.coroutine): """Starts the event loop using a coroutine as an entry point. @@ -64,6 +71,12 @@ class AsyncScheduler: self.to_run.append(Task(coro)) self.run() + def reschedule_parent(self): + """Reschedules the parent task""" + + popped = self.joined.pop(self.running, None) + if popped: + self.to_run.append(popped) def want_read(self, sock: socket.socket): """Handler for the 'want_read' event, registers the socket inside the selector to perform I/0 multiplexing""" @@ -118,7 +131,7 @@ class AsyncScheduler: if coro not in self.joined: - self.joined[coro].append(self.running) + self.joined[coro] = self.running else: raise AlreadyJoinedError("Joining the same task multiple times is not allowed!") @@ -132,7 +145,7 @@ class AsyncScheduler: def cancel(self, task): """Cancels a task""" - self.running.coroutine.throw(CancelledError(task)) + task.coroutine.throw(CancelledError) async def connect_sock(self, sock: socket.socket, addr: tuple): """Connects a socket asynchronously""" diff --git a/giambio/_layers.py b/giambio/_layers.py index 8bdfa31..050acac 100644 --- a/giambio/_layers.py +++ b/giambio/_layers.py @@ -1,6 +1,6 @@ import types from ._traps import join, sleep, cancel -from .exceptions import CancelledError + class Task: @@ -8,8 +8,10 @@ class Task: def __init__(self, coroutine: types.coroutine): self.coroutine = coroutine - self.joined = False # True if the task is joined self.cancelled = False # True if the task gets cancelled + self.exc = None + self.result = None + self.finished = False def run(self): """Simple abstraction layer over the coroutines ``send`` method""" @@ -26,3 +28,7 @@ class Task: await cancel(self) + def __repr__(self): + """Implements repr(self)""" + + return f"Task({self.coroutine}, cancelled={self.cancelled}, exc={repr(self.exc)}, result={self.result}, finished={self.finished})" diff --git a/giambio/_managers.py b/giambio/_managers.py new file mode 100644 index 0000000..929002c --- /dev/null +++ b/giambio/_managers.py @@ -0,0 +1,99 @@ +from collections import deque +import types +from ._layers import Task +from heapq import heappush +from ._traps import sleep +import time + + +class TaskManager: + """The only way to execute asynchronous code in giambio is trough a ``TaskManager`` (or one of its children classes) object used within + an ``async with`` context manager. The ``TaskManager`` is an ideal "playground" where all asynchronous code runs and where giambio's + event loop can control their execution flow. + + The key feature of this mechanism is that all tasks are always joined automatically: This opens a new world of features, + allowing exceptions to propagate just as expected (exceptions are **never** discarded in giambio, unlike in some other libraries) and some other lower-level advantages. + Moreover, giambio always saves the return values so that you don't lose any important information when executing a coroutine. + + There are a few concepts to explain here, though: + + - The term "task" refers to a coroutine executed trough the ``TaskManager``'s methods ``spawn()`` and ``schedule()``, as well as + one executed with ``await coro()`` + - If an exception occurs, all other tasks are cancelled (read more below) and the exception is later propagated in the parent task + - The concept of cancellation is a bit tricky, because there is no real way to stop a coroutine from executing without actually raising + an exception inside i. So when giambio needs to cancel a task, it just throws ``giambio.exceptions.CancelledError`` inside it and hopes for the best. + This exception inherits from ``BaseException``, which by convention means that it should not be catched. Doing so in giambio will likely break your code + and make it explode spectacularly. If you really want to catch it to perform cleanup, be sure to re-raise it when done (or to raise another unhandled exception if you want) + so that the internal loop can proceed with execution. + In general, when writing an asynchronous function, you should always consider that it might be cancelled at any time and handle that case accordingly. + """ + + + + def __init__(self, loop): + self.values = {} # Results from each task + self.loop = loop # The event loop that spawned the TaskManager + + async def _cancel_and_raise(self, exc): + """Cancels all the tasks inside the TaskManager object and raises the exception + of the task that triggered this mechanism""" + + try: + await self.loop.running.cancel() + except Exception as error: + self.loop.running.exc = error + for task in self.loop.to_run + deque(self.loop.paused): + if isinstance(task, tuple): # Sleeping task + _, _, task = task + try: + await task.cancel() + except Exception as error: + task.exc = error + raise exc + + async def __aenter__(self): + return self + + async def __aexit__(self, type, value, traceback): + while True: + if not any([self.loop.to_run, self.loop.paused]): + break + tasks = self.loop.to_run + deque(self.loop.paused) + task = tasks.popleft() + if isinstance(task, tuple): # Sleeping task + _, _, task = task + self.values[task] = await task.join() + if task.exc: + print(task) + await self._cancel_and_raise(task.exc) + + def spawn(self, coroutine: types.coroutine): + """Schedules a task for execution, appending it to the call stack + + :param coroutine: The coroutine to spawn, please note that if you want to execute foo, you need to pass foo() as this + returns a coroutine instead of a function object + :type coroutine: types.coroutine + :returns: A ``Task`` object + :rtype: class: Task + """ + + task = Task(coroutine) + self.loop.to_run.append(task) + return task + + def schedule(self, coroutine: types.coroutine, n: int): + """Schedules a task for execution after when seconds + + :param coroutine: The coroutine to spawn, please note that if you want to execute foo, you need to pass foo() as this + returns a coroutine instead of a function object + :type coroutine: types.coroutine + :param n: The delay in seconds after which the task should start running + :type n: int + :returns: A ``Task`` object + :rtype: class: Task + """ + + self.loop.sequence += 1 + task = Task(coroutine) + heappush(self.loop.paused, (self.loop.clock() + n, self.loop.sequence, task)) + return task diff --git a/giambio/_run.py b/giambio/_run.py new file mode 100644 index 0000000..285ca1c --- /dev/null +++ b/giambio/_run.py @@ -0,0 +1,8 @@ +from ._core import AsyncScheduler +from types import coroutine + + +def run(coro: coroutine): + """Shorthand for giambio.AsyncScheduler().start(coro)""" + + ... # How to do it? (Share objects between coroutines etc) diff --git a/giambio/_traps.py b/giambio/_traps.py index 14f1975..f3b03c9 100644 --- a/giambio/_traps.py +++ b/giambio/_traps.py @@ -24,18 +24,16 @@ def sleep(seconds: int): yield "sleep", seconds @types.coroutine -def join(task, silent=False): +def join(task): """'Tells' the scheduler that the desired task MUST be awaited for completion - If silent is True, any exception in the child task will be discarded :param task: The task to join :type task: class: Task - :param silent: If ``True``, any exception raised from the child will be ignored (not recommended), defaults to ``False`` - :type silent: bool, optional """ - return (yield "join", task) - + yield "join", task + assert task.finished + return task.result @types.coroutine def cancel(task): @@ -49,7 +47,7 @@ def cancel(task): """ yield "cancel", task - + assert task.cancelled @types.coroutine def want_read(sock: socket.socket): diff --git a/tests/count.py b/tests/count.py index 58a913d..f649816 100644 --- a/tests/count.py +++ b/tests/count.py @@ -1,4 +1,4 @@ -from giambio import AsyncScheduler, sleep, CancelledError +from giambio import AsyncScheduler, sleep, TaskManager import time @@ -7,6 +7,8 @@ async def countdown(n: int): print(f"Down {n}") n -= 1 await sleep(1) + if n == 5: + raise ValueError('lul') print("Countdown over") @@ -20,14 +22,10 @@ async def countup(stop, step: int or float = 1): async def main(): - counter = scheduler.create_task(countup(5, 4)) - counter2 = scheduler.create_task(countdown(20)) - print("Counters started, awaiting completion") - await sleep(4) - print("4 seconds have passed, killing countup task") - await counter.cancel() - await counter.join() - await counter2.join() + async with TaskManager(scheduler) as manager: + manager.spawn(countdown(10)) + manager.spawn(countup(5, 2)) + print("Counters started, awaiting completion") print("Task execution complete") if __name__ == "__main__":