Improved features, still needs fix

This commit is contained in:
nocturn9x 2020-03-19 18:48:24 +00:00
parent 49a6dff27c
commit b059125991
6 changed files with 112 additions and 15 deletions

View File

@ -1,2 +0,0 @@
__author__ = "Nocturn9x aka Isgiambyy"
__version__ = (0, 0, 1)

4
giambio/__init__.py Normal file
View File

@ -0,0 +1,4 @@
__author__ = "Nocturn9x aka Isgiambyy"
__version__ = (0, 0, 1)
from .core import EventLoop, join, sleep
__all__ = ["EventLoop", "join", "sleep"]

View File

@ -27,16 +27,24 @@ class Task:
def __init__(self, coroutine: types.coroutine): def __init__(self, coroutine: types.coroutine):
self.coroutine = coroutine self.coroutine = coroutine
self.status = False # Not ran yet
self.joined = False
def run(self): def run(self):
self.status = True
return self.coroutine.send(None) return self.coroutine.send(None)
def __repr__(self):
return f"<giambio.core.Task({self.coroutine}, {self.status})"
class EventLoop: class EventLoop:
"""Implementation of an event loop, alternates between execution of coroutines (asynchronous functions) """Implementation of an event loop, alternates between execution of coroutines (asynchronous functions)
to allow a concurrency model""" to allow a concurrency model"""
def __init__(self): def __init__(self):
"""Object constructor"""
self.to_run = deque() # Scheduled tasks self.to_run = deque() # Scheduled tasks
self.paused = deque() # Paused or sleeping tasks self.paused = deque() # Paused or sleeping tasks
self.selector = DefaultSelector() # Selector object to perform I/O multiplexing self.selector = DefaultSelector() # Selector object to perform I/O multiplexing
@ -46,13 +54,14 @@ class EventLoop:
@sync_only @sync_only
def loop(self): def loop(self):
"""Main event loop for giambio""" """Main event loop for giambio"""
while True: while True:
if not self.selector.get_map() and not self.to_run: if not self.selector.get_map() and not self.to_run:
break break
while self.selector.get_map(): # If there are sockets ready, schedule their associated task while self.selector.get_map(): # If there are sockets ready, schedule their associated task
tasks = deque(self.selector.select()) tasks = deque(self.selector.select())
for key, _ in tasks: for key, _ in tasks:
self.to_run.append(Task(key.data)) # Socket ready? Schedule the task self.to_run.append(key.data) # Socket ready? Schedule the task
self.selector.unregister(key.fileobj) # Once scheduled, the task does not need to wait anymore self.selector.unregister(key.fileobj) # Once scheduled, the task does not need to wait anymore
while self.to_run: while self.to_run:
self.running = self.to_run.popleft() # Sets the currently running task self.running = self.to_run.popleft() # Sets the currently running task
@ -62,13 +71,26 @@ class EventLoop:
except StopIteration as e: except StopIteration as e:
return_values[self.running] = e.args[0] if e.args else None # Saves the return value return_values[self.running] = e.args[0] if e.args else None # Saves the return value
self.to_run.extend(self.waitlist.pop(self.running, ())) # Reschedules the parent task self.to_run.extend(self.waitlist.pop(self.running, ())) # Reschedules the parent task
except Exception as error: except Exception as has_raised:
exceptions[self.running] = error # Errored? Save the exception if self.running.joined:
exceptions[self.running] = has_raised # Errored? Save the exception
else: # If the task is not joined, the exception would disappear, but not in giambio
print("Traceback (most recent call last):")
traceback.print_tb(has_raised.__traceback__)
ename = type(has_raised).__name__
if str(has_raised):
print(f"{ename}: {has_raised}")
else:
print(has_raised)
raise GiambioError from has_raised
self.to_run.extend(self.waitlist.pop(self.running, ())) self.to_run.extend(self.waitlist.pop(self.running, ()))
def spawn(self, coroutine: types.coroutine): def spawn(self, coroutine: types.coroutine):
"""Schedules a task for execution, appending it to the call stack""" """Schedules a task for execution, appending it to the call stack"""
self.to_run.append(coroutine)
task = Task(coroutine)
self.to_run.append(task)
return task
@sync_only @sync_only
def start(self, coroutine: types.coroutine, *args, **kwargs): def start(self, coroutine: types.coroutine, *args, **kwargs):
@ -108,6 +130,13 @@ class EventLoop:
await want_write(sock) await want_write(sock)
return sock.close() return sock.close()
def want_join(self, coro):
if coro not in self.waitlist:
self.waitlist[coro].append(self.running)
else:
raise AlreadyJoinedError("Joining the same task multiple times is not allowed!")
class AsyncSocket(object): class AsyncSocket(object):
"""Abstraction layer for asynchronous sockets""" """Abstraction layer for asynchronous sockets"""
@ -147,24 +176,17 @@ class AsyncSocket(object):
return f"AsyncSocket({self.sock}, {self.loop})" return f"AsyncSocket({self.sock}, {self.loop})"
@types.coroutine @types.coroutine # TODO: Add support for this function
def sleep(seconds: int): def sleep(seconds: int):
"""Pause the execution of a coroutine for the passed amount of seconds, """Pause the execution of a coroutine for the passed amount of seconds,
without blocking the entire event loop, which keeps watching for other events""" without blocking the entire event loop, which keeps watching for other events"""
yield "want_sleep", seconds
start = datetime.datetime.now() start = datetime.datetime.now()
end = datetime.datetime.now() + datetime.timedelta(seconds=seconds) end = datetime.datetime.now() + datetime.timedelta(seconds=seconds)
return (yield end) - start # Return how much time did the coroutine actually wait return (yield end) - start # Return how much time did the coroutine actually wait
@types.coroutine
def want_join(coroutine: types.coroutine):
"""'Tells' the event loop that there is some coroutine that needs to be waited for completion"""
yield "want_join", coroutine
@types.coroutine @types.coroutine
def want_read(sock: socket.socket): def want_read(sock: socket.socket):
"""'Tells' the event loop that there is some coroutine that wants to read from the passed socket""" """'Tells' the event loop that there is some coroutine that wants to read from the passed socket"""
@ -183,6 +205,7 @@ def want_write(sock: socket.socket):
def join(task: Task): def join(task: Task):
"""'Tells' the scheduler that the desired task MUST be awaited for completion""" """'Tells' the scheduler that the desired task MUST be awaited for completion"""
task.joined = True
yield "want_join", task yield "want_join", task
has_raised = exceptions.pop(task, None) has_raised = exceptions.pop(task, None)
if has_raised: if has_raised:

