Exceptions and cancellation seem to be working (except for server.py inside tests, investigation needed)

This commit is contained in:
nocturn9x 2020-11-26 16:57:20 +01:00
parent caee01977e
commit 4618c8cc79
9 changed files with 89 additions and 72 deletions

View File

@ -247,9 +247,7 @@ async function in this case) until all children tasks have exited, and as it tur
is a good thing.
The reason why pools always wait for all children to have finished executing is that it makes
easier propagating exceptions in the parent if something goes wrong: unlike many other frameworks,
exceptions in giambio always behave as expected*
*: This is a WIP, it doesn't work right now!
exceptions in giambio always behave as expected
Ok, so, let's try running this snippet and see what we get:

View File

@ -1,4 +1,6 @@
"""
Asynchronous Python made easy (and friendly)
Copyright (C) 2020 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -20,7 +20,6 @@ limitations under the License.
import types
from .core import AsyncScheduler
from .objects import Task
from .exceptions import CancelledError
class TaskManager:
@ -34,7 +33,7 @@ class TaskManager:
"""
self.loop = loop
self.tasks = []
self.tasks = [] # We store a reference to all tasks, even the asleep ones!
def spawn(self, func: types.FunctionType, *args):
"""
@ -42,10 +41,10 @@ class TaskManager:
"""
task = Task(func(*args), func.__name__ or str(func))
task.parent = self.loop.current_task
task.joiners = [self.loop.current_task]
self.loop.tasks.append(task)
self.tasks.append(task)
self.loop.debugger.on_task_spawn(task)
self.tasks.append(task)
return task
def spawn_after(self, func: types.FunctionType, n: int, *args):
@ -55,11 +54,11 @@ class TaskManager:
assert n >= 0, "The time delay can't be negative"
task = Task(func(*args), func.__name__ or str(func))
task.parent = self.loop.current_task
task.joiners = [self.loop.current_task]
task.sleep_start = self.loop.clock()
self.loop.paused.put(task, n)
self.tasks.append(task)
self.loop.debugger.on_task_schedule(task, n)
self.tasks.append(task)
return task
async def __aenter__(self):
@ -67,4 +66,4 @@ class TaskManager:
async def __aexit__(self, exc_type: Exception, exc: Exception, tb):
for task in self.tasks:
await task.join()
await task.join()

View File

