From 4618c8cc79393c4dfcddd3d40fdb7aecb5556449 Mon Sep 17 00:00:00 2001 From: nocturn9x Date: Thu, 26 Nov 2020 16:57:20 +0100 Subject: [PATCH] Exceptions and cancellation seem to be working (except for server.py inside tests, investigation needed) --- README.md | 4 +- giambio/__init__.py | 2 + giambio/context.py | 13 +++---- giambio/core.py | 87 +++++++++++++++++++++++++++---------------- giambio/exceptions.py | 2 +- giambio/objects.py | 10 +++-- giambio/run.py | 32 +++++++--------- tests/events.py | 4 +- tests/sleep.py | 7 ++-- 9 files changed, 89 insertions(+), 72 deletions(-) diff --git a/README.md b/README.md index b8aff43..aef1cc9 100644 --- a/README.md +++ b/README.md @@ -247,9 +247,7 @@ async function in this case) until all children tasks have exited, and as it tur is a good thing. The reason why pools always wait for all children to have finished executing is that it makes easier propagating exceptions in the parent if something goes wrong: unlike many other frameworks, -exceptions in giambio always behave as expected* - -*: This is a WIP, it doesn't work right now! +exceptions in giambio always behave as expected Ok, so, let's try running this snippet and see what we get: diff --git a/giambio/__init__.py b/giambio/__init__.py index f7e7bf5..9260a7e 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -1,4 +1,6 @@ """ +Asynchronous Python made easy (and friendly) + Copyright (C) 2020 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/giambio/context.py b/giambio/context.py index 9943dc9..89450ef 100644 --- a/giambio/context.py +++ b/giambio/context.py @@ -20,7 +20,6 @@ limitations under the License. import types from .core import AsyncScheduler from .objects import Task -from .exceptions import CancelledError class TaskManager: @@ -34,7 +33,7 @@ class TaskManager: """ self.loop = loop - self.tasks = [] + self.tasks = [] # We store a reference to all tasks, even the asleep ones! def spawn(self, func: types.FunctionType, *args): """ @@ -42,10 +41,10 @@ class TaskManager: """ task = Task(func(*args), func.__name__ or str(func)) - task.parent = self.loop.current_task + task.joiners = [self.loop.current_task] self.loop.tasks.append(task) - self.tasks.append(task) self.loop.debugger.on_task_spawn(task) + self.tasks.append(task) return task def spawn_after(self, func: types.FunctionType, n: int, *args): @@ -55,11 +54,11 @@ class TaskManager: assert n >= 0, "The time delay can't be negative" task = Task(func(*args), func.__name__ or str(func)) - task.parent = self.loop.current_task + task.joiners = [self.loop.current_task] task.sleep_start = self.loop.clock() self.loop.paused.put(task, n) - self.tasks.append(task) self.loop.debugger.on_task_schedule(task, n) + self.tasks.append(task) return task async def __aenter__(self): @@ -67,4 +66,4 @@ class TaskManager: async def __aexit__(self, exc_type: Exception, exc: Exception, tb): for task in self.tasks: - await task.join() \ No newline at end of file + await task.join() diff --git a/giambio/core.py b/giambio/core.py index 7ff33dc..6dc1db6 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -25,7 +25,6 @@ from .objects import Task, TimeQueue from socket import SOL_SOCKET, SO_ERROR from .traps import want_read, want_write from .util.debug import BaseDebugger -from collections import deque from itertools import chain from .socket import AsyncSocket, WantWrite, WantRead from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE @@ -35,7 +34,6 @@ from .exceptions import (InternalError, ) - class AsyncScheduler: """ An asynchronous scheduler implementation. Tries to mimic the threaded @@ -54,8 +52,9 @@ class AsyncScheduler: # The debugger object. If it is none we create a dummy object that immediately returns an empty # lambda every time you access any of its attributes to avoid lots of if self.debugger clauses if debugger: - assert issubclass(type(debugger), BaseDebugger), "The debugger must be a subclass of giambio.util.BaseDebugger" - self.debugger = debugger or type("DumbDebugger", (object, ), {"__getattr__": lambda *args: lambda *args: None})() + assert issubclass(type(debugger), + BaseDebugger), "The debugger must be a subclass of giambio.util.BaseDebugger" + self.debugger = debugger or type("DumbDebugger", (object,), {"__getattr__": lambda *args: lambda *arg: None})() # Tasks that are ready to run self.tasks = [] # Selector object to perform I/O multiplexing @@ -110,7 +109,8 @@ class AsyncScheduler: elif not self.tasks: # If there are no actively running tasks # we try to schedule the asleep ones - self.awake_sleeping() + if self.paused: + self.awake_sleeping() # The next step is checking for I/O self.check_io() # Try to awake event-waiting tasks @@ -121,30 +121,36 @@ class AsyncScheduler: self.current_task = self.tasks.pop(0) self.debugger.before_task_step(self.current_task) if self.current_task.cancel_pending: + # We perform the deferred cancellation + # if it was previously scheduled self.do_cancel() if self.to_send and self.current_task.status != "init": + # A little setup to send objects from and to + # coroutines outside the event loop data = self.to_send else: + # The first time coroutines' method .send() wants None! data = None # Run a single step with the calculation method, *args = self.current_task.run(data) + # Some debugging and internal chatter here self.current_task.status = "run" self.current_task.steps += 1 self.debugger.after_task_step(self.current_task) - # Data has been sent, reset it to None + # If data has been sent, reset it to None if self.to_send and self.current_task != "init": self.to_send = None # Sneaky method call, thanks to David Beazley for this ;) getattr(self, method)(*args) except AttributeError: # If this happens, that's quite bad! raise InternalError("Uh oh! Something very bad just happened, did" - " you try to mix primitives from other async libraries?") from None + " you try to mix primitives from other async libraries?") from None except CancelledError: self.current_task.status = "cancelled" self.current_task.cancelled = True self.current_task.cancel_pending = False self.debugger.after_cancel(self.current_task) - self.join(self.current_task) # TODO: Investigate if a call to join() is needed + # TODO: Do we need to join? except StopIteration as ret: # Coroutine ends self.current_task.status = "end" @@ -153,20 +159,21 @@ class AsyncScheduler: self.debugger.on_task_exit(self.current_task) self.join(self.current_task) except BaseException as err: + # Coroutine raised self.current_task.exc = err self.current_task.status = "crashed" - self.join(self.current_task) + self.join(self.current_task) # This propagates the exception def do_cancel(self, task: Task = None): """ Performs task cancellation by throwing CancelledError inside the current - task in order to stop it from executing. The loop continues to execute + task in order to stop it from running. The loop continues to execute as tasks are independent """ task = task or self.current_task self.debugger.before_cancel(task) - task.throw(CancelledError) + task.throw(CancelledError()) def get_running(self): """ @@ -184,7 +191,6 @@ class AsyncScheduler: for event in self.events.copy(): if event.set: event.event_caught = True - event.waiters.append(self.current_task) self.tasks.extend(event.waiters) self.events.remove(event) @@ -239,37 +245,52 @@ class AsyncScheduler: self.run() self.has_ran = True self.debugger.on_exit() + if entry.exc: + raise entry.exc def reschedule_joinee(self, task: Task): """ - Reschedules the joinee of the + Reschedules the parent(s) of the given task, if any """ - if task.parent: - self.tasks.append(task.parent) + for t in task.joiners: + if t not in self.tasks: + # Since a task can be the parent + # of multiple children, we need to + # make sure we reschedule it only + # once, otherwise a RuntimeError will + # occur + self.tasks.append(t) - def join(self, child: Task): + def cancel_all(self): + """ + Cancels all tasks in preparation for the exception + throwing from self.join + """ + + for to_cancel in chain(self.tasks, self.paused): + try: + self.cancel(to_cancel) + except CancelledError: + to_cancel.status = "cancelled" + to_cancel.cancelled = True + to_cancel.cancel_pending = False + self.debugger.after_cancel(to_cancel) + self.tasks.remove(to_cancel) + + def join(self, task: Task): """ Handler for the 'join' event, does some magic to tell the scheduler to wait until the current coroutine ends """ - child.joined = True - if child.finished: - self.reschedule_joinee(child) - elif child.exc: - for task in chain(self.tasks, self.paused): - try: - self.cancel(task) - except CancelledError: - task.status = "cancelled" - task.cancelled = True - task.cancel_pending = False - self.debugger.after_cancel(task) - self.tasks.remove(task) - child.parent.throw(child.exc) - self.tasks.append(child.parent) + task.joined = True + if task.finished: + self.reschedule_joinee(task) + elif task.exc: + self.cancel_all() + self.reschedule_joinee(task) def sleep(self, seconds: int or float): """ @@ -304,9 +325,8 @@ class AsyncScheduler: """ self.events.add(event) - event.waiters.append(self.current_task) event.set = True - self.reschedule_joinee() + self.tasks.append(self.current_task) def event_wait(self, event): """ @@ -353,6 +373,7 @@ class AsyncScheduler: self.selector.register(sock, EVENT_WRITE, self.current_task) except KeyError: raise ResourceBusy("The given resource is busy!") from None + def wrap_socket(self, sock): """ Wraps a standard socket into an AsyncSocket object diff --git a/giambio/exceptions.py b/giambio/exceptions.py index c1b7e3d..a59f79e 100644 --- a/giambio/exceptions.py +++ b/giambio/exceptions.py @@ -33,7 +33,7 @@ class InternalError(GiambioError): ... -class CancelledError(BaseException): +class CancelledError(GiambioError): """ Exception raised by the giambio.objects.Task.cancel() method to terminate a child task. This should NOT be catched, or diff --git a/giambio/objects.py b/giambio/objects.py index 0c44eb5..333c8c7 100644 --- a/giambio/objects.py +++ b/giambio/objects.py @@ -32,17 +32,17 @@ class Task: coroutine: types.CoroutineType name: str - cancelled: bool = False # True if the task gets cancelled + cancelled: bool = False exc: BaseException = None result: object = None finished: bool = False status: str = "init" steps: int = 0 last_io: tuple = () - parent: object = None - joined: bool= False + joiners: list = field(default_factory=list) + joined: bool = False cancel_pending: bool = False - sleep_start: int = None + sleep_start: float = 0.0 def run(self, what=None): """ @@ -78,6 +78,8 @@ class Task: def __del__(self): self.coroutine.close() + def __hash__(self): + return hash(self.coroutine) class Event: """ diff --git a/giambio/run.py b/giambio/run.py index 535af67..ac38f40 100644 --- a/giambio/run.py +++ b/giambio/run.py @@ -17,13 +17,14 @@ limitations under the License. """ import socket +import inspect import threading from .core import AsyncScheduler from .exceptions import GiambioError from .context import TaskManager from .socket import AsyncSocket from .util.debug import BaseDebugger -from types import FunctionType, CoroutineType, GeneratorType +from types import FunctionType thread_local = threading.local() @@ -38,14 +39,16 @@ def get_event_loop(): try: return thread_local.loop except AttributeError: - raise GiambioError("no event loop set") from None + raise GiambioError("giambio is not running") from None def new_event_loop(debugger: BaseDebugger): """ Associates a new event loop to the current thread and deactivates the old one. This should not be - called explicitly unless you know what you're doing + called explicitly unless you know what you're doing. + If an event loop is currently set and it is running, + a GiambioError exception is raised """ try: @@ -54,7 +57,7 @@ def new_event_loop(debugger: BaseDebugger): thread_local.loop = AsyncScheduler(debugger) else: if not loop.done(): - raise GiambioError("cannot set event loop while running") + raise GiambioError("cannot change event loop while running") else: thread_local.loop = AsyncScheduler(debugger) @@ -64,11 +67,11 @@ def run(func: FunctionType, *args, **kwargs): Starts the event loop from a synchronous entry point """ - if isinstance(func, (CoroutineType, GeneratorType)): + if inspect.iscoroutine(func): raise GiambioError("Looks like you tried to call giambio.run(your_func(arg1, arg2, ...)), that is wrong!" "\nWhat you wanna do, instead, is this: giambio.run(your_func, arg1, arg2, ...)") elif not isinstance(func, FunctionType): - raise GiambioError("gaibmio.run() requires an async function as parameter!") + raise GiambioError("giambio.run() requires an async function as parameter!") new_event_loop(kwargs.get("debugger", None)) get_event_loop().start(func, *args) @@ -79,20 +82,15 @@ def clock(): loop """ - try: - return thread_local.loop.clock() - except AttributeError: - raise GiambioError("Cannot call clock from outside an async context") from None + return get_event_loop().clock() def wrap_socket(sock: socket.socket) -> AsyncSocket: """ Wraps a synchronous socket into a giambio.socket.AsyncSocket """ - try: - return thread_local.loop.wrap_socket(sock) - except AttributeError: - raise GiambioError("Cannot wrap a socket from outside an async context") from None + + return get_event_loop().wrap_socket(sock) def create_pool(): @@ -100,8 +98,4 @@ def create_pool(): Creates an async pool """ - try: - return TaskManager(thread_local.loop) - except AttributeError: - raise GiambioError("It appears that giambio is not running, did you call giambio.create_pool()" - " outside of an async context?") from None + return TaskManager(get_event_loop()) diff --git a/tests/events.py b/tests/events.py index 65ceb6f..f5b0e95 100644 --- a/tests/events.py +++ b/tests/events.py @@ -26,10 +26,10 @@ async def parent(pause: int = 1): print(f"[parent] Sleeping {pause} second(s) before setting the event") await giambio.sleep(pause) await event.trigger() - print("[parent] Event set, awaiting child") + print("[parent] Event set, awaiting child completion") end = giambio.clock() - start print(f"[parent] Child exited in {end} seconds") if __name__ == "__main__": - giambio.run(parent, 3) + giambio.run(parent, 3) \ No newline at end of file diff --git a/tests/sleep.py b/tests/sleep.py index 5332ef9..d4bb8e4 100644 --- a/tests/sleep.py +++ b/tests/sleep.py @@ -50,7 +50,8 @@ async def child(): print("[child] Child spawned!! Sleeping for 2 seconds") await giambio.sleep(2) print("[child] Had a nice nap!") - raise TypeError("rip") + # raise TypeError("rip") # Uncomment this line and watch the exception magically propagate! + async def child1(): print("[child 1] Child spawned!! Sleeping for 2 seconds") @@ -68,7 +69,7 @@ async def main(): except Exception as error: print(f"[main] Exception from child catched! {repr(error)}") print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") - await giambio.sleep(5) + if __name__ == "__main__": - giambio.run(main, debugger=Debugger()) + giambio.run(main, debugger=None)