Temporary 'meh' fix to allow exception propagation to occur, deleted old _managers.py module and refactored comments and docstrings

This commit is contained in:
nocturn9x 2020-06-19 21:39:54 +00:00
parent 094e3d50ac
commit 5c5beeef22
7 changed files with 68 additions and 148 deletions

View File

@ -3,6 +3,5 @@ __version__ = (0, 0, 1)
from ._core import AsyncScheduler
from .exceptions import GiambioError, AlreadyJoinedError, CancelledError
from ._traps import sleep
from ._managers import TaskManager
__all__ = ["AsyncScheduler", "GiambioError", "AlreadyJoinedError", "CancelledError", "TaskManager", "sleep"]

View File

@ -1,3 +1,4 @@
# Import libraries and internal resources
import types
from collections import deque
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
@ -13,8 +14,15 @@ from ._traps import want_read, want_write
class AsyncScheduler:
"""Implementation of an event loop, alternates between execution of coroutines (asynchronous functions)
to allow a concurrency model or 'green threads'"""
"""
An asynchronous scheduler toy implementation. Tries to mimic the threaded
model in its simplicity, without using actual threads, but rather alternating
across coroutines execution to let more than one thing at a time to proceed
with its calculations. An attempt to fix the threaded model underlying pitfalls
and weaknesses has been made, without making the API unnecessarily complicated.
A few examples are tasks cancellation and exception propagation.
Can perform (unreliably) socket I/O asynchronously.
"""
def __init__(self):
"""Object constructor"""
@ -22,7 +30,7 @@ class AsyncScheduler:
self.to_run = deque() # Tasks that are ready to run
self.paused = [] # Tasks that are asleep
self.selector = DefaultSelector() # Selector object to perform I/O multiplexing
self.running = None # This will always point to the currently running coroutine (Task object)
self.current_task = None # This will always point to the currently running coroutine (Task object)
self.joined = {} # Maps child tasks that need to be joined their respective parent task
self.clock = default_timer # Monotonic clock to keep track of elapsed time reliably
self.sequence = 0 # A monotonically increasing ID to avoid some corner cases with deadlines comparison
@ -34,62 +42,69 @@ class AsyncScheduler:
give execution control to the loop itself."""
while True:
if not self.selector.get_map() and not any(deque(self.paused) + self.to_run):
if not self.selector.get_map() and not any([self.paused, self.to_run]): # If there is nothing to do, just exit
break
if not self.to_run and self.paused: # If there are sockets ready, (re)schedule their associated task
wait(max(0.0, self.paused[0][0] - self.clock())) # Sleep in order not to waste CPU cycles
while self.paused[0][0] < self.clock(): # Reschedules task when their deadline has elapsed
if not self.to_run and self.paused: # If there are no actively running tasks, we try to schedule the asleep ones
wait(max(0.0, self.paused[0][0] - self.clock())) # Sleep until the closest deadline in order not to waste CPU cycles
while self.paused[0][0] < self.clock(): # Reschedules tasks when their deadline has elapsed
_, __, task = heappop(self.paused)
self.to_run.append(task)
if not self.paused:
break
timeout = 0.0 if self.to_run else None
tasks = self.selector.select(timeout)
timeout = 0.0 if self.to_run else None # If there are no tasks ready wait indefinitely
tasks = self.selector.select(timeout) # Get sockets that are ready and schedule their tasks
for key, _ in tasks:
self.to_run.append(key.data) # Socket ready? Schedule the task
self.selector.unregister(
key.fileobj) # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now)
while self.to_run:
self.running = self.to_run.popleft() # Sets the currently running task
while self.to_run: # While there are tasks to run
self.current_task = self.to_run.popleft() # Sets the currently running task
try:
method, *args = self.running.run()
method, *args = self.current_task.run() # Run a single step with the calculation
getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;)
except StopIteration as e:
e = e.args[0] if e.args else None
self.running.result = e
self.running.finished = True
except StopIteration as e: # Coroutine ends
self.current_task.result = e.args[0] if e.args else None
self.current_task.finished = True
self.reschedule_parent()
except CancelledError:
self.running.cancelled = True
except CancelledError: # Coroutine was cancelled
self.current_task.cancelled = True
self.reschedule_parent()
except Exception as error:
self.running.exc = error
except Exception as error: # Coroutine raised
self.current_task.exc = error
self.reschedule_parent()
raise # Find a better way to propagate errors
def create_task(self, coro: types.coroutine):
"""Spawns a child task"""
task = Task(coro)
self.to_run.append(task)
return task
def start(self, coro: types.coroutine):
"""Starts the event loop using a coroutine as an entry point.
Equivalent to self.create_task(coro) and self.run()
"""
self.to_run.append(Task(coro))
self.create_task(coro)
self.run()
def reschedule_parent(self):
"""Reschedules the parent task"""
popped = self.joined.pop(self.running, None)
popped = self.joined.pop(self.current_task, None)
if popped:
self.to_run.append(popped)
def want_read(self, sock: socket.socket):
"""Handler for the 'want_read' event, registers the socket inside the selector to perform I/0 multiplexing"""
self.selector.register(sock, EVENT_READ, self.running)
self.selector.register(sock, EVENT_READ, self.current_task)
def want_write(self, sock: socket.socket):
"""Handler for the 'want_write' event, registers the socket inside the selector to perform I/0 multiplexing"""
self.selector.register(sock, EVENT_WRITE, self.running)
self.selector.register(sock, EVENT_WRITE, self.current_task)
def join(self, coro: types.coroutine):
"""Handler for the 'join' event, does some magic to tell the scheduler
@ -98,23 +113,22 @@ class AsyncScheduler:
parent task"""
if coro not in self.joined:
self.joined[coro] = self.running
self.joined[coro] = self.current_task
else:
raise AlreadyJoinedError("Joining the same task multiple times is not allowed!")
def sleep(self, seconds):
"""Puts a task to sleep"""
"""Puts the caller to sleep for a given amount of seconds"""
self.sequence += 1
heappush(self.paused, (self.clock() + seconds, self.sequence, self.running))
self.running = None
heappush(self.paused, (self.clock() + seconds, self.sequence, self.current_task))
def cancel(self, task):
"""Handler for the 'cancel' event, throws CancelledError inside a coroutine
in order to stop it from executing. The loop continues to execute as tasks
are independent"""
task.coroutine.throw(CancelledError)
task.throw(CancelledError)
def wrap_socket(self, sock):
"""Wraps a standard socket into an AsyncSocket object"""

View File

@ -1,5 +1,5 @@
import types
from ._traps import join, sleep, cancel
from ._traps import join, cancel
class Task:
@ -13,10 +13,15 @@ class Task:
self.result = None
self.finished = False
def run(self):
def run(self, what=None):
"""Simple abstraction layer over the coroutines ``send`` method"""
return self.coroutine.send(None)
return self.coroutine.send(what)
def throw(self, err: Exception):
"""Simple abstraction layer over the coroutines ``throw`` method"""
return self.coroutine.throw(err)
async def join(self):
"""Joins the task"""
@ -28,11 +33,6 @@ class Task:
await cancel(self)
def result(self):
if self.exc:
raise self.exc
return self.result
def __repr__(self):
"""Implements repr(self)"""

View File

@ -1,95 +0,0 @@
from collections import deque
import types
from ._layers import Task
from heapq import heappush
class TaskManager:
"""The only way to execute asynchronous code in giambio is trough a ``TaskManager`` (or one of its children classes) object used within
an ``async with`` context manager. The ``TaskManager`` is an ideal "playground" where all asynchronous code runs and where giambio's
event loop can control their execution flow.
The key feature of this mechanism is that all tasks are always joined automatically: This opens a new world of features,
allowing exceptions to propagate just as expected (exceptions are **never** discarded in giambio, unlike in some other libraries) and some other lower-level advantages.
Moreover, giambio always saves the return values so that you don't lose any important information when executing a coroutine.
There are a few concepts to explain here, though:
- The term "task" refers to a coroutine executed trough the ``TaskManager``'s methods ``spawn()`` and ``schedule()``, as well as
one executed with ``await coro()``
- If an exception occurs, all other tasks are cancelled (read more below) and the exception is later propagated in the parent task
- The concept of cancellation is a bit tricky, because there is no real way to stop a coroutine from executing without actually raising
an exception inside i. So when giambio needs to cancel a task, it just throws ``giambio.exceptions.CancelledError`` inside it and hopes for the best.
This exception inherits from ``BaseException``, which by convention means that it should not be catched. Doing so in giambio will likely break your code
and make it explode spectacularly. If you really want to catch it to perform cleanup, be sure to re-raise it when done (or to raise another unhandled exception if you want)
so that the internal loop can proceed with execution.
In general, when writing an asynchronous function, you should always consider that it might be cancelled at any time and handle that case accordingly.
"""
def __init__(self, loop):
self.values = {} # Results from each task
self.loop = loop # The event loop that spawned the TaskManager
async def _cancel_and_raise(self, exc):
"""Cancels all the tasks inside the TaskManager object and raises the exception
of the task that triggered this mechanism"""
try:
await self.loop.running.cancel()
except Exception as error:
self.loop.running.exc = error
for task in self.loop.to_run + deque(self.loop.paused):
if isinstance(task, tuple): # Sleeping task
_, _, task = task
try:
await task.cancel()
except Exception as error:
task.exc = error
raise exc
async def __aenter__(self):
return self
async def __aexit__(self, type, value, traceback):
while True:
if not any([self.loop.to_run, self.loop.paused]):
break
tasks = self.loop.to_run + deque(self.loop.paused)
task = tasks.popleft()
if isinstance(task, tuple): # Sleeping task
_, _, task = task
self.values[task] = await task.join()
if task.exc:
print(task)
await self._cancel_and_raise(task.exc)
def spawn(self, coroutine: types.coroutine):
"""Schedules a task for execution, appending it to the call stack
:param coroutine: The coroutine to spawn, please note that if you want to execute foo, you need to pass foo() as this
returns a coroutine instead of a function object
:type coroutine: types.coroutine
:returns: A ``Task`` object
:rtype: class: Task
"""
task = Task(coroutine)
self.loop.to_run.append(task)
return task
def schedule(self, coroutine: types.coroutine, n: int):
"""Schedules a task for execution after when seconds
:param coroutine: The coroutine to spawn, please note that if you want to execute foo, you need to pass foo() as this
returns a coroutine instead of a function object
:type coroutine: types.coroutine
:param n: The delay in seconds after which the task should start running
:type n: int
:returns: A ``Task`` object
:rtype: class: Task
"""
self.loop.sequence += 1
task = Task(coroutine)
heappush(self.loop.paused, (self.loop.clock() + n, self.loop.sequence, task))
return task

View File

@ -32,8 +32,7 @@ def join(task):
"""
yield "join", task
assert task.finished
return task.result()
return task.result
@types.coroutine