@ -25,7 +25,6 @@ from .objects import Task, TimeQueue
from socket import SOL_SOCKET, SO_ERROR
from .traps import want_read, want_write
from .util.debug import BaseDebugger
from collections import deque
from itertools import chain
from .socket import AsyncSocket, WantWrite, WantRead
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
@ -35,7 +34,6 @@ from .exceptions import (InternalError,
)
class AsyncScheduler:
"""
An asynchronous scheduler implementation. Tries to mimic the threaded
@ -54,8 +52,9 @@ class AsyncScheduler:
# The debugger object. If it is none we create a dummy object that immediately returns an empty
# lambda every time you access any of its attributes to avoid lots of if self.debugger clauses
if debugger:
assert issubclass(type(debugger), BaseDebugger), "The debugger must be a subclass of giambio.util.BaseDebugger"
self.debugger = debugger or type("DumbDebugger", (object, ), {"__getattr__": lambda *args: lambda *args: None})()
assert issubclass(type(debugger),
BaseDebugger), "The debugger must be a subclass of giambio.util.BaseDebugger"
self.debugger = debugger or type("DumbDebugger", (object,), {"__getattr__": lambda *args: lambda *arg: None})()
# Tasks that are ready to run
self.tasks = []
# Selector object to perform I/O multiplexing
@ -110,7 +109,8 @@ class AsyncScheduler:
elif not self.tasks:
# If there are no actively running tasks
# we try to schedule the asleep ones
self.awake_sleeping()
if self.paused:
self.awake_sleeping()
# The next step is checking for I/O
self.check_io()
# Try to awake event-waiting tasks
@ -121,30 +121,36 @@ class AsyncScheduler:
self.current_task = self.tasks.pop(0)
self.debugger.before_task_step(self.current_task)
if self.current_task.cancel_pending:
# We perform the deferred cancellation
# if it was previously scheduled
self.do_cancel()
if self.to_send and self.current_task.status != "init":
# A little setup to send objects from and to
# coroutines outside the event loop
data = self.to_send
else:
# The first time coroutines' method .send() wants None!
data = None
# Run a single step with the calculation
method, *args = self.current_task.run(data)
# Some debugging and internal chatter here
self.current_task.status = "run"
self.current_task.steps += 1
self.debugger.after_task_step(self.current_task)
# Data has been sent, reset it to None
# If data has been sent, reset it to None
if self.to_send and self.current_task != "init":
self.to_send = None
# Sneaky method call, thanks to David Beazley for this ;)
getattr(self, method)(*args)
except AttributeError: # If this happens, that's quite bad!
raise InternalError("Uh oh! Something very bad just happened, did"
" you try to mix primitives from other async libraries?") from None
" you try to mix primitives from other async libraries?") from None
except CancelledError:
self.current_task.status = "cancelled"
self.current_task.cancelled = True
self.current_task.cancel_pending = False
self.debugger.after_cancel(self.current_task)
self.join(self.current_task) # TODO: Investigate if a call to join() is needed
# TODO: Do we need to join?
except StopIteration as ret:
# Coroutine ends
self.current_task.status = "end"
@ -153,20 +159,21 @@ class AsyncScheduler:
self.debugger.on_task_exit(self.current_task)
self.join(self.current_task)
except BaseException as err:
# Coroutine raised
self.current_task.exc = err
self.current_task.status = "crashed"
self.join(self.current_task)
self.join(self.current_task) # This propagates the exception
def do_cancel(self, task: Task = None):
"""
Performs task cancellation by throwing CancelledError inside the current
task in order to stop it from executing. The loop continues to execute
task in order to stop it from running. The loop continues to execute
as tasks are independent
"""
task = task or self.current_task
self.debugger.before_cancel(task)
task.throw(CancelledError)
task.throw(CancelledError())
def get_running(self):
"""
@ -184,7 +191,6 @@ class AsyncScheduler:
for event in self.events.copy():
if event.set:
event.event_caught = True
event.waiters.append(self.current_task)
self.tasks.extend(event.waiters)
self.events.remove(event)
@ -239,37 +245,52 @@ class AsyncScheduler:
self.run()
self.has_ran = True
self.debugger.on_exit()
if entry.exc:
raise entry.exc
def reschedule_joinee(self, task: Task):
"""
Reschedules the joinee of the
Reschedules the parent(s) of the
given task, if any
"""
if task.parent:
self.tasks.append(task.parent)
for t in task.joiners:
if t not in self.tasks:
# Since a task can be the parent
# of multiple children, we need to
# make sure we reschedule it only
# once, otherwise a RuntimeError will
# occur
self.tasks.append(t)
def join(self, child: Task):
def cancel_all(self):
"""
Cancels all tasks in preparation for the exception
throwing from self.join
"""
for to_cancel in chain(self.tasks, self.paused):
try:
self.cancel(to_cancel)
except CancelledError:
to_cancel.status = "cancelled"
to_cancel.cancelled = True
to_cancel.cancel_pending = False
self.debugger.after_cancel(to_cancel)
self.tasks.remove(to_cancel)
def join(self, task: Task):
"""
Handler for the 'join' event, does some magic to tell the scheduler
to wait until the current coroutine ends
"""
child.joined = True
if child.finished:
self.reschedule_joinee(child)
elif child.exc:
for task in chain(self.tasks, self.paused):
try:
self.cancel(task)
except CancelledError:
task.status = "cancelled"
task.cancelled = True
task.cancel_pending = False
self.debugger.after_cancel(task)
self.tasks.remove(task)
child.parent.throw(child.exc)
self.tasks.append(child.parent)
task.joined = True
if task.finished:
self.reschedule_joinee(task)
elif task.exc:
self.cancel_all()
self.reschedule_joinee(task)
def sleep(self, seconds: int or float):
"""
@ -304,9 +325,8 @@ class AsyncScheduler:
"""
self.events.add(event)
event.waiters.append(self.current_task)
event.set = True
self.reschedule_joinee()
self.tasks.append(self.current_task)
def event_wait(self, event):
"""
@ -353,6 +373,7 @@ class AsyncScheduler:
self.selector.register(sock, EVENT_WRITE, self.current_task)
except KeyError:
raise ResourceBusy("The given resource is busy!") from None
def wrap_socket(self, sock):
"""
Wraps a standard socket into an AsyncSocket object

View File

