Various minor fixes

This commit is contained in:
nocturn9x 2020-06-17 16:23:43 +02:00
parent d16c0932ac
commit 094e3d50ac
6 changed files with 65 additions and 65 deletions

View File

@ -3,54 +3,57 @@ from collections import deque
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from heapq import heappush, heappop from heapq import heappush, heappop
import socket import socket
from .exceptions import AlreadyJoinedError, CancelledError, GiambioError from .exceptions import AlreadyJoinedError, CancelledError
from timeit import default_timer from timeit import default_timer
from time import sleep as wait from time import sleep as wait
from .socket import AsyncSocket, WantRead, WantWrite from .socket import AsyncSocket, WantWrite
from ._layers import Task from ._layers import Task
from socket import SOL_SOCKET, SO_ERROR from socket import SOL_SOCKET, SO_ERROR
from ._traps import join, sleep, want_read, want_write, cancel from ._traps import want_read, want_write
class AsyncScheduler: class AsyncScheduler:
"""Implementation of an event loop, alternates between execution of coroutines (asynchronous functions) """Implementation of an event loop, alternates between execution of coroutines (asynchronous functions)
to allow a concurrency model or 'green threads'""" to allow a concurrency model or 'green threads'"""
def __init__(self): def __init__(self):
"""Object constructor""" """Object constructor"""
self.to_run = deque() # Scheduled tasks self.to_run = deque() # Tasks that are ready to run
self.paused = [] # Sleeping tasks self.paused = [] # Tasks that are asleep
self.selector = DefaultSelector() # Selector object to perform I/O multiplexing self.selector = DefaultSelector() # Selector object to perform I/O multiplexing
self.running = None # This will always point to the currently running coroutine (Task object) self.running = None # This will always point to the currently running coroutine (Task object)
self.joined = {} # Tasks that want to join self.joined = {} # Maps child tasks that need to be joined their respective parent task
self.clock = default_timer # Monotonic clock to keep track of elapsed time self.clock = default_timer # Monotonic clock to keep track of elapsed time reliably
self.sequence = 0 # To avoid TypeError in the (unlikely) event of two task with the same deadline we use a unique and incremental integer pushed to the queue together with the deadline and the function itself self.sequence = 0 # A monotonically increasing ID to avoid some corner cases with deadlines comparison
def run(self): def run(self):
"""Main event loop for giambio""" """Starts the loop and 'listens' for events until there are either ready or asleep tasks
then exit. This behavior kinda reflects a kernel, as coroutines can request
the loop's functionality only trough some fixed entry points, which in turn yield and
give execution control to the loop itself."""
while True: while True:
if not self.selector.get_map() and not any(deque(self.paused) + self.to_run): if not self.selector.get_map() and not any(deque(self.paused) + self.to_run):
break break
if not self.to_run and self.paused: # If there are sockets ready, (re)schedule their associated task if not self.to_run and self.paused: # If there are sockets ready, (re)schedule their associated task
wait(max(0.0, self.paused[0][0] - self.clock())) # If there are no tasks ready, just do nothing wait(max(0.0, self.paused[0][0] - self.clock())) # Sleep in order not to waste CPU cycles
while self.paused[0][0] < self.clock(): # Reschedules task when their timer has elapsed while self.paused[0][0] < self.clock(): # Reschedules task when their deadline has elapsed
_, __, coro = heappop(self.paused) _, __, task = heappop(self.paused)
self.to_run.append(coro) self.to_run.append(task)
if not self.paused: if not self.paused:
break break
timeout = 0.0 if self.to_run else None timeout = 0.0 if self.to_run else None
tasks = self.selector.select(timeout) tasks = self.selector.select(timeout)
for key, _ in tasks: for key, _ in tasks:
self.to_run.append(key.data) # Socket ready? Schedule the task self.to_run.append(key.data) # Socket ready? Schedule the task
self.selector.unregister(key.fileobj) # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now) self.selector.unregister(
key.fileobj) # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now)
while self.to_run: while self.to_run:
self.running = self.to_run.popleft() # Sets the currently running task self.running = self.to_run.popleft() # Sets the currently running task
try: try:
method, *args = self.running.run() method, *args = self.running.run()
getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;) getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;)
except StopIteration as e: except StopIteration as e:
e = e.args[0] if e.args else None e = e.args[0] if e.args else None
self.running.result = e self.running.result = e
@ -88,6 +91,31 @@ class AsyncScheduler:
self.selector.register(sock, EVENT_WRITE, self.running) self.selector.register(sock, EVENT_WRITE, self.running)
def join(self, coro: types.coroutine):
"""Handler for the 'join' event, does some magic to tell the scheduler
to wait until the passed coroutine ends. The result of this call equals whatever the
coroutine returns or, if an exception gets raised, the exception will get propagated inside the
parent task"""
if coro not in self.joined:
self.joined[coro] = self.running
else:
raise AlreadyJoinedError("Joining the same task multiple times is not allowed!")
def sleep(self, seconds):
"""Puts a task to sleep"""
self.sequence += 1
heappush(self.paused, (self.clock() + seconds, self.sequence, self.running))
self.running = None
def cancel(self, task):
"""Handler for the 'cancel' event, throws CancelledError inside a coroutine
in order to stop it from executing. The loop continues to execute as tasks
are independent"""
task.coroutine.throw(CancelledError)
def wrap_socket(self, sock): def wrap_socket(self, sock):
"""Wraps a standard socket into an AsyncSocket object""" """Wraps a standard socket into an AsyncSocket object"""
@ -123,44 +151,13 @@ class AsyncScheduler:
await want_write(sock) await want_write(sock)
return sock.close() return sock.close()
def join(self, coro: types.coroutine):
"""Handler for the 'want_join' event, does some magic to tell the scheduler
to wait until the passed coroutine ends. The result of this call equals whatever the
coroutine returns or, if an exception gets raised, the exception will get propagated inside the
parent task"""
if coro not in self.joined:
self.joined[coro] = self.running
else:
raise AlreadyJoinedError("Joining the same task multiple times is not allowed!")
def sleep(self, seconds):
"""Puts a task to sleep"""
self.sequence += 1 # Make this specific sleeping task unique to avoid error when comparing identical deadlines
heappush(self.paused, (self.clock() + seconds, self.sequence, self.running))
self.running = None
def cancel(self, task):
"""Cancels a task"""
task.coroutine.throw(CancelledError)
async def connect_sock(self, sock: socket.socket, addr: tuple): async def connect_sock(self, sock: socket.socket, addr: tuple):
"""Connects a socket asynchronously""" """Connects a socket asynchronously"""
try: # "Borrowed" from curio try: # "Borrowed" from curio
return sock.connect(addr) return sock.connect(addr)
except WantWrite: except WantWrite:
await want_write(sock) await want_write(sock)
err = sock.getsockopt(SOL_SOCKET, SO_ERROR) err = sock.getsockopt(SOL_SOCKET, SO_ERROR)
if err != 0: if err != 0:
raise OSError(err, f'Connect call failed: {addr}') raise OSError(err, f'Connect call failed: {addr}')
def create_task(self, coro: types.coroutine):
"""Creates a task and appends it to call stack"""
task = Task(coro)
self.to_run.append(task)
return task

