import socket
import traceback
import types
from collections import defaultdict, deque
from heapq import heappop, heappush
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from time import sleep as wait
from timeit import default_timer

from .exceptions import AlreadyJoinedError, CancelledError
from .socket import AsyncSocket


class EventLoop:
    """Implementation of an event loop, alternates between execution of
    coroutines (asynchronous functions) to allow a concurrency model or
    'green threads'.

    Coroutines talk to the loop by yielding ``(method_name, *args)`` tuples;
    the loop looks ``method_name`` up on itself (the ``want_*`` handlers)
    and calls it with ``args``.
    """

    def __init__(self):
        """Object constructor"""
        self.to_run = deque()  # Scheduled tasks, run in FIFO order
        self.paused = []  # Sleeping tasks: heap of (deadline, sequence, task)
        self.selector = DefaultSelector()  # Selector object to perform I/O multiplexing
        self.running = None  # Always points to the currently running coroutine (Task object)
        self.joined = defaultdict(list)  # Maps a task to the tasks waiting to join it
        self.clock = default_timer  # Clock used to compute and compare sleep deadlines
        # To avoid TypeError in the (unlikely) event of two tasks with the
        # same deadline, a unique, incremental integer is pushed onto the
        # heap together with the deadline and the task itself
        self.sequence = 0

    def loop(self):
        """Main event loop for giambio.

        Runs until there are no registered sockets, no ready tasks and no
        sleeping tasks left. Each outer iteration polls the selector once
        and then runs tasks until both the ready queue and the sleep heap
        are empty.

        Raises:
            Exception: whatever a task raised, if that task was not joined.
        """
        while True:
            if not (self.selector.get_map() or self.to_run or self.paused):
                break  # No pending I/O, nothing ready, nobody sleeping
            if self.selector.get_map():
                # Poll exactly once. Looping here until the selector map is
                # empty would busy-spin with a zero timeout while starving
                # the tasks that are already ready to run.
                timeout = 0.0 if self.to_run else None  # Only block when idle
                for key, _ in self.selector.select(timeout):
                    self.to_run.append(key.data)  # Socket ready? Schedule the task
                    # Once (re)scheduled, the task does not need to perform
                    # I/O multiplexing (for now)
                    self.selector.unregister(key.fileobj)
            while self.to_run or self.paused:
                if not self.to_run:
                    # If there are no tasks ready, just do nothing until the
                    # earliest deadline
                    wait(max(0.0, self.paused[0][0] - self.clock()))
                # Reschedules tasks when their timer has elapsed
                while self.paused and self.paused[0][0] <= self.clock():
                    _, __, coro = heappop(self.paused)
                    self.to_run.append(coro)
                if not self.to_run:
                    # The wait ended marginally before the deadline as seen
                    # by self.clock: go around again instead of popping from
                    # an empty queue
                    continue
                self.running = self.to_run.popleft()  # Sets the currently running task
                try:
                    # Sneaky method call, thanks to David Beazley for this ;)
                    method, *args = self.running.run()
                    getattr(self, method)(*args)
                except StopIteration as e:
                    # TODO: Fix this return mechanism, it looks like the
                    # return value of the child task gets "replaced" by None
                    # at some point
                    self.running.result = Result(e.args[0] if e.args else None, None)  # Saves the return value
                    self.to_run.extend(self.joined.pop(self.running, ()))  # Reschedules the parent task
                except Exception as has_raised:
                    self.to_run.extend(self.joined.pop(self.running, ()))  # Reschedules the parent task
                    if self.running.joined:
                        # Let the join function handle the hassle of
                        # propagating the error
                        self.running.result = Result(None, has_raised)  # Save the exception
                    else:
                        # Let the exception propagate (I'm looking at you asyncIO ;))
                        raise
                except KeyboardInterrupt:
                    self.running.coroutine.throw(KeyboardInterrupt)

    def spawn(self, coroutine: types.coroutine):
        """Schedules a task for execution, appending it to the call stack.

        Wraps ``coroutine`` in a Task, queues it and returns the Task.
        """
        task = Task(coroutine, self)
        self.to_run.append(task)
        return task

    def schedule(self, coroutine: types.coroutine, when: int):  # TODO: Fix this
        """Schedules a task for execution after n seconds.

        Wraps ``coroutine`` in a Task, pushes it onto the sleep heap with a
        deadline ``when`` seconds from now and returns the Task.
        """
        self.sequence += 1
        task = Task(coroutine, self)
        heappush(self.paused, (self.clock() + when, self.sequence, task))
        return task

    def start(self, coroutine: types.coroutine, *args, **kwargs):
        """Starts the eventloop.

        Calls ``coroutine(*args, **kwargs)``, spawns the result as the first
        task and runs the loop until it has nothing left to do.
        """
        self.spawn(coroutine(*args, **kwargs))
        self.loop()

    def want_read(self, sock: socket.socket):
        """Handler for the 'want_read' event, performs the needed operations
        to read from the passed socket asynchronously"""
        self.selector.register(sock, EVENT_READ, self.running)

    def want_write(self, sock: socket.socket):
        """Handler for the 'want_write' event, performs the needed operations
        to write into the passed socket asynchronously"""
        self.selector.register(sock, EVENT_WRITE, self.running)

    def wrap_socket(self, sock):
        """Wraps a standard socket into an AsyncSocket"""
        return AsyncSocket(sock, self)

    async def read_sock(self, sock: socket.socket, buffer: int):
        """Waits until ``sock`` is readable, then returns ``sock.recv(buffer)``"""
        await want_read(sock)
        return sock.recv(buffer)

    async def accept_sock(self, sock: socket.socket):
        """Waits until ``sock`` is readable, then returns ``sock.accept()``"""
        await want_read(sock)
        return sock.accept()

    async def sock_sendall(self, sock: socket.socket, data: bytes):
        """Sends all of ``data`` through ``sock``, waiting for the socket to
        become writable before every ``send`` call"""
        while data:
            await want_write(sock)
            sent_no = sock.send(data)
            data = data[sent_no:]  # Drop what was sent, keep the remainder

    async def close_sock(self, sock: socket.socket):
        """Waits until ``sock`` is writable, then closes it"""
        await want_write(sock)
        return sock.close()

    def want_join(self, coro: types.coroutine):
        """Handler for the 'want_join' event: parks the running task until
        the task ``coro`` terminates.

        If ``coro`` already has a result, the running task is rescheduled
        right away, since nothing would ever wake it up otherwise. If
        another task is already waiting on ``coro``, AlreadyJoinedError is
        thrown into the running task.
        """
        if coro.result is not None:
            # Already terminated: its result is available, resume the caller
            self.to_run.append(self.running)
        elif coro not in self.joined:
            self.joined[coro].append(self.running)
        else:
            self.running.throw(AlreadyJoinedError("Joining the same task multiple times is not allowed!"))

    def want_sleep(self, seconds):
        """Handler for the 'want_sleep' event: moves the running task onto
        the sleep heap with a deadline ``seconds`` from now"""
        # Make this specific sleeping task unique to avoid error when
        # comparing identical deadlines
        self.sequence += 1
        heappush(self.paused, (self.clock() + seconds, self.sequence, self.running))

    def want_cancel(self, task):
        """Handler for the 'want_cancel' event: marks ``task`` as cancelled
        and throws CancelledError into it"""
        # NOTE(review): this pops the waiters of the *running* task (the one
        # requesting the cancellation), not those of ``task``, and the
        # running task itself is not rescheduled here -- confirm that this
        # is the intended behaviour.
        self.to_run.extend(self.joined.pop(self.running, ()))  # Reschedules the parent task
        task.cancelled = True
        task.throw(CancelledError())

    async def connect_sock(self, sock: socket.socket, addr: tuple):
        """Waits until ``sock`` is writable, then returns ``sock.connect(addr)``"""
        # NOTE(review): writability is awaited *before* connect() is called;
        # verify that this ordering is intended.
        await want_write(sock)
        return sock.connect(addr)