View File

@ -2,3 +2,6 @@ class GiambioError(Exception):
"""Base class for gaimbio exceptions""" """Base class for gaimbio exceptions"""
pass pass
class AlreadyJoinedError(GiambioError):
pass

22
setup.py Normal file
View File

@ -0,0 +1,22 @@
import setuptools
with open("README.md", "r") as readme:
long_description = readme.read()
setuptools.setup(
name="GiambIO",
version="0.0.1",
author="Nocturn9x aka IsGiambyy",
author_email="hackhab@gmail.com",
description="Asynchronous Python made easy (and friendly)",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/nocturn9x/giambio",
packages=setuptools.find_packages(),
classifiers=[
"Programming Language :: Python :: 3",
"Operating System :: OS Independent",
"License :: OSI Approved :: Apache License 2.0"
],
python_requires='>=3.6'
)

47
test.py Normal file
View File

@ -0,0 +1,47 @@
import giambio
from giambio.core import AsyncSocket
import socket
import logging
loop = giambio.EventLoop()
logging.basicConfig(level=20,
format="[%(levelname)s] %(asctime)s %(message)s",
datefmt='%d/%m/%Y %p')
async def make_srv(address: tuple):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(address)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.listen(5)
asock = loop.wrap_socket(sock)
logging.info(f"Echo server serving asynchronously at {address}")
while True:
conn, addr = await asock.accept()
logging.info(f"{addr} connected")
task = loop.spawn(echo_server(conn, addr))
# try:
# await giambio.join(task)
# except Exception as e:
# print(repr(e))
async def echo_server(sock: AsyncSocket, addr: tuple):
with sock:
# Without the try/except block and the call to giambio.join(), the task would block here
await sock.send_all(b"Welcome to the server pal!\n")
while True:
data = await sock.receive(1000)
if not data:
break
to_send_back = data
data = data.decode("utf-8").encode('unicode_escape')
logging.info(f"Got: '{data.decode('utf-8')}' from {addr}")
await sock.send_all(b"Got: " + to_send_back)
logging.info(f"Echoed back '{data.decode('utf-8')}' to {addr}")
logging.info(f"Connection from {addr} closed")
loop.start(make_srv, ('', 1500))