Initial library, totally rewritten

This commit is contained in:
Mattia 2020-03-19 18:24:06 +01:00 committed by GitHub
parent a2233f7408
commit 49a6dff27c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 207 additions and 0 deletions

2
__init__.py Normal file
View File

@ -0,0 +1,2 @@
__author__ = "Nocturn9x aka Isgiambyy"
__version__ = (0, 0, 1)

4
exceptions.py Normal file
View File

@ -0,0 +1,4 @@
class GiambioError(Exception):
"""Base class for gaimbio exceptions"""
pass

201
giambio.py Normal file
View File

@ -0,0 +1,201 @@
import types
import datetime
from collections import deque, defaultdict
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from inspect import iscoroutine
from functools import wraps
import socket
from .exceptions import GiambioError
import traceback
return_values = {} # Saves the return values from coroutines
exceptions = {} # Saves exceptions from errored coroutines
def sync_only(func):
@wraps(func)
def wrapper(*args, **kwargs):
if iscoroutine(2):
raise RuntimeError(f"Function '{func.__name__}' MUST be called from a synchronous context!")
return func(*args, **kwargs)
return wrapper
class Task:
"""A simple wrapper around a coroutine object"""
def __init__(self, coroutine: types.coroutine):
self.coroutine = coroutine
def run(self):
return self.coroutine.send(None)
class EventLoop:
"""Implementation of an event loop, alternates between execution of coroutines (asynchronous functions)
to allow a concurrency model"""
def __init__(self):
self.to_run = deque() # Scheduled tasks
self.paused = deque() # Paused or sleeping tasks
self.selector = DefaultSelector() # Selector object to perform I/O multiplexing
self.running = None # This will always point to the currently running coroutine object
self.waitlist = defaultdict(list) # Tasks that want to join
@sync_only
def loop(self):
"""Main event loop for giambio"""
while True:
if not self.selector.get_map() and not self.to_run:
break
while self.selector.get_map(): # If there are sockets ready, schedule their associated task
tasks = deque(self.selector.select())
for key, _ in tasks:
self.to_run.append(Task(key.data)) # Socket ready? Schedule the task
self.selector.unregister(key.fileobj) # Once scheduled, the task does not need to wait anymore
while self.to_run:
self.running = self.to_run.popleft() # Sets the currently running task
try:
meth, *args = self.running.run() # Sneaky method call, thanks to David Beazley for this ;)
getattr(self, meth)(*args)
except StopIteration as e:
return_values[self.running] = e.args[0] if e.args else None # Saves the return value
self.to_run.extend(self.waitlist.pop(self.running, ())) # Reschedules the parent task
except Exception as error:
exceptions[self.running] = error # Errored? Save the exception
self.to_run.extend(self.waitlist.pop(self.running, ()))
def spawn(self, coroutine: types.coroutine):
"""Schedules a task for execution, appending it to the call stack"""
self.to_run.append(coroutine)
@sync_only
def start(self, coroutine: types.coroutine, *args, **kwargs):
self.spawn(coroutine(*args, **kwargs))
self.loop()
def want_read(self, sock: socket.socket):
"""Handler for the 'want_read' event, performs the needed operations to read from the passed socket
asynchronously"""
self.selector.register(sock, EVENT_READ, self.running)
def want_write(self, sock: socket.socket):
"""Handler for the 'want_read' event, performs the needed operations to write into the passed socket
asynchronously"""
self.selector.register(sock, EVENT_WRITE, self.running)
def wrap_socket(self, sock):
return AsyncSocket(sock, self)
async def read_sock(self, sock, buffer):
await want_read(sock)
return sock.recv(buffer)
async def accept_sock(self, sock):
await want_read(sock)
return sock.accept()
async def sock_sendall(self, sock, data):
while data:
await want_write(sock)
sent_no = sock.send(data)
data = data[sent_no:]
async def close_sock(self, sock):
await want_write(sock)
return sock.close()
class AsyncSocket(object):
"""Abstraction layer for asynchronous sockets"""
def __init__(self, sock: socket.socket, loop: EventLoop):
self.sock = sock
self.loop = loop
async def receive(self, max_size: int):
"""Receives up to max_size from a socket asynchronously"""
return await self.loop.read_sock(self.sock, max_size)
async def accept(self):
"""Accepts the socket, completing the 3-step TCP handshake asynchronously"""
to_wrap = await self.loop.accept_sock(self.sock)
return self.loop.wrap_socket(to_wrap[0]), to_wrap[1]
async def send_all(self, data: bytes):
"""Sends all data inside the buffer asynchronously until it is empty"""
return await self.loop.sock_sendall(self.sock, data)
async def close(self):
"""Closes the socket asynchronously"""
await self.loop.close_sock(self.sock)
def __enter__(self):
return self.sock.__enter__()
def __exit__(self, *args):
return self.sock.__exit__(*args)
def __repr__(self):
return f"AsyncSocket({self.sock}, {self.loop})"
@types.coroutine
def sleep(seconds: int):
"""Pause the execution of a coroutine for the passed amount of seconds,
without blocking the entire event loop, which keeps watching for other events"""
start = datetime.datetime.now()
end = datetime.datetime.now() + datetime.timedelta(seconds=seconds)
return (yield end) - start # Return how much time did the coroutine actually wait
@types.coroutine
def want_join(coroutine: types.coroutine):
"""'Tells' the event loop that there is some coroutine that needs to be waited for completion"""
yield "want_join", coroutine
@types.coroutine
def want_read(sock: socket.socket):
"""'Tells' the event loop that there is some coroutine that wants to read from the passed socket"""
yield "want_read", sock
@types.coroutine
def want_write(sock: socket.socket):
"""'Tells' the event loop that there is some coroutine that wants to write into the passed socket"""
yield "want_write", sock
@types.coroutine
def join(task: Task):
"""'Tells' the scheduler that the desired task MUST be awaited for completion"""
yield "want_join", task
has_raised = exceptions.pop(task, None)
if has_raised:
print("Traceback (most recent call last):")
traceback.print_tb(has_raised.__traceback__)
ename = type(has_raised).__name__
if str(has_raised):
print(f"{ename}: {has_raised}")
else:
print(has_raised)
raise GiambioError from has_raised
return return_values.pop(task, None)