diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..6c76eaa --- /dev/null +++ b/__init__.py @@ -0,0 +1,2 @@ +__author__ = "Nocturn9x aka Isgiambyy" +__version__ = (0, 0, 1) diff --git a/exceptions.py b/exceptions.py new file mode 100644 index 0000000..e675528 --- /dev/null +++ b/exceptions.py @@ -0,0 +1,4 @@ +class GiambioError(Exception): + """Base class for gaimbio exceptions""" + pass + diff --git a/giambio.py b/giambio.py new file mode 100644 index 0000000..8c0a255 --- /dev/null +++ b/giambio.py @@ -0,0 +1,201 @@ +import types +import datetime +from collections import deque, defaultdict +from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE +from inspect import iscoroutine +from functools import wraps +import socket +from .exceptions import GiambioError +import traceback + +return_values = {} # Saves the return values from coroutines +exceptions = {} # Saves exceptions from errored coroutines + + +def sync_only(func): + @wraps(func) + def wrapper(*args, **kwargs): + if iscoroutine(2): + raise RuntimeError(f"Function '{func.__name__}' MUST be called from a synchronous context!") + return func(*args, **kwargs) + return wrapper + + +class Task: + + """A simple wrapper around a coroutine object""" + + def __init__(self, coroutine: types.coroutine): + self.coroutine = coroutine + + def run(self): + return self.coroutine.send(None) + +class EventLoop: + + """Implementation of an event loop, alternates between execution of coroutines (asynchronous functions) + to allow a concurrency model""" + + def __init__(self): + self.to_run = deque() # Scheduled tasks + self.paused = deque() # Paused or sleeping tasks + self.selector = DefaultSelector() # Selector object to perform I/O multiplexing + self.running = None # This will always point to the currently running coroutine object + self.waitlist = defaultdict(list) # Tasks that want to join + + @sync_only + def loop(self): + """Main event loop for giambio""" + while True: + if not self.selector.get_map() and not self.to_run: + break + while self.selector.get_map(): # If there are sockets ready, schedule their associated task + tasks = deque(self.selector.select()) + for key, _ in tasks: + self.to_run.append(Task(key.data)) # Socket ready? Schedule the task + self.selector.unregister(key.fileobj) # Once scheduled, the task does not need to wait anymore + while self.to_run: + self.running = self.to_run.popleft() # Sets the currently running task + try: + meth, *args = self.running.run() # Sneaky method call, thanks to David Beazley for this ;) + getattr(self, meth)(*args) + except StopIteration as e: + return_values[self.running] = e.args[0] if e.args else None # Saves the return value + self.to_run.extend(self.waitlist.pop(self.running, ())) # Reschedules the parent task + except Exception as error: + exceptions[self.running] = error # Errored? Save the exception + self.to_run.extend(self.waitlist.pop(self.running, ())) + + def spawn(self, coroutine: types.coroutine): + """Schedules a task for execution, appending it to the call stack""" + self.to_run.append(coroutine) + + @sync_only + def start(self, coroutine: types.coroutine, *args, **kwargs): + self.spawn(coroutine(*args, **kwargs)) + self.loop() + + def want_read(self, sock: socket.socket): + """Handler for the 'want_read' event, performs the needed operations to read from the passed socket + asynchronously""" + + self.selector.register(sock, EVENT_READ, self.running) + + def want_write(self, sock: socket.socket): + """Handler for the 'want_read' event, performs the needed operations to write into the passed socket + asynchronously""" + + self.selector.register(sock, EVENT_WRITE, self.running) + + def wrap_socket(self, sock): + return AsyncSocket(sock, self) + + async def read_sock(self, sock, buffer): + await want_read(sock) + return sock.recv(buffer) + + async def accept_sock(self, sock): + await want_read(sock) + return sock.accept() + + async def sock_sendall(self, sock, data): + while data: + await want_write(sock) + sent_no = sock.send(data) + data = data[sent_no:] + + async def close_sock(self, sock): + await want_write(sock) + return sock.close() + + +class AsyncSocket(object): + """Abstraction layer for asynchronous sockets""" + + def __init__(self, sock: socket.socket, loop: EventLoop): + self.sock = sock + self.loop = loop + + async def receive(self, max_size: int): + """Receives up to max_size from a socket asynchronously""" + + return await self.loop.read_sock(self.sock, max_size) + + async def accept(self): + """Accepts the socket, completing the 3-step TCP handshake asynchronously""" + + to_wrap = await self.loop.accept_sock(self.sock) + return self.loop.wrap_socket(to_wrap[0]), to_wrap[1] + + async def send_all(self, data: bytes): + """Sends all data inside the buffer asynchronously until it is empty""" + + return await self.loop.sock_sendall(self.sock, data) + + async def close(self): + """Closes the socket asynchronously""" + + await self.loop.close_sock(self.sock) + + def __enter__(self): + return self.sock.__enter__() + + def __exit__(self, *args): + return self.sock.__exit__(*args) + + def __repr__(self): + return f"AsyncSocket({self.sock}, {self.loop})" + + +@types.coroutine +def sleep(seconds: int): + """Pause the execution of a coroutine for the passed amount of seconds, + without blocking the entire event loop, which keeps watching for other events""" + + start = datetime.datetime.now() + end = datetime.datetime.now() + datetime.timedelta(seconds=seconds) + + return (yield end) - start # Return how much time did the coroutine actually wait + + +@types.coroutine +def want_join(coroutine: types.coroutine): + """'Tells' the event loop that there is some coroutine that needs to be waited for completion""" + + yield "want_join", coroutine + + +@types.coroutine +def want_read(sock: socket.socket): + """'Tells' the event loop that there is some coroutine that wants to read from the passed socket""" + + yield "want_read", sock + + +@types.coroutine +def want_write(sock: socket.socket): + """'Tells' the event loop that there is some coroutine that wants to write into the passed socket""" + + yield "want_write", sock + + +@types.coroutine +def join(task: Task): + """'Tells' the scheduler that the desired task MUST be awaited for completion""" + + yield "want_join", task + has_raised = exceptions.pop(task, None) + if has_raised: + print("Traceback (most recent call last):") + traceback.print_tb(has_raised.__traceback__) + ename = type(has_raised).__name__ + if str(has_raised): + print(f"{ename}: {has_raised}") + else: + print(has_raised) + raise GiambioError from has_raised + return return_values.pop(task, None) + + + +