mirror of https://github.com/nocturn9x/giambio.git
Added support for giambio.sleep() and minor fixes (still joining issue)
This commit is contained in:
parent
94092a7ae7
commit
6c28c7db20
|
@ -2,12 +2,12 @@ import types
|
|||
import datetime
|
||||
from collections import deque, defaultdict
|
||||
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
|
||||
from inspect import iscoroutine
|
||||
from functools import wraps
|
||||
from heapq import heappush, heappop
|
||||
import socket
|
||||
from .exceptions import GiambioError, AlreadyJoinedError
|
||||
import traceback
|
||||
|
||||
from timeit import default_timer
|
||||
from time import sleep as wait
|
||||
|
||||
|
||||
class Task:
|
||||
|
@ -32,16 +32,17 @@ class Task:
|
|||
class EventLoop:
|
||||
|
||||
"""Implementation of an event loop, alternates between execution of coroutines (asynchronous functions)
|
||||
to allow a concurrency model"""
|
||||
to allow a concurrency model or 'green threads'"""
|
||||
|
||||
def __init__(self):
|
||||
"""Object constructor"""
|
||||
|
||||
self.to_run = deque() # Scheduled tasks
|
||||
self.paused = deque() # Paused or sleeping tasks
|
||||
self.paused = [] # Sleeping tasks
|
||||
self.selector = DefaultSelector() # Selector object to perform I/O multiplexing
|
||||
self.running = None # This will always point to the currently running coroutine (Task object)
|
||||
self.waitlist = defaultdict(list) # Tasks that want to join
|
||||
self.joined = defaultdict(list) # Tasks that want to join
|
||||
self.clock = default_timer # Monotonic clock to keep track of elapsed time
|
||||
|
||||
def loop(self):
|
||||
"""Main event loop for giambio"""
|
||||
|
@ -50,24 +51,32 @@ class EventLoop:
|
|||
if not self.selector.get_map() and not self.to_run:
|
||||
break
|
||||
while self.selector.get_map(): # If there are sockets ready, schedule their associated task
|
||||
tasks = deque(self.selector.select())
|
||||
timeout = 0.0 if self.to_run else None
|
||||
tasks = deque(self.selector.select(timeout))
|
||||
for key, _ in tasks:
|
||||
self.to_run.append(key.data) # Socket ready? Schedule the task
|
||||
self.selector.unregister(key.fileobj) # Once scheduled, the task does not need to wait anymore
|
||||
while self.to_run:
|
||||
while self.to_run or self.paused:
|
||||
if not self.to_run:
|
||||
wait(max(0.0, self.paused[0][0] - self.clock())) # If there are no tasks ready, just do nothing
|
||||
while self.paused and self.paused[0][0] < self.clock(): # Rechedules task when their timer has elapsed
|
||||
_, coro = heappop(self.paused)
|
||||
self.to_run.append(coro)
|
||||
self.running = self.to_run.popleft() # Sets the currently running task
|
||||
try:
|
||||
meth, *args = self.running.run() # Sneaky method call, thanks to David Beazley for this ;)
|
||||
getattr(self, meth)(*args)
|
||||
method, *args = self.running.run() # Sneaky method call, thanks to David Beazley for this ;)
|
||||
getattr(self, method)(*args)
|
||||
except StopIteration as e:
|
||||
self.running.ret_value = e.args[0] if e.args else None # Saves the return value
|
||||
self.to_run.extend(self.waitlist.pop(self.running, ())) # Reschedules the parent task
|
||||
self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
|
||||
except Exception as has_raised:
|
||||
if self.running.joined:
|
||||
self.running.exception = has_raised # Errored? Save the exception
|
||||
else: # If the task is not joined, the exception would disappear, but not in giambio
|
||||
raise GiambioError from has_raised
|
||||
self.to_run.extend(self.waitlist.pop(self.running, ()))
|
||||
raise
|
||||
self.to_run.extend(self.joined.pop(self.running, ()))
|
||||
except KeyboardInterrupt:
|
||||
self.running.coroutine.throw(KeyboardInterrupt)
|
||||
|
||||
def spawn(self, coroutine: types.coroutine):
|
||||
"""Schedules a task for execution, appending it to the call stack"""
|
||||
|
@ -95,30 +104,33 @@ class EventLoop:
|
|||
def wrap_socket(self, sock):
|
||||
return AsyncSocket(sock, self)
|
||||
|
||||
async def read_sock(self, sock, buffer):
|
||||
async def read_sock(self, sock: socket.socket, buffer: int):
|
||||
await want_read(sock)
|
||||
return sock.recv(buffer)
|
||||
|
||||
async def accept_sock(self, sock):
|
||||
async def accept_sock(self, sock: socket.socket):
|
||||
await want_read(sock)
|
||||
return sock.accept()
|
||||
|
||||
async def sock_sendall(self, sock, data):
|
||||
async def sock_sendall(self, sock: socket.socket, data: bytes):
|
||||
while data:
|
||||
await want_write(sock)
|
||||
sent_no = sock.send(data)
|
||||
data = data[sent_no:]
|
||||
|
||||
async def close_sock(self, sock):
|
||||
async def close_sock(self, sock: socket.socket):
|
||||
await want_write(sock)
|
||||
return sock.close()
|
||||
|
||||
def want_join(self, coro):
|
||||
if coro not in self.waitlist:
|
||||
self.waitlist[coro].append(self.running)
|
||||
def want_join(self, coro: types.coroutine):
|
||||
if coro not in self.joined:
|
||||
self.joined[coro].append(self.running)
|
||||
else:
|
||||
raise AlreadyJoinedError("Joining the same task multiple times is not allowed!")
|
||||
|
||||
def want_sleep(self, seconds):
|
||||
heappush(self.paused, (self.clock() + seconds, self.running))
|
||||
|
||||
|
||||
class AsyncSocket(object):
|
||||
"""Abstraction layer for asynchronous sockets"""
|
||||
|
@ -159,15 +171,12 @@ class AsyncSocket(object):
|
|||
return f"AsyncSocket({self.sock}, {self.loop})"
|
||||
|
||||
|
||||
@types.coroutine # TODO: Add support for this function
|
||||
@types.coroutine
|
||||
def sleep(seconds: int):
|
||||
"""Pause the execution of a coroutine for the passed amount of seconds,
|
||||
without blocking the entire event loop, which keeps watching for other events"""
|
||||
|
||||
yield "want_sleep", seconds
|
||||
start = datetime.datetime.now()
|
||||
end = datetime.datetime.now() + datetime.timedelta(seconds=seconds)
|
||||
return (yield end) - start # Return how much time did the coroutine actually wait
|
||||
|
||||
|
||||
@types.coroutine
|
||||
|
|
9
test.py
9
test.py
|
@ -17,14 +17,13 @@ async def make_srv(address: tuple):
|
|||
sock.listen(5)
|
||||
asock = loop.wrap_socket(sock)
|
||||
logging.info(f"Echo server serving asynchronously at {address}")
|
||||
logging.info("Sleeping for 2 secs")
|
||||
await giambio.sleep(2)
|
||||
logging.info("Done!")
|
||||
while True:
|
||||
conn, addr = await asock.accept()
|
||||
logging.info(f"{addr} connected")
|
||||
task = loop.spawn(echo_server(conn, addr))
|
||||
# try:
|
||||
# await giambio.join(task)
|
||||
# except Exception as e:
|
||||
# print(repr(e))
|
||||
loop.spawn(echo_server(conn, addr))
|
||||
|
||||
|
||||
async def echo_server(sock: AsyncSocket, addr: tuple):
|
||||
|
|
Loading…
Reference in New Issue