Major refactoring, cancellation needs a fix

nocturn9x 2020-06-16 16:56:11 +00:00
parent fb4628a1f9
commit 1b958b0341
11 changed files with 181 additions and 334 deletions

View File

@ -40,34 +40,31 @@ If you read carefully, you might now wonder: _"If a coroutine can call other cor
Enough talking though, this is what a giambio-based application looks like
```python
import giambio
from giambio.socket import AsyncSocket
import socket
import logging
loop = giambio.EventLoop()
sched = giambio.AsyncScheduler()
logging.basicConfig(level=20,
format="[%(levelname)s] %(asctime)s %(message)s",
datefmt='%d/%m/%Y %p')
async def make_srv(address: tuple):
async def server(address: tuple):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(address)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.listen(5)
asock = loop.wrap_socket(sock) # This creates a socket that can be read asynchronously
asock = sched.wrap_socket(sock)
logging.info(f"Echo server serving asynchronously at {address}")
async with giambio.TaskManager(loop) as manager:
while True:
conn, addr = await asock.accept()
logging.info(f"{addr} connected")
task = manager.spawn(echo_server(conn, addr)) # This spawns a new task
while True:
conn, addr = await asock.accept()
logging.info(f"{addr} connected")
sched.create_task(echo_handler(conn, addr))
async def echo_server(sock: AsyncSocket, addr: tuple):
async def echo_handler(sock: AsyncSocket, addr: tuple):
with sock:
await sock.send_all(b"Welcome to the server pal!\n")
while True:
@ -82,10 +79,12 @@ async def echo_server(sock: AsyncSocket, addr: tuple):
logging.info(f"Connection from {addr} closed")
try:
loop.start(make_srv, ('', 1501))
except KeyboardInterrupt: # Because exceptions propagate
print("Exiting...")
if __name__ == "__main__":
sched.create_task(server(('', 25000)))
try:
sched.run()
except KeyboardInterrupt: # Exceptions propagate!
print("Exiting...")
```
### Explanation
@ -93,30 +92,9 @@ except KeyboardInterrupt: # Because exceptions propagate
Ok, let's explain this code line by line:
- First, we imported the required libraries
- Then, we created an `EventLoop` object
- Then, we created an `AsyncScheduler` object
- For the sake of this tutorial, we built the "Hello world" of network servers, an echo server. An echo server always replies to the client with the same data that it got from it
- Here comes the real fun: In our `make_srv` function, which is an `async` function, we used a Python 3 feature that might look weird and unfamiliar to newcomers (especially if you are used to asyncio or similar frameworks). Giambio takes advantage of the `async with` context manager to perform its magic: The `TaskManager` object is an ideal space where all tasks are spawned and run until they are done.
- Here comes the real fun: In our `server` function, which is an `async` function, we used the `create_task` method of the scheduler object to spawn a new task. Pretty similar to the threaded model, but there are no threads involved!
Using the context manager gives you several guarantees: for example, the context manager won't exit until _ALL the tasks inside it have completed their execution_ (whether they were cancelled, errored out, or returned). This also means that, because tasks are always joined automatically, you'll always get the return values of your coroutines and exceptions will just **work as expected**.
Try using the `netcat` utility to open multiple sessions to the server: you'll see that they are all served simultaneously.
You don't even need to be inside the `async with` block to spawn tasks! All you need is a reference to the `TaskManager` object, so you can pass it as a parameter to a function and spawn tasks from another coroutine while still getting all the guarantees that giambio provides; a minimal sketch of this pattern follows below.
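As a hypothetical sketch of this pattern, assuming the `TaskManager`/`EventLoop` API from the example above (the `spawner` and `worker` coroutines are made-up names used only for illustration):
```python
import giambio

loop = giambio.EventLoop()

async def worker(n: int):
    await giambio.sleep(n)
    print(f"worker {n} done")

async def spawner(manager: giambio.TaskManager):
    # We are no longer inside the async with block, but holding a
    # reference to the manager is enough to spawn tasks that will
    # still be joined automatically before the block exits
    manager.spawn(worker(1))
    manager.spawn(worker(2))

async def main():
    async with giambio.TaskManager(loop) as manager:
        manager.spawn(spawner(manager))  # the manager can be passed around like any other object

loop.start(main)
```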
## Cancellation, exception propagation and return values
The only way to execute asynchronous code in giambio is through a `TaskManager` (or one of its child classes) object used within an `async with` context manager. The `TaskManager` is an ideal "playground" where all asynchronous code runs and where giambio's internal event loop can control its execution flow.
The key feature of this mechanism is that all tasks are always joined automatically: you'll never see a task running outside of the controlled context that giambio enforces.
Moreover, while most asynchronous frameworks out there would discard the return values of spawned tasks (in giambio, "spawned" means that the coroutine was passed to the `spawn()` or `schedule()` method), here you'll never lose a single bit of information about your functions.
There are a few concepts to explain here, though:
- The term "task" refers to a coroutine executed through the `TaskManager`'s `spawn()` and `schedule()` methods, as well as one executed with ``await coro()``
- Because of how the framework is designed, an exception in any of the tasks inside the ``TaskManager`` will trigger giambio's internal cancellation mechanism: all other running tasks are cancelled (read more below) and the exception(s) that caused the cancellation are propagated inside the parent task as if you were running synchronous code
- The concept of cancellation is a bit tricky, because there is no real way to stop a coroutine from executing without actually raising an exception inside it. So when giambio needs to cancel a task, it just throws `giambio.exceptions.CancelledError` inside it and hopes for the best.
This exception inherits from `BaseException`, which by convention means that it should not be caught. Doing so in giambio will likely break your code and make it explode spectacularly; if you **really** need to catch it to perform some sort of cleanup, be sure to re-raise it when you are done (see the sketch below).
In general, when writing an asynchronous function in giambio, you should always bear in mind that it
might be cancelled at any time and handle that case accordingly.
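Here is a minimal sketch of that cleanup-then-re-raise pattern; the `worker` coroutine and its log file are made up for illustration, while `giambio.sleep` and `giambio.CancelledError` come from the library:
```python
import giambio

async def worker():
    log = open("worker.log", "w")  # some resource that needs cleaning up (illustrative)
    try:
        while True:
            log.write("tick\n")
            await giambio.sleep(1)
    except giambio.CancelledError:
        log.close()  # perform the cleanup...
        raise        # ...then ALWAYS re-raise, so giambio can finish cancelling the task
```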

View File

@ -1,74 +0,0 @@
import giambio
loop = giambio.EventLoop()
"""
What works and what does not (5 Apr 2020 13:35)
- Run tasks concurrently: V
- Join mechanism: V
- Sleep mechanism: V
- Cancellation mechanism: V
- Exception propagation: V
- Concurrent I/O: V
- Return values of coroutines: V
- Scheduling tasks for future execution: V
- Task Spawner (context manager): V
What's left to implement:
- An event system to wake up tasks programmatically
- Lower-level primitives such as locks, queues, semaphores
- File I/O (Also, consider that Windows won't allow select to work on non-socket fds)
- Complete the AsyncSocket implementation (happy eyeballs, other methods)
- Debugging tools
"""
async def countdown(n):
try:
while n > 0:
print(f"Down {n}")
n -= 1
if n <= 2: # Test an exception that triggers only sometimes
raise ValueError
await giambio.sleep(1)
print("Countdown over")
return "Count DOWN over"
except giambio.CancelledError:
print("countdown cancelled!")
# raise Exception("Oh no!") # This will propagate
async def count(stop, step=1):
try:
x = 0
while x < stop:
print(f"Up {x}")
x += step
await giambio.sleep(step)
print("Countup over")
return "Count UP over"
except giambio.CancelledError:
print("count cancelled!")
async def main():
try:
print("Spawning countdown immediately, scheduling count for 4 secs from now")
async with giambio.TaskManager(loop) as manager:
task = manager.spawn(countdown(8))
task2 = manager.schedule(count(8, 2), 4)
# await giambio.sleep(3)
# await task.cancel() # This works, but other tasks continue running
for task, ret in manager.values.items():
print(f"Function '{task.coroutine.__name__}' at {hex(id(task.coroutine))} returned an object of type '{type(ret).__name__}': {repr(ret)}")
except Exception as e:
print(f"Actually I prefer to catch it here: {repr(e)}") # Everything works just as expected, the try/except block below won't trigger
try:
loop.start(main)
except Exception: # Exceptions climb the whole stack
print("Exceptions propagate!")

View File

@ -1,8 +1,7 @@
__author__ = "Nocturn9x aka Isgiambyy"
__version__ = (0, 0, 1)
from .core import EventLoop
from ._core import AsyncScheduler
from .exceptions import GiambioError, AlreadyJoinedError, CancelledError
from .util import TaskManager
from .traps import _sleep as sleep
from ._traps import sleep
__all__ = ["EventLoop", "GiambioError", "AlreadyJoinedError", "CancelledError", "TaskManager", "sleep"]
__all__ = ["AsyncScheduler", "GiambioError", "AlreadyJoinedError", "CancelledError", "TaskManager", "sleep"]

View File

@ -1,4 +1,3 @@
import types
from collections import deque, defaultdict
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
@ -8,13 +7,12 @@ from .exceptions import AlreadyJoinedError, CancelledError, GiambioError
from timeit import default_timer
from time import sleep as wait
from .socket import AsyncSocket, WantRead, WantWrite
from .abstractions import Task, Result
from ._layers import Task
from socket import SOL_SOCKET, SO_ERROR
from .traps import _join, _sleep, _want_read, _want_write, _cancel
from .util import TaskManager
from ._traps import join, sleep, want_read, want_write, cancel
class EventLoop:
class AsyncScheduler:
"""Implementation of an event loop, alternates between execution of coroutines (asynchronous functions)
to allow a concurrency model or 'green threads'"""
@ -29,56 +27,43 @@ class EventLoop:
self.joined = defaultdict(list) # Tasks that want to join
self.clock = default_timer # Monotonic clock to keep track of elapsed time
self.sequence = 0 # To avoid a TypeError in the (unlikely) event of two tasks with the same deadline, we use a unique, incremental integer pushed to the queue together with the deadline and the function itself
self._exiting = False
def loop(self):
def run(self):
"""Main event loop for giambio"""
while True:
if not self.selector.get_map() and not any((self.to_run + deque(self.paused))):
if not self.selector.get_map() and not any(deque(self.paused) + self.to_run):
break
while not self.to_run: # If there are sockets ready, (re)schedule their associated task
timeout = 0.0 if self.to_run else None
tasks = self.selector.select(timeout)
for key, _ in tasks:
self.to_run.append(key.data) # Socket ready? Schedule the task
self.selector.unregister(key.fileobj) # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now)
while self.to_run or self.paused:
if not self.to_run:
wait(max(0.0, self.paused[0][0] - self.clock())) # If there are no tasks ready, just do nothing
while self.paused and self.paused[0][0] < self.clock(): # Reschedules task when their timer has elapsed
_, __, coro = heappop(self.paused)
self.to_run.append(coro)
if not self.to_run and self.paused: # If there are no tasks ready to run but some are sleeping, wait for the closest deadline
wait(max(0.0, self.paused[0][0] - self.clock())) # Just sleep until the first paused task is due
while self.paused[0][0] < self.clock(): # Reschedules tasks whose timer has elapsed
_, __, coro = heappop(self.paused)
self.to_run.append(coro)
if not self.paused:
break
timeout = 0.0 if self.to_run else None
tasks = self.selector.select(timeout)
for key, _ in tasks:
self.to_run.append(key.data) # Socket ready? Schedule the task
self.selector.unregister(key.fileobj) # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now)
while self.to_run:
self.running = self.to_run.popleft() # Sets the currently running task
try:
method, *args = self.running.run()
getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;)
self.running.steps += 1
except StopIteration as e:
self.running.execution = "FINISH"
self.running.result = Result(e.args[0] if e.args else None, None) # Saves the return value
self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
except RuntimeError:
self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
except CancelledError:
self.running.execution = "CANCELLED"
self.to_run.extend(self.joined.pop(self.running, ()))
except Exception as err:
if not self._exiting:
self.running.execution = "ERRORED"
self.running.result = Result(None, err)
self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
else:
raise
except KeyboardInterrupt:
self.running.throw(KeyboardInterrupt)
... # What to do here?
def start(self, coro: types.coroutine):
"""Starts the event loop using a coroutine as an entry point.
Equivalent to self.create_task(coro) and self.run()
"""
def start(self, coroutine: types.coroutine, *args, **kwargs):
"""Starts the event loop"""
self.to_run.append(Task(coro))
self.run()
TaskManager(self).spawn(coroutine(*args, **kwargs))
self.loop()
def want_read(self, sock: socket.socket):
"""Handler for the 'want_read' event, registers the socket inside the selector to perform I/0 multiplexing"""
@ -100,7 +85,7 @@ class EventLoop:
from the socket
"""
await _want_read(sock)
await want_read(sock)
return sock.recv(buffer)
async def accept_sock(self, sock: socket.socket):
@ -108,24 +93,24 @@ class EventLoop:
result of the accept() call
"""
await _want_read(sock)
await want_read(sock)
return sock.accept()
async def sock_sendall(self, sock: socket.socket, data: bytes):
"""Sends all the passed data, as bytes, trough the socket asynchronously"""
while data:
await _want_write(sock)
await want_write(sock)
sent_no = sock.send(data)
data = data[sent_no:]
async def close_sock(self, sock: socket.socket):
"""Closes the socket asynchronously"""
await _want_write(sock)
await want_write(sock)
return sock.close()
def want_join(self, coro: types.coroutine):
def join(self, coro: types.coroutine):
"""Handler for the 'want_join' event, does some magic to tell the scheduler
to wait until the passed coroutine ends. The result of this call equals whatever the
coroutine returns or, if an exception gets raised, the exception will get propagated inside the
@ -135,29 +120,34 @@ class EventLoop:
if coro not in self.joined:
self.joined[coro].append(self.running)
else:
self.running.throw(AlreadyJoinedError("Joining the same task multiple times is not allowed!"))
raise AlreadyJoinedError("Joining the same task multiple times is not allowed!")
def want_sleep(self, seconds):
if seconds > 0: # If seconds <= 0 this function just acts as a checkpoint
self.sequence += 1 # Make this specific sleeping task unique to avoid error when comparing identical deadlines
heappush(self.paused, (self.clock() + seconds, self.sequence, self.running))
else:
self.to_run.append(self.running) # Reschedule the task that called sleep
def sleep(self, seconds):
"""Puts a task to sleep"""
def want_cancel(self, task):
self.to_run.extend(self.joined.pop(self.running, ()))
self.to_run.append(self.running) # Reschedules the parent task
try:
task.throw(CancelledError())
except Exception as e:
task.result.exc = e
self.sequence += 1 # Make this specific sleeping task unique to avoid error when comparing identical deadlines
heappush(self.paused, (self.clock() + seconds, self.sequence, self.running))
self.running = None
def cancel(self, task):
"""Cancels a task"""
self.running.coroutine.throw(CancelledError(task))
async def connect_sock(self, sock: socket.socket, addr: tuple):
"""Connects a socket asynchronously"""
try: # "Borrowed" from curio
result = sock.connect(addr)
return result
return sock.connect(addr)
except WantWrite:
await _want_write(sock)
await want_write(sock)
err = sock.getsockopt(SOL_SOCKET, SO_ERROR)
if err != 0:
raise OSError(err, f'Connect call failed: {addr}')
def create_task(self, coro: types.coroutine):
"""Creates a task and appends it to call stack"""
task = Task(coro)
self.to_run.append(task)
return task

giambio/_layers.py (new file, 28 lines added)
View File

@ -0,0 +1,28 @@
import types
from ._traps import join, sleep, cancel
from .exceptions import CancelledError
class Task:
"""A simple wrapper around a coroutine object"""
def __init__(self, coroutine: types.coroutine):
self.coroutine = coroutine
self.joined = False # True if the task is joined
self.cancelled = False # True if the task gets cancelled
def run(self):
"""Simple abstraction layer over the coroutines ``send`` method"""
return self.coroutine.send(None)
async def join(self):
"""Joins the task"""
return await join(self)
async def cancel(self):
"""Cancels the task"""
await cancel(self)

View File

@ -1,11 +1,12 @@
"""Helper methods to interact with the event loop"""
import types
from .exceptions import CancelledError
import socket
@types.coroutine
def _sleep(seconds: int):
def sleep(seconds: int):
"""Pause the execution of a coroutine for the passed amount of seconds,
without blocking the entire event loop, which keeps watching for other events
@ -20,33 +21,10 @@ def _sleep(seconds: int):
:type seconds: int
"""
yield "want_sleep", seconds
yield "sleep", seconds
@types.coroutine
def _want_read(sock: socket.socket):
"""'Tells' the event loop that there is some coroutine that wants to read from the passed socket
:param sock: The socket to perform the operation on
:type sock: class: socket.socket
"""
yield "want_read", sock
@types.coroutine
def _want_write(sock: socket.socket):
"""'Tells' the event loop that there is some coroutine that wants to write into the passed socket
:param sock: The socket to perform the operation on
:type sock: class: socket.socket
"""
yield "want_write", sock
@types.coroutine
def _join(task, silent=False):
def join(task, silent=False):
"""'Tells' the scheduler that the desired task MUST be awaited for completion
If silent is True, any exception in the child task will be discarded
@ -56,13 +34,11 @@ def _join(task, silent=False):
:type silent: bool, optional
"""
task.joined = True
yield "want_join", task
return task.get_result(silent)
return (yield "join", task)
@types.coroutine
def _cancel(task):
def cancel(task):
"""'Tells' the scheduler that the passed task must be cancelled
The concept of cancellation here is tricky, because there is no real way to 'stop' a
@ -72,5 +48,27 @@ def _cancel(task):
be cancelled at any time
"""
task.cancelled = True
yield "want_cancel", task
yield "cancel", task
@types.coroutine
def want_read(sock: socket.socket):
"""'Tells' the event loop that there is some coroutine that wants to read fr >
:param sock: The socket to perform the operation on
:type sock: class: socket.socket
"""
yield "want_read", sock
@types.coroutine
def want_write(sock: socket.socket):
"""'Tells' the event loop that there is some coroutine that wants to write i >
:param sock: The socket to perform the operation on
:type sock: class: socket.socket
"""
yield "want_write", sock

View File

@ -1,57 +0,0 @@
import types
from .traps import _join, _cancel, _sleep
class Result:
"""A wrapper for results of coroutines"""
def __init__(self, val=None, exc: Exception = None):
self.val = val
self.exc = exc
def __repr__(self):
return f"giambio.core.Result({self.val}, {repr(self.exc)})"
class Task:
"""A simple wrapper around a coroutine object"""
def __init__(self, coroutine: types.coroutine, loop):
self.coroutine = coroutine
self.joined = False # True if the task is joined
self.result = None # Updated when the coroutine execution ends
self.loop = loop # The EventLoop object that spawned the task
self.cancelled = False # True if the task gets cancelled
self.execution = "INIT" # Is set to 'FINISH' when the task ends
self.steps = 0 # How many steps did the task run before ending, incremented while executing
def run(self):
"""Simple abstraction layer over the coroutines ``send`` method"""
self.steps += 1
self.execution = "RUNNING"
return self.coroutine.send(None)
def __repr__(self):
return f"giambio.core.Task({self.coroutine}, {self.status}, {self.joined}, {self.result})"
def throw(self, exception: Exception):
"""Simple abstraction layer over the coroutines ``throw`` method"""
self.result = Result(None, exception)
return self.coroutine.throw(exception)
async def cancel(self):
"""Cancels the task, throwing inside it a ``giambio.exceptions.CancelledError`` exception
and discarding whatever the function could return"""
await _sleep(0) # Switch tasks (_sleep with 0 as delay merely acts as a checkpoint) or everything breaks: is it a good solution?
return await _cancel(self)
async def join(self, silent=False):
return await _join(self, silent)
def get_result(self, silenced=False):
return self.result

View File

@ -1,51 +0,0 @@
from collections import deque
from .exceptions import CancelledError
import types
from .abstractions import Task
from heapq import heappush
class TaskManager:
"""Class to be used inside context managers to spawn multiple tasks and be sure that they will all joined before the code exits the with block"""
def __init__(self, loop, silent=False):
self.tasks = deque() # All tasks spawned
self.values = {} # Results OR exceptions of each task
self.loop = loop # The event loop that spawned the TaskManager
self.silent = silent # Make exceptions silent? (not recommended)
async def _cancel_and_raise(self, exc):
if not isinstance(exc, CancelledError):
self.loop._exiting = True # Tells the loop not to catch all exceptions
for task in self.tasks:
await task.cancel()
raise exc
async def __aenter__(self):
return self
async def __aexit__(self, type, value, traceback):
while self.tasks:
task = self.tasks.popleft()
self.values[task] = await task.join()
if task.result.exc:
await self._cancel_and_raise(task.result.exc)
def spawn(self, coroutine: types.coroutine):
"""Schedules a task for execution, appending it to the call stack"""
task = Task(coroutine, self)
self.loop.to_run.append(task)
self.tasks.append(task)
return task
def schedule(self, coroutine: types.coroutine, when: int):
"""Schedules a task for execution after n seconds"""
self.loop.sequence += 1
task = Task(coroutine, self)
self.tasks.append(task)
heappush(self.loop.paused, (self.loop.clock() + when, self.loop.sequence, task))
return task

View File

@ -5,7 +5,7 @@ with open("README.md", "r") as readme:
setuptools.setup(
name="GiambIO",
version="0.0.1",
version="1.0",
author="Nocturn9x aka IsGiambyy",
author_email="hackhab@gmail.com",
description="Asynchronous Python made easy (and friendly)",

tests/count.py (new file, 36 lines added)
View File

@ -0,0 +1,36 @@
from giambio import AsyncScheduler, sleep, CancelledError
import time
async def countdown(n: int):
while n > 0:
print(f"Down {n}")
n -= 1
await sleep(1)
print("Countdown over")
async def countup(stop, step: int or float = 1):
x = 0
while x < stop:
print(f"Up {x}")
x += 1
await sleep(step)
print("Countup over")
async def main():
counter = scheduler.create_task(countup(5, 4))
counter2 = scheduler.create_task(countdown(20))
print("Counters started, awaiting completion")
await sleep(4)
print("4 seconds have passed, killing countup task")
await counter.cancel()
await counter.join()
await counter2.join()
print("Task execution complete")
if __name__ == "__main__":
scheduler = AsyncScheduler()
scheduler.start(main())

View File

@ -3,28 +3,26 @@ from giambio.socket import AsyncSocket
import socket
import logging
loop = giambio.core.EventLoop()
sched = giambio.AsyncScheduler()
logging.basicConfig(level=20,
format="[%(levelname)s] %(asctime)s %(message)s",
datefmt='%d/%m/%Y %p')
async def make_srv(address: tuple):
async def server(address: tuple):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(address)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.listen(5)
asock = loop.wrap_socket(sock)
asock = sched.wrap_socket(sock)
logging.info(f"Echo server serving asynchronously at {address}")
async with giambio.TaskManager(loop) as manager:
while True:
conn, addr = await asock.accept()
logging.info(f"{addr} connected")
task = manager.spawn(echo_server(conn, addr))
while True:
conn, addr = await asock.accept()
logging.info(f"{addr} connected")
task = sched.create_task(echo_handler(conn, addr))
async def echo_server(sock: AsyncSocket, addr: tuple):
async def echo_handler(sock: AsyncSocket, addr: tuple):
with sock:
await sock.send_all(b"Welcome to the server pal!\n")
while True:
@ -39,7 +37,9 @@ async def echo_server(sock: AsyncSocket, addr: tuple):
logging.info(f"Connection from {addr} closed")
try:
loop.start(make_srv, ('', 1501))
except KeyboardInterrupt: # Exceptions propagate!
print("Exiting...")
if __name__ == "__main__":
sched.create_task(server(('', 25000)))
try:
sched.run()
except KeyboardInterrupt: # Exceptions propagate!
print("Exiting...")