@ -33,7 +33,7 @@ class InternalError(GiambioError):
...
class CancelledError(BaseException):
class CancelledError(GiambioError):
"""
Exception raised by the giambio.objects.Task.cancel() method
to terminate a child task. This should NOT be catched, or

View File

@ -32,17 +32,17 @@ class Task:
coroutine: types.CoroutineType
name: str
cancelled: bool = False # True if the task gets cancelled
cancelled: bool = False
exc: BaseException = None
result: object = None
finished: bool = False
status: str = "init"
steps: int = 0
last_io: tuple = ()
parent: object = None
joined: bool= False
joiners: list = field(default_factory=list)
joined: bool = False
cancel_pending: bool = False
sleep_start: int = None
sleep_start: float = 0.0
def run(self, what=None):
"""
@ -78,6 +78,8 @@ class Task:
def __del__(self):
self.coroutine.close()
def __hash__(self):
return hash(self.coroutine)
class Event:
"""

View File

@ -17,13 +17,14 @@ limitations under the License.
"""
import socket
import inspect
import threading
from .core import AsyncScheduler
from .exceptions import GiambioError
from .context import TaskManager
from .socket import AsyncSocket
from .util.debug import BaseDebugger
from types import FunctionType, CoroutineType, GeneratorType
from types import FunctionType
thread_local = threading.local()
@ -38,14 +39,16 @@ def get_event_loop():
try:
return thread_local.loop
except AttributeError:
raise GiambioError("no event loop set") from None
raise GiambioError("giambio is not running") from None
def new_event_loop(debugger: BaseDebugger):
"""
Associates a new event loop to the current thread
and deactivates the old one. This should not be
called explicitly unless you know what you're doing
called explicitly unless you know what you're doing.
If an event loop is currently set and it is running,
a GiambioError exception is raised
"""
try:
@ -54,7 +57,7 @@ def new_event_loop(debugger: BaseDebugger):
thread_local.loop = AsyncScheduler(debugger)
else:
if not loop.done():
raise GiambioError("cannot set event loop while running")
raise GiambioError("cannot change event loop while running")
else:
thread_local.loop = AsyncScheduler(debugger)
@ -64,11 +67,11 @@ def run(func: FunctionType, *args, **kwargs):
Starts the event loop from a synchronous entry point
"""
if isinstance(func, (CoroutineType, GeneratorType)):
if inspect.iscoroutine(func):
raise GiambioError("Looks like you tried to call giambio.run(your_func(arg1, arg2, ...)), that is wrong!"
"\nWhat you wanna do, instead, is this: giambio.run(your_func, arg1, arg2, ...)")
elif not isinstance(func, FunctionType):
raise GiambioError("gaibmio.run() requires an async function as parameter!")
raise GiambioError("giambio.run() requires an async function as parameter!")
new_event_loop(kwargs.get("debugger", None))
get_event_loop().start(func, *args)
@ -79,20 +82,15 @@ def clock():
loop
"""
try:
return thread_local.loop.clock()
except AttributeError:
raise GiambioError("Cannot call clock from outside an async context") from None
return get_event_loop().clock()
def wrap_socket(sock: socket.socket) -> AsyncSocket:
"""
Wraps a synchronous socket into a giambio.socket.AsyncSocket
"""
try:
return thread_local.loop.wrap_socket(sock)
except AttributeError:
raise GiambioError("Cannot wrap a socket from outside an async context") from None
return get_event_loop().wrap_socket(sock)
def create_pool():
@ -100,8 +98,4 @@ def create_pool():
Creates an async pool
"""
try:
return TaskManager(thread_local.loop)
except AttributeError:
raise GiambioError("It appears that giambio is not running, did you call giambio.create_pool()"
" outside of an async context?") from None
return TaskManager(get_event_loop())

View File

@ -26,10 +26,10 @@ async def parent(pause: int = 1):
print(f"[parent] Sleeping {pause} second(s) before setting the event")
await giambio.sleep(pause)
await event.trigger()
print("[parent] Event set, awaiting child")
print("[parent] Event set, awaiting child completion")
end = giambio.clock() - start
print(f"[parent] Child exited in {end} seconds")
if __name__ == "__main__":
giambio.run(parent, 3)
giambio.run(parent, 3)

View File

@ -50,7 +50,8 @@ async def child():
print("[child] Child spawned!! Sleeping for 2 seconds")
await giambio.sleep(2)
print("[child] Had a nice nap!")
raise TypeError("rip")
# raise TypeError("rip") # Uncomment this line and watch the exception magically propagate!
async def child1():
print("[child 1] Child spawned!! Sleeping for 2 seconds")
@ -68,7 +69,7 @@ async def main():
except Exception as error:
print(f"[main] Exception from child catched! {repr(error)}")
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
await giambio.sleep(5)
if __name__ == "__main__":
giambio.run(main, debugger=Debugger())
giambio.run(main, debugger=None)