diff --git a/giambio/__init__.py b/giambio/__init__.py
index a8b5ba0..40da333 100644
--- a/giambio/__init__.py
+++ b/giambio/__init__.py
@@ -3,6 +3,5 @@ __version__ = (0, 0, 1)
 from ._core import AsyncScheduler
 from .exceptions import GiambioError, AlreadyJoinedError, CancelledError
 from ._traps import sleep
-from ._managers import TaskManager
 
-__all__ = ["AsyncScheduler", "GiambioError", "AlreadyJoinedError", "CancelledError", "TaskManager", "sleep"]
+__all__ = ["AsyncScheduler", "GiambioError", "AlreadyJoinedError", "CancelledError", "sleep"]
diff --git a/giambio/_core.py b/giambio/_core.py
index 9472b0c..e1f4835 100644
--- a/giambio/_core.py
+++ b/giambio/_core.py
@@ -1,3 +1,4 @@
+# Import libraries and internal resources
 import types
 from collections import deque
 from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
@@ -13,8 +14,15 @@ from ._traps import want_read, want_write
 
 
 class AsyncScheduler:
-    """Implementation of an event loop, alternates between execution of coroutines (asynchronous functions)
-    to allow a concurrency model or 'green threads'"""
+    """
+    A toy implementation of an asynchronous scheduler. It mimics the simplicity
+    of the threaded model without using actual threads, alternating between
+    coroutines so that more than one computation can make progress at a time.
+    An attempt has been made to fix the underlying pitfalls and weaknesses of
+    the threaded model without making the API unnecessarily complicated;
+    task cancellation and exception propagation are two examples.
+    Can perform (unreliably) socket I/O asynchronously.
+    """
 
     def __init__(self):
         """Object constructor"""
@@ -22,7 +30,7 @@ class AsyncScheduler:
         self.to_run = deque()  # Tasks that are ready to run
         self.paused = []  # Tasks that are asleep
         self.selector = DefaultSelector()  # Selector object to perform I/O multiplexing
-        self.running = None  # This will always point to the currently running coroutine (Task object)
+        self.current_task = None  # This will always point to the currently running coroutine (Task object)
         self.joined = {}  # Maps child tasks that need to be joined their respective parent task
         self.clock = default_timer  # Monotonic clock to keep track of elapsed time reliably
         self.sequence = 0  # A monotonically increasing ID to avoid some corner cases with deadlines comparison
@@ -34,62 +42,69 @@ class AsyncScheduler:
         give execution control to the loop itself."""
 
         while True:
-            if not self.selector.get_map() and not any(deque(self.paused) + self.to_run):
+            if not self.selector.get_map() and not any([self.paused, self.to_run]):
                 # If there is nothing to do, just exit
                 break
-            if not self.to_run and self.paused:  # If there are sockets ready, (re)schedule their associated task
-                wait(max(0.0, self.paused[0][0] - self.clock()))  # Sleep in order not to waste CPU cycles
-                while self.paused[0][0] < self.clock():  # Reschedules task when their deadline has elapsed
+            if not self.to_run and self.paused:  # If there are no actively running tasks, try to schedule the asleep ones
+                wait(max(0.0, self.paused[0][0] - self.clock()))  # Sleep until the closest deadline in order not to waste CPU cycles
+                while self.paused[0][0] < self.clock():  # Reschedule tasks whose deadline has elapsed
                     _, __, task = heappop(self.paused)
                     self.to_run.append(task)
                     if not self.paused:
                         break
-            timeout = 0.0 if self.to_run else None
-            tasks = self.selector.select(timeout)
+            timeout = 0.0 if self.to_run else None  # If no tasks are ready, wait indefinitely
+            tasks = self.selector.select(timeout)  # Get sockets that are ready and schedule their tasks
             for key, _ in tasks:
                self.to_run.append(key.data)  # Socket ready? Schedule the task
                 self.selector.unregister(
                     key.fileobj)  # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now)
-            while self.to_run:
-                self.running = self.to_run.popleft()  # Sets the currently running task
+            while self.to_run:  # While there are tasks to run
+                self.current_task = self.to_run.popleft()  # Sets the currently running task
                 try:
-                    method, *args = self.running.run()
+                    method, *args = self.current_task.run()  # Run a single step of the coroutine
                     getattr(self, method)(*args)  # Sneaky method call, thanks to David Beazley for this ;)
-                except StopIteration as e:
-                    e = e.args[0] if e.args else None
-                    self.running.result = e
-                    self.running.finished = True
+                except StopIteration as e:  # Coroutine ends
+                    self.current_task.result = e.args[0] if e.args else None
+                    self.current_task.finished = True
                     self.reschedule_parent()
-                except CancelledError:
-                    self.running.cancelled = True
+                except CancelledError:  # Coroutine was cancelled
+                    self.current_task.cancelled = True
                     self.reschedule_parent()
-                except Exception as error:
-                    self.running.exc = error
+                except Exception as error:  # Coroutine raised
+                    self.current_task.exc = error
                     self.reschedule_parent()
+                    raise  # TODO: Find a better way to propagate errors
+
+
+    def create_task(self, coro: types.coroutine):
+        """Spawns a child task"""
+
+        task = Task(coro)
+        self.to_run.append(task)
+        return task
 
     def start(self, coro: types.coroutine):
         """Starts the event loop using a coroutine as an entry point.
-        Equivalent to self.create_task(coro) and self.run()
         """
 
-        self.to_run.append(Task(coro))
+        self.create_task(coro)
         self.run()
 
     def reschedule_parent(self):
         """Reschedules the parent task"""
 
-        popped = self.joined.pop(self.running, None)
+        popped = self.joined.pop(self.current_task, None)
         if popped:
             self.to_run.append(popped)
 
     def want_read(self, sock: socket.socket):
         """Handler for the 'want_read' event, registers the socket inside the selector to perform I/0 multiplexing"""
 
-        self.selector.register(sock, EVENT_READ, self.running)
+        self.selector.register(sock, EVENT_READ, self.current_task)
 
     def want_write(self, sock: socket.socket):
         """Handler for the 'want_write' event, registers the socket inside the selector to perform I/0 multiplexing"""
 
-        self.selector.register(sock, EVENT_WRITE, self.running)
+        self.selector.register(sock, EVENT_WRITE, self.current_task)
 
     def join(self, coro: types.coroutine):
         """Handler for the 'join' event, does some magic to tell the scheduler
@@ -98,23 +113,22 @@ class AsyncScheduler:
         parent task"""
 
         if coro not in self.joined:
-            self.joined[coro] = self.running
+            self.joined[coro] = self.current_task
         else:
             raise AlreadyJoinedError("Joining the same task multiple times is not allowed!")
 
     def sleep(self, seconds):
-        """Puts a task to sleep"""
+        """Puts the caller to sleep for the given number of seconds"""
 
         self.sequence += 1
-        heappush(self.paused, (self.clock() + seconds, self.sequence, self.running))
-        self.running = None
+        heappush(self.paused, (self.clock() + seconds, self.sequence, self.current_task))
 
     def cancel(self, task):
         """Handler for the 'cancel' event, throws CancelledError inside a coroutine
         in order to stop it from executing. The loop continues to execute as tasks are independent"""
 
-        task.coroutine.throw(CancelledError)
+        task.throw(CancelledError)
 
     def wrap_socket(self, sock):
         """Wraps a standard socket into an AsyncSocket object"""
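
Note on the dispatch mechanism above: every trap yields an event name followed by its arguments, and AsyncScheduler.run() turns that tuple into a call to the handler of the same name via getattr(). The following self-contained sketch uses toy names (example_trap, MiniLoop, demo) and is not giambio code; it only shows the pattern in isolation:

import types
from collections import deque


@types.coroutine
def example_trap(value):
    # Suspend the caller and hand ("example", value) to the loop
    yield "example", value


class MiniLoop:
    def __init__(self):
        self.to_run = deque()

    def example(self, value):
        # Handler looked up by name, just like the sleep/join/cancel handlers above
        print(f"loop handled the 'example' trap with value {value!r}")

    def run(self, coro):
        self.to_run.append(coro)
        while self.to_run:
            current = self.to_run.popleft()
            try:
                method, *args = current.send(None)
            except StopIteration:
                continue
            getattr(self, method)(*args)  # Same dispatch trick as in AsyncScheduler.run()
            self.to_run.append(current)   # Naively reschedule the coroutine


async def demo():
    await example_trap(42)


MiniLoop().run(demo())

The appeal of the pattern is that adding a new trap only requires a new generator on the library side and a handler method of the same name on the loop side.
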
diff --git a/giambio/_layers.py b/giambio/_layers.py
index 5530af6..8ee0202 100644
--- a/giambio/_layers.py
+++ b/giambio/_layers.py
@@ -1,5 +1,5 @@
 import types
-from ._traps import join, sleep, cancel
+from ._traps import join, cancel
 
 
 class Task:
@@ -13,10 +13,15 @@ class Task:
         self.result = None
         self.finished = False
 
-    def run(self):
+    def run(self, what=None):
         """Simple abstraction layer over the coroutines ``send`` method"""
 
-        return self.coroutine.send(None)
+        return self.coroutine.send(what)
+
+    def throw(self, err: Exception):
+        """Simple abstraction layer over the coroutines ``throw`` method"""
+
+        return self.coroutine.throw(err)
 
     async def join(self):
         """Joins the task"""
@@ -28,11 +33,6 @@ class Task:
 
         await cancel(self)
 
-    def result(self):
-        if self.exc:
-            raise self.exc
-        return self.result
-
     def __repr__(self):
         """Implements repr(self)"""
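
With Task now exposing both run() and throw(), a coroutine can be stepped and interrupted by hand, which is exactly what the scheduler's cancel() handler relies on. A quick illustration, assuming the sleep trap yields a ("sleep", seconds) tuple as the scheduler's handler suggests:

from giambio import CancelledError, sleep
from giambio._layers import Task


async def sleeper():
    await sleep(10)


task = Task(sleeper())
print(task.run())               # First step: the trap yielded by sleep(), e.g. ("sleep", 10)
try:
    task.throw(CancelledError)  # Raise CancelledError at the coroutine's suspension point
except CancelledError:
    print("coroutine was cancelled")
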
- """ - - def __init__(self, loop): - self.values = {} # Results from each task - self.loop = loop # The event loop that spawned the TaskManager - - async def _cancel_and_raise(self, exc): - """Cancels all the tasks inside the TaskManager object and raises the exception - of the task that triggered this mechanism""" - - try: - await self.loop.running.cancel() - except Exception as error: - self.loop.running.exc = error - for task in self.loop.to_run + deque(self.loop.paused): - if isinstance(task, tuple): # Sleeping task - _, _, task = task - try: - await task.cancel() - except Exception as error: - task.exc = error - raise exc - - async def __aenter__(self): - return self - - async def __aexit__(self, type, value, traceback): - while True: - if not any([self.loop.to_run, self.loop.paused]): - break - tasks = self.loop.to_run + deque(self.loop.paused) - task = tasks.popleft() - if isinstance(task, tuple): # Sleeping task - _, _, task = task - self.values[task] = await task.join() - if task.exc: - print(task) - await self._cancel_and_raise(task.exc) - - def spawn(self, coroutine: types.coroutine): - """Schedules a task for execution, appending it to the call stack - - :param coroutine: The coroutine to spawn, please note that if you want to execute foo, you need to pass foo() as this - returns a coroutine instead of a function object - :type coroutine: types.coroutine - :returns: A ``Task`` object - :rtype: class: Task - """ - - task = Task(coroutine) - self.loop.to_run.append(task) - return task - - def schedule(self, coroutine: types.coroutine, n: int): - """Schedules a task for execution after when seconds - - :param coroutine: The coroutine to spawn, please note that if you want to execute foo, you need to pass foo() as this - returns a coroutine instead of a function object - :type coroutine: types.coroutine - :param n: The delay in seconds after which the task should start running - :type n: int - :returns: A ``Task`` object - :rtype: class: Task - """ - - self.loop.sequence += 1 - task = Task(coroutine) - heappush(self.loop.paused, (self.loop.clock() + n, self.loop.sequence, task)) - return task diff --git a/giambio/_traps.py b/giambio/_traps.py index 564a9fb..bed64ad 100644 --- a/giambio/_traps.py +++ b/giambio/_traps.py @@ -32,8 +32,7 @@ def join(task): """ yield "join", task - assert task.finished - return task.result() + return task.result @types.coroutine diff --git a/tests/count.py b/tests/count.py index 64ca9cc..ec8aa53 100644 --- a/tests/count.py +++ b/tests/count.py @@ -1,4 +1,4 @@ -from giambio import AsyncScheduler, sleep, TaskManager +from giambio import AsyncScheduler, sleep async def countdown(n: int): @@ -7,7 +7,7 @@ async def countdown(n: int): n -= 1 await sleep(1) if n == 5: - raise ValueError('lul') + raise ValueError('lul') print("Countdown over") @@ -21,13 +21,17 @@ async def countup(stop, step: int or float = 1): async def main(): - async with TaskManager(scheduler) as manager: - manager.spawn(countdown(10)) - manager.spawn(countup(5, 2)) - print("Counters started, awaiting completion") + cup = scheduler.create_task(countdown(10)) + cdown = scheduler.create_task(countup(5, 2)) + print("Counters started, awaiting completion") + await cup.join() + await cdown.join() print("Task execution complete") if __name__ == "__main__": scheduler = AsyncScheduler() - scheduler.start(main()) + try: + scheduler.start(main()) + except Exception: + print("main() errored!") diff --git a/tests/server.py b/tests/server.py index 9c9eca8..ee2dcb5 100644 --- a/tests/server.py +++ 
diff --git a/tests/server.py b/tests/server.py
index 9c9eca8..ee2dcb5 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -17,11 +17,10 @@ async def server(address: tuple):
     sock.listen(5)
     asock = sched.wrap_socket(sock)
     logging.info(f"Echo server serving asynchronously at {address}")
-    async with giambio.TaskManager(sched) as manager:
-        while True:
-            conn, addr = await asock.accept()
-            logging.info(f"{addr} connected")
-            manager.spawn(echo_handler(conn, addr))
+    while True:
+        conn, addr = await asock.accept()
+        logging.info(f"{addr} connected")
+        task = sched.create_task(echo_handler(conn, addr))
 
 
 async def echo_handler(sock: AsyncSocket, addr: tuple):
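
One last point about the bare raise added to AsyncScheduler.run(): exceptions raised in any task now propagate out of scheduler.start(), which is why tests/count.py wraps the call in a try/except. A minimal sketch, with faulty() as a made-up name:

from giambio import AsyncScheduler


async def faulty():
    raise ValueError("boom")


scheduler = AsyncScheduler()
try:
    scheduler.start(faulty())
except ValueError as error:
    print(f"propagated out of the event loop: {error}")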