View File

@ -28,6 +28,11 @@ class Task:
await cancel(self) await cancel(self)
def result(self):
if self.exc:
raise self.exc
return self.result
def __repr__(self): def __repr__(self):
"""Implements repr(self)""" """Implements repr(self)"""

View File

@ -2,8 +2,6 @@ from collections import deque
import types import types
from ._layers import Task from ._layers import Task
from heapq import heappush from heapq import heappush
from ._traps import sleep
import time
class TaskManager: class TaskManager:
@ -28,8 +26,6 @@ class TaskManager:
In general, when writing an asynchronous function, you should always consider that it might be cancelled at any time and handle that case accordingly. In general, when writing an asynchronous function, you should always consider that it might be cancelled at any time and handle that case accordingly.
""" """
def __init__(self, loop): def __init__(self, loop):
self.values = {} # Results from each task self.values = {} # Results from each task
self.loop = loop # The event loop that spawned the TaskManager self.loop = loop # The event loop that spawned the TaskManager

View File

@ -1,7 +1,6 @@
"""Helper methods to interact with the event loop""" """Helper methods to interact with the event loop"""
import types import types
from .exceptions import CancelledError
import socket import socket
@ -23,6 +22,7 @@ def sleep(seconds: int):
yield "sleep", seconds yield "sleep", seconds
@types.coroutine @types.coroutine
def join(task): def join(task):
"""'Tells' the scheduler that the desired task MUST be awaited for completion """'Tells' the scheduler that the desired task MUST be awaited for completion
@ -33,7 +33,8 @@ def join(task):
yield "join", task yield "join", task
assert task.finished assert task.finished
return task.result return task.result()
@types.coroutine @types.coroutine
def cancel(task): def cancel(task):
@ -49,9 +50,10 @@ def cancel(task):
yield "cancel", task yield "cancel", task
assert task.cancelled assert task.cancelled
@types.coroutine @types.coroutine
def want_read(sock: socket.socket): def want_read(sock: socket.socket):
"""'Tells' the event loop that there is some coroutine that wants to read fr > """'Tells' the event loop that there is some coroutine that wants to read from the given socket
:param sock: The socket to perform the operation on :param sock: The socket to perform the operation on
:type sock: class: socket.socket :type sock: class: socket.socket
@ -62,7 +64,7 @@ def want_read(sock: socket.socket):
@types.coroutine @types.coroutine
def want_write(sock: socket.socket): def want_write(sock: socket.socket):
"""'Tells' the event loop that there is some coroutine that wants to write i > """'Tells' the event loop that there is some coroutine that wants to write on the given socket
:param sock: The socket to perform the operation on :param sock: The socket to perform the operation on
:type sock: class: socket.socket :type sock: class: socket.socket

View File

@ -1,5 +1,4 @@
from giambio import AsyncScheduler, sleep, TaskManager from giambio import AsyncScheduler, sleep, TaskManager
import time
async def countdown(n: int): async def countdown(n: int):

View File

@ -17,10 +17,12 @@ async def server(address: tuple):
sock.listen(5) sock.listen(5)
asock = sched.wrap_socket(sock) asock = sched.wrap_socket(sock)
logging.info(f"Echo server serving asynchronously at {address}") logging.info(f"Echo server serving asynchronously at {address}")
while True: async with giambio.TaskManager(sched) as manager:
conn, addr = await asock.accept() while True:
logging.info(f"{addr} connected") conn, addr = await asock.accept()
task = sched.create_task(echo_handler(conn, addr)) logging.info(f"{addr} connected")
manager.spawn(echo_handler(conn, addr))
async def echo_handler(sock: AsyncSocket, addr: tuple): async def echo_handler(sock: AsyncSocket, addr: tuple):
with sock: with sock:
@ -38,8 +40,7 @@ async def echo_handler(sock: AsyncSocket, addr: tuple):
if __name__ == "__main__": if __name__ == "__main__":
sched.create_task(server(('', 25000)))
try: try:
sched.run() sched.start(server(('', 25000)))
except KeyboardInterrupt: # Exceptions propagate! except KeyboardInterrupt: # Exceptions propagate!
print("Exiting...") print("Exiting...")