class Result:
    """A wrapper for results of coroutines"""

    def __init__(self, val=None, exc: Exception = None):
        self.val = val  # Return value of the coroutine, if any
        self.exc = exc  # Exception associated with the coroutine, if any

    def __repr__(self):
        return f"giambio.core.Result({self.val}, {self.exc})"


class Task:
    """A simple wrapper around a coroutine object"""

    def __init__(self, coroutine: types.coroutine, loop: EventLoop):
        self.coroutine = coroutine
        self.status = False  # Not ran yet
        self.joined = False  # Set to True by join() once some task awaits this one
        self.cancelled = False  # Set to True by EventLoop.want_cancel
        self.result = None  # Updated when the coroutine execution ends
        self.loop = loop  # The EventLoop object that spawned the task

    def run(self):
        """Advances the coroutine to its next yield and returns the value it
        yielded.

        Any exception raised by the coroutine, including the StopIteration
        that signals its completion, propagates to the caller.
        """
        self.status = True
        # A RuntimeError raised here used to be swallowed by a debugging
        # print(); the resulting None then surfaced in the loop as an
        # unrelated TypeError. The real error is now left to propagate.
        return self.coroutine.send(None)

    def __repr__(self):
        return f"giambio.core.Task({self.coroutine}, {self.status}, {self.joined}, {self.result})"

    def throw(self, exception: Exception):
        """Records ``exception`` as this task's result and throws it into
        the wrapped coroutine"""
        self.result = Result(None, exception)
        return self.coroutine.throw(exception)

    async def cancel(self):
        """Asks the event loop to cancel this task"""
        return await cancel(self)

    async def join(self):
        """Waits for this task and returns its result, or raises the
        exception stored in it"""
        return await join(self)

    def get_result(self):
        """Returns the stored return value, re-raising the stored exception
        if there is one. Returns None when no result has been stored yet."""
        if self.result is None:
            return None
        if self.result.exc:
            raise self.result.exc
        return self.result.val


@types.coroutine
def sleep(seconds: int):
    """Pause the execution of a coroutine for the passed amount of seconds,
    without blocking the entire event loop, which keeps watching for other
    events"""
    yield "want_sleep", seconds


@types.coroutine
def want_read(sock: socket.socket):  # TODO: Fix this and make it work also when tasks are not joined
    """'Tells' the event loop that there is some coroutine that wants to
    read from the passed socket"""
    yield "want_read", sock


@types.coroutine
def want_write(sock: socket.socket):
    """'Tells' the event loop that there is some coroutine that wants to
    write into the passed socket"""
    yield "want_write", sock


@types.coroutine
def join(task: Task):
    """'Tells' the scheduler that the desired task MUST be awaited for
    completion, then returns the task's result (or raises its exception)"""
    task.joined = True
    yield "want_join", task
    return task.get_result()


@types.coroutine
def cancel(task: Task):
    """'Tells' the event loop that the passed task should be cancelled"""
    yield "want_cancel", task