View File

@ -1,4 +1,4 @@
from giambio import AsyncScheduler, sleep, TaskManager
from giambio import AsyncScheduler, sleep
async def countdown(n: int):
@ -7,7 +7,7 @@ async def countdown(n: int):
n -= 1
await sleep(1)
if n == 5:
raise ValueError('lul')
raise ValueError('lul')
print("Countdown over")
@ -21,13 +21,17 @@ async def countup(stop, step: int or float = 1):
async def main():
async with TaskManager(scheduler) as manager:
manager.spawn(countdown(10))
manager.spawn(countup(5, 2))
print("Counters started, awaiting completion")
cup = scheduler.create_task(countdown(10))
cdown = scheduler.create_task(countup(5, 2))
print("Counters started, awaiting completion")
await cup.join()
await cdown.join()
print("Task execution complete")
if __name__ == "__main__":
scheduler = AsyncScheduler()
scheduler.start(main())
try:
scheduler.start(main())
except Exception:
print("main() errored!")

View File

@ -17,11 +17,10 @@ async def server(address: tuple):
sock.listen(5)
asock = sched.wrap_socket(sock)
logging.info(f"Echo server serving asynchronously at {address}")
async with giambio.TaskManager(sched) as manager:
while True:
conn, addr = await asock.accept()
logging.info(f"{addr} connected")
manager.spawn(echo_handler(conn, addr))
while True:
conn, addr = await asock.accept()
logging.info(f"{addr} connected")
task = sched.create_task(echo_handler(conn, addr))
async def echo_handler(sock: AsyncSocket, addr: tuple):