diff --git a/giambio/_core.py b/giambio/_core.py
index c460ebf..9472b0c 100644
--- a/giambio/_core.py
+++ b/giambio/_core.py
@@ -3,54 +3,57 @@
 from collections import deque
 from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
 from heapq import heappush, heappop
 import socket
-from .exceptions import AlreadyJoinedError, CancelledError, GiambioError
+from .exceptions import AlreadyJoinedError, CancelledError
 from timeit import default_timer
 from time import sleep as wait
-from .socket import AsyncSocket, WantRead, WantWrite
+from .socket import AsyncSocket, WantWrite
 from ._layers import Task
 from socket import SOL_SOCKET, SO_ERROR
-from ._traps import join, sleep, want_read, want_write, cancel
+from ._traps import want_read, want_write


 class AsyncScheduler:
-    """Implementation of an event loop, alternates between execution of coroutines (asynchronous functions) to allow a concurrency model or 'green threads'"""

     def __init__(self):
         """Object constructor"""

-        self.to_run = deque()  # Scheduled tasks
-        self.paused = []  # Sleeping tasks
+        self.to_run = deque()  # Tasks that are ready to run
+        self.paused = []  # Tasks that are asleep
         self.selector = DefaultSelector()  # Selector object to perform I/O multiplexing
         self.running = None  # This will always point to the currently running coroutine (Task object)
-        self.joined = {}  # Tasks that want to join
-        self.clock = default_timer  # Monotonic clock to keep track of elapsed time
-        self.sequence = 0  # To avoid TypeError in the (unlikely) event of two task with the same deadline we use a unique and incremental integer pushed to the queue together with the deadline and the function itself
+        self.joined = {}  # Maps child tasks that need to be joined to their respective parent task
+        self.clock = default_timer  # Monotonic clock to keep track of elapsed time reliably
+        self.sequence = 0  # A monotonically increasing ID to avoid corner cases when comparing identical deadlines

     def run(self):
-        """Main event loop for giambio"""
+        """Starts the loop and 'listens' for events as long as there are either ready or asleep tasks,
+        then exits. This behavior loosely resembles a kernel, as coroutines can request
+        the loop's functionality only through some fixed entry points, which in turn yield and
+        give execution control back to the loop itself."""

         while True:
             if not self.selector.get_map() and not any(deque(self.paused) + self.to_run):
                 break
             if not self.to_run and self.paused:  # If there are sockets ready, (re)schedule their associated task
-                wait(max(0.0, self.paused[0][0] - self.clock()))  # If there are no tasks ready, just do nothing
-                while self.paused[0][0] < self.clock():  # Reschedules task when their timer has elapsed
-                    _, __, coro = heappop(self.paused)
-                    self.to_run.append(coro)
-                    if not self.paused:
-                        break
+                wait(max(0.0, self.paused[0][0] - self.clock()))  # Sleep in order not to waste CPU cycles
+                while self.paused[0][0] < self.clock():  # Reschedules tasks whose deadline has elapsed
+                    _, __, task = heappop(self.paused)
+                    self.to_run.append(task)
+                    if not self.paused:
+                        break
             timeout = 0.0 if self.to_run else None
             tasks = self.selector.select(timeout)
             for key, _ in tasks:
                 self.to_run.append(key.data)  # Socket ready? Schedule the task
-                self.selector.unregister(key.fileobj)  # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now)
+                self.selector.unregister(
+                    key.fileobj)  # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now)
             while self.to_run:
                 self.running = self.to_run.popleft()  # Sets the currently running task
                 try:
                     method, *args = self.running.run()
-                    getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;)
+                    getattr(self, method)(*args)  # Sneaky method call, thanks to David Beazley for this ;)
                 except StopIteration as e:
                     e = e.args[0] if e.args else None
                     self.running.result = e
@@ -88,6 +91,31 @@ class AsyncScheduler:

         self.selector.register(sock, EVENT_WRITE, self.running)

+    def join(self, coro: types.coroutine):
+        """Handler for the 'join' event, does some magic to tell the scheduler
+        to wait until the passed coroutine ends. The result of this call equals whatever the
+        coroutine returns or, if an exception gets raised, the exception will get propagated inside the
+        parent task"""
+
+        if coro not in self.joined:
+            self.joined[coro] = self.running
+        else:
+            raise AlreadyJoinedError("Joining the same task multiple times is not allowed!")
+
+    def sleep(self, seconds):
+        """Puts a task to sleep"""
+
+        self.sequence += 1
+        heappush(self.paused, (self.clock() + seconds, self.sequence, self.running))
+        self.running = None
+
+    def cancel(self, task):
+        """Handler for the 'cancel' event, throws CancelledError inside a coroutine
+        in order to stop it from executing. The loop continues to execute as tasks
+        are independent"""
+
+        task.coroutine.throw(CancelledError)
+
     def wrap_socket(self, sock):
         """Wraps a standard socket into an AsyncSocket object"""

@@ -123,44 +151,13 @@ class AsyncScheduler:
         await want_write(sock)
         return sock.close()

-    def join(self, coro: types.coroutine):
-        """Handler for the 'want_join' event, does some magic to tell the scheduler
-        to wait until the passed coroutine ends. The result of this call equals whatever the
-        coroutine returns or, if an exception gets raised, the exception will get propagated inside the
-        parent task"""
-
-
-        if coro not in self.joined:
-            self.joined[coro] = self.running
-        else:
-            raise AlreadyJoinedError("Joining the same task multiple times is not allowed!")
-
-    def sleep(self, seconds):
-        """Puts a task to sleep"""
-
-        self.sequence += 1  # Make this specific sleeping task unique to avoid error when comparing identical deadlines
-        heappush(self.paused, (self.clock() + seconds, self.sequence, self.running))
-        self.running = None
-
-    def cancel(self, task):
-        """Cancels a task"""
-
-        task.coroutine.throw(CancelledError)
-
     async def connect_sock(self, sock: socket.socket, addr: tuple):
         """Connects a socket asynchronously"""

-        try: # "Borrowed" from curio
+        try:  # "Borrowed" from curio
             return sock.connect(addr)
         except WantWrite:
             await want_write(sock)
         err = sock.getsockopt(SOL_SOCKET, SO_ERROR)
         if err != 0:
             raise OSError(err, f'Connect call failed: {addr}')
-
-    def create_task(self, coro: types.coroutine):
-        """Creates a task and appends it to call stack"""
-
-        task = Task(coro)
-        self.to_run.append(task)
-        return task
diff --git a/giambio/_layers.py b/giambio/_layers.py
index 050acac..5530af6 100644
--- a/giambio/_layers.py
+++ b/giambio/_layers.py
@@ -28,6 +28,11 @@ class Task:

         await cancel(self)

+    def result(self):
+        if self.exc:
+            raise self.exc
+        return self.result
+
     def __repr__(self):
         """Implements repr(self)"""
diff --git a/giambio/_managers.py b/giambio/_managers.py
index 929002c..2b3231a 100644
--- a/giambio/_managers.py
+++ b/giambio/_managers.py
@@ -2,8 +2,6 @@
 from collections import deque
 import types
 from ._layers import Task
 from heapq import heappush
-from ._traps import sleep
-import time

 class TaskManager:
@@ -28,8 +26,6 @@
     In general, when writing an asynchronous function, you should always consider that it might
     be cancelled at any time and handle that case accordingly.
""" - - def __init__(self, loop): self.values = {} # Results from each task self.loop = loop # The event loop that spawned the TaskManager diff --git a/giambio/_traps.py b/giambio/_traps.py index f3b03c9..564a9fb 100644 --- a/giambio/_traps.py +++ b/giambio/_traps.py @@ -1,7 +1,6 @@ """Helper methods to interact with the event loop""" import types -from .exceptions import CancelledError import socket @@ -23,6 +22,7 @@ def sleep(seconds: int): yield "sleep", seconds + @types.coroutine def join(task): """'Tells' the scheduler that the desired task MUST be awaited for completion @@ -33,7 +33,8 @@ def join(task): yield "join", task assert task.finished - return task.result + return task.result() + @types.coroutine def cancel(task): @@ -49,9 +50,10 @@ def cancel(task): yield "cancel", task assert task.cancelled + @types.coroutine def want_read(sock: socket.socket): - """'Tells' the event loop that there is some coroutine that wants to read fr > + """'Tells' the event loop that there is some coroutine that wants to read from the given socket :param sock: The socket to perform the operation on :type sock: class: socket.socket @@ -62,7 +64,7 @@ def want_read(sock: socket.socket): @types.coroutine def want_write(sock: socket.socket): - """'Tells' the event loop that there is some coroutine that wants to write i > + """'Tells' the event loop that there is some coroutine that wants to write on the given socket :param sock: The socket to perform the operation on :type sock: class: socket.socket diff --git a/tests/count.py b/tests/count.py index f649816..64ca9cc 100644 --- a/tests/count.py +++ b/tests/count.py @@ -1,5 +1,4 @@ from giambio import AsyncScheduler, sleep, TaskManager -import time async def countdown(n: int): diff --git a/tests/server.py b/tests/server.py index f0301e2..9c9eca8 100644 --- a/tests/server.py +++ b/tests/server.py @@ -17,10 +17,12 @@ async def server(address: tuple): sock.listen(5) asock = sched.wrap_socket(sock) logging.info(f"Echo server serving asynchronously at {address}") - while True: - conn, addr = await asock.accept() - logging.info(f"{addr} connected") - task = sched.create_task(echo_handler(conn, addr)) + async with giambio.TaskManager(sched) as manager: + while True: + conn, addr = await asock.accept() + logging.info(f"{addr} connected") + manager.spawn(echo_handler(conn, addr)) + async def echo_handler(sock: AsyncSocket, addr: tuple): with sock: @@ -38,8 +40,7 @@ async def echo_handler(sock: AsyncSocket, addr: tuple): if __name__ == "__main__": - sched.create_task(server(('', 25000))) try: - sched.run() + sched.start(server(('', 25000))) except KeyboardInterrupt: # Exceptions propagate! print("Exiting...")