Major library refactoring, made event loops thread-local (no need to carry scheduler objects around anymore) and fixed some bugs related to I/O. Made events simpler as they would overlap with a future implementation of channels

This commit is contained in:
nocturn9x 2020-11-13 10:44:47 +01:00
parent 3745886500
commit 0b1c5d75e7
10 changed files with 192 additions and 89 deletions

View File

@ -16,16 +16,19 @@ limitations under the License.
__author__ = "Nocturn9x aka Isgiambyy"
__version__ = (1, 0, 0)
from ._core import AsyncScheduler
from ._run import run, spawn, clock, wrap_socket
from .exceptions import GiambioError, AlreadyJoinedError, CancelledError
from ._traps import sleep
from ._layers import Event
__all__ = [
"AsyncScheduler",
"GiambioError",
"AlreadyJoinedError",
"CancelledError",
"sleep",
"Event",
"run",
"spawn",
"clock",
"wrap_socket"
]

View File

@ -22,7 +22,7 @@ import socket
from .exceptions import AlreadyJoinedError, CancelledError, ResourceBusy, GiambioError
from timeit import default_timer
from time import sleep as wait
from .socket import AsyncSocket, WantWrite
from .socket import AsyncSocket, WantWrite, WantRead
from ._layers import Task, TimeQueue
from socket import SOL_SOCKET, SO_ERROR
from ._traps import want_read, want_write
@ -52,11 +52,11 @@ class AsyncScheduler:
default_timer # Monotonic clock to keep track of elapsed time reliably
)
self.paused = TimeQueue(self.clock) # Tasks that are asleep
self.events = {} # All Event objects
self.events = set() # All Event objects
self._event_waiting = defaultdict(list) # Coroutines waiting on event objects
self.sequence = 0
def run(self):
def _run(self):
"""
Starts the loop and 'listens' for events until there are either ready or asleep tasks,
then exit. This behavior kinda reflects a kernel, as coroutines can request
@ -84,28 +84,37 @@ class AsyncScheduler:
if self.current_task.status == "cancel": # Deferred cancellation
self.current_task.cancelled = True
self.current_task.throw(CancelledError(self.current_task))
method, *args = self.current_task.run(
self.current_task._notify
) # Run a single step with the calculation (and awake event-waiting tasks if any)
method, *args = self.current_task.run() # Run a single step with the calculation
self.current_task.status = "run"
getattr(self, f"_{method}")(
*args
) # Sneaky method call, thanks to David Beazley for this ;)
if self._event_waiting:
self.check_events()
self._check_events()
except CancelledError as cancelled:
self.tasks.remove(cancelled.args[0]) # Remove the dead task
self.tasks.append(self.current_task)
except StopIteration as e: # Coroutine ends
self.current_task.result = e.args[0] if e.args else None
self.current_task.finished = True
self.reschedule_parent(self.current_task)
self._reschedule_parent()
except BaseException as error: # Coroutine raised
self.current_task.exc = error
self.reschedule_parent(self.current_task)
self._reschedule_parent()
self._join(self.current_task)
def check_events(self):
def clock(self):
"""
Returns the current clock time for the event loop.
Useful to keep track of elapsed time in the terms of
the scheduler itself
:return: whatever self.clock returns
:rtype:
"""
return self.clock()
def _check_events(self):
"""
Checks for ready or expired events and triggers them
"""
@ -113,8 +122,6 @@ class AsyncScheduler:
for event, tasks in self._event_waiting.copy().items():
if event._set:
event.event_caught = True
for task in tasks:
task._notify = event._notify
self.tasks.extend(tasks + [event.notifier])
self._event_waiting.pop(event)
@ -147,40 +154,41 @@ class AsyncScheduler:
for key, _ in io_ready:
self.tasks.append(key.data) # Socket ready? Schedule the task
def create_task(self, coro: types.coroutine):
def spawn(self, func: types.FunctionType, *args):
"""
Spawns a child task
"""
task = Task(coro)
task = Task(func(*args))
self.tasks.append(task)
return task
def schedule_task(self, coro: types.coroutine, n: int):
def spawn_after(self, func: types.FunctionType, n: int, *args):
"""
Schedules a task for execution after n seconds
"""
task = Task(coro)
task = Task(func(*args))
self.paused.put(task, n)
return task
def start(self, coro: types.coroutine):
def start(self, func: types.FunctionType, *args):
"""
Starts the event loop using a coroutine as an entry point.
"""
entry = self.create_task(coro)
self.run()
entry = self.spawn(func, *args)
self._run()
self._join(entry)
return entry
def reschedule_parent(self, coro):
def _reschedule_parent(self):
"""
Reschedules the parent task
Reschedules the parent task of the
currently running task, if any
"""
parent = self.joined.pop(coro, None)
parent = self.joined.pop(self.current_task, None)
if parent:
self.tasks.append(parent)
return parent
@ -200,7 +208,7 @@ class AsyncScheduler:
self.current_task._last_io = "READ", sock
try:
self.selector.register(sock, EVENT_READ, self.current_task)
except KeyError:
except KeyError: # The socket is already registered doing something else
raise ResourceBusy("The given resource is busy!") from None
def _want_write(self, sock: socket.socket):
@ -228,10 +236,10 @@ class AsyncScheduler:
parent task
"""
if child.cancelled: # Task was cancelled and is therefore dead
self.tasks.append(self.current_task)
if child.cancelled or child.finished: # Task was cancelled or has finished executing and is therefore dead
self._reschedule_parent()
elif child.exc: # Task raised an error, propagate it!
self.reschedule_parent(child)
self._reschedule_parent()
raise child.exc
elif child.finished:
self.tasks.append(self.current_task) # Task has already finished
@ -254,27 +262,26 @@ class AsyncScheduler:
else:
self.tasks.append(self.current_task)
def _event_set(self, event, value):
def _event_set(self, event):
"""
Sets an event
"""
event.notifier = self.current_task
event._set = True
event._notify = value
self.events[event] = value
self.events.add(event)
def _event_wait(self, event):
"""
Waits for an event
"""
if self.events.get(event, None):
if event in self.events:
event.waiting -= 1
if event.waiting <= 0:
return self.events.pop(event)
return self.events.remove(event)
else:
return self.events[event]
return
else:
self._event_waiting[event].append(self.current_task)
@ -334,6 +341,7 @@ class AsyncScheduler:
"""
await want_write(sock)
self.selector.unregister(sock)
return sock.close()
async def _connect_sock(self, sock: socket.socket, addr: tuple):

View File

@ -32,7 +32,6 @@ class Task:
self.finished = False
self.status = "init" # This is useful for cancellation
self._last_io = None
self._notify = None
def run(self, what=None):
"""Simple abstraction layer over the coroutines ``send`` method"""
@ -61,24 +60,25 @@ class Task:
class Event:
"""A class designed similarly to threading.Event, but with more features"""
"""A class designed similarly to threading.Event"""
def __init__(self):
"""Object constructor"""
self._set = False
self._notify = None
self.event_caught = False
self.timeout = None
self.waiting = 0
async def set(self, value=True):
"""Sets the event, optionally taking a value. This can be used
to control tasks' flow by 'sending' commands back and fort"""
async def set(self):
"""
Sets the event, waking up all tasks that called
pause() on this event
"""
if self._set:
raise GiambioError("The event has already been set")
await event_set(self, value)
await event_set(self)
async def pause(self):
"""Waits until the event is set and returns a value"""
@ -88,8 +88,10 @@ class Event:
class TimeQueue:
"""An abstraction layer over a heap queue based on time. This is where
sleeping tasks will be put when they are asleep"""
"""
An abstraction layer over a heap queue based on time. This is where
sleeping tasks will be put when they are not running
"""
def __init__(self, clock):
self.clock = clock
@ -112,8 +114,17 @@ class TimeQueue:
return f"TimeQueue({self.container}, clock={self.clock})"
def put(self, item, amount):
"""
Pushes an item onto the queue with its unique
time amount and ID
"""
heappush(self.container, (self.clock() + amount, self.sequence, item))
self.sequence += 1
def get(self):
"""
Gets the first task that is meant to run
"""
return heappop(self.container)[2]

View File

@ -14,11 +14,45 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
from ._core import AsyncScheduler
from types import coroutine
import threading
from ._layers import Task
from .socket import AsyncSocket
from types import FunctionType
import socket
def run(coro: coroutine):
"""Shorthand for giambio.AsyncScheduler().start(coro)"""
thread_local = threading.local()
... # How to do it? (Share objects between coroutines etc)
def run(func: FunctionType, *args) -> Task:
"""
Starts the event loop from a synchronous entry point
"""
return thread_local.loop.start(func, *args)
def clock():
"""
Returns the current clock time of the thread-local event
loop
"""
return thread_local.loop.clock()
def spawn(func: FunctionType, *args):
"""
Spawns a child task in the current event
loop
"""
return thread_local.loop.spawn(func, *args)
def wrap_socket(sock: socket.socket) -> AsyncSocket:
"""
Wraps a synchronous socket into a giambio.socket.AsyncSocket
"""
return thread_local.loop.wrap_socket(sock)

View File

@ -56,13 +56,16 @@ def join(task):
@types.coroutine
def cancel(task):
"""'Tells' the scheduler that the passed task must be cancelled
"""
'Tells' the scheduler that the passed task must be cancelled
The concept of cancellation here is tricky, because there is no real way to 'stop' a
running task if not by raising an exception inside it and just ignore whatever the task
returns (and also hoping that the task won't cause damage when exiting abruptly).
returns (and also hoping that the task won't cause collateral damage when exiting abruptly).
It is highly recommended that when you write a coroutine you take into account that it might
be cancelled at any time
be cancelled at any time. Please note, though, that ignoring a giambio.exceptions.CancelledError
exception *will* break your code, so if you really wanna do that be sure to re-raise
it when done!
"""
yield "cancel", task
@ -71,7 +74,8 @@ def cancel(task):
@types.coroutine
def want_read(sock: socket.socket):
"""'Tells' the event loop that there is some coroutine that wants to read from the given socket
"""
'Tells' the event loop that there is some coroutine that wants to read from the given socket
:param sock: The socket to perform the operation on
:type sock: class: socket.socket
@ -82,7 +86,8 @@ def want_read(sock: socket.socket):
@types.coroutine
def want_write(sock: socket.socket):
"""'Tells' the event loop that there is some coroutine that wants to write on the given socket
"""
'Tells' the event loop that there is some coroutine that wants to write on the given socket
:param sock: The socket to perform the operation on
:type sock: class: socket.socket
@ -92,18 +97,19 @@ def want_write(sock: socket.socket):
@types.coroutine
def event_set(event, value):
def event_set(event):
"""Communicates to the loop that the given event object
must be set. This is important as the loop constantly
checks for active events to deliver them
"""
yield "event_set", event, value
yield "event_set", event
@types.coroutine
def event_wait(event):
"""Notifies the event loop that the current task has to wait
"""
Notifies the event loop that the current task has to wait
for the event to trigger
"""

View File

@ -81,12 +81,10 @@ class AsyncSocket(object):
await self.loop._connect_sock(self.sock, addr)
async def __aenter__(self):
await sleep(0)
return self.sock.__enter__()
async def __aexit__(self, *args):
await sleep(0)
return self.sock.__exit__(*args)
async def __aexit__(self, *_):
await self.close()
def __repr__(self):
return f"giambio.socket.AsyncSocket({self.sock}, {self.loop})"

View File

@ -1,12 +1,16 @@
import giambio
# A test for cancellation
async def countdown(n: int):
while n > 0:
print(f"Down {n}")
n -= 1
await giambio.sleep(1)
print("Countdown over")
# raise Exception("oh no man")
return 0
@ -21,8 +25,8 @@ async def countup(stop: int, step: int = 1):
async def main():
cdown = scheduler.create_task(countdown(10))
cup = scheduler.create_task(countup(5, 2))
cdown = giambio.spawn(countdown, 10)
cup = giambio.spawn(countup, 5, 2)
print("Counters started, awaiting completion")
await giambio.sleep(2)
print("Slept 2 seconds, killing countup")
@ -36,8 +40,7 @@ async def main():
if __name__ == "__main__":
scheduler = giambio.AsyncScheduler()
try:
scheduler.start(main())
except Exception:
print("bruh")
giambio.run(main)
except Exception as e:
print(f"Exception caught! -> {type(e).__name__}: {e}")

View File

@ -1,30 +1,35 @@
import giambio
async def child(notifier: giambio.Event, reply: giambio.Event, pause: int):
print("[child] Child is alive! Going to sleep until notified")
notification = await notifier.pause()
print(f"[child] Parent said: '{notification}', replying in {pause} seconds")
# A test for events
async def child(notifier: giambio.Event, pause: int):
print("[child] Child is alive! Going to wait until notified")
start_total = giambio.clock()
await notifier.pause()
end_pause = giambio.clock() - start_total
print(f"[child] Parent set the event, exiting in {pause} seconds")
start_sleep = giambio.clock() - start_total
await giambio.sleep(pause)
print("[child] Replying to parent")
await reply.set("Hi daddy!")
end_sleep = giambio.clock() - start_sleep
end_total = giambio.clock() - start_total
print(f"[child] Done! Slept for {end_total} seconds total ({end_pause} paused, {end_sleep} sleeping), nice nap!")
async def parent(pause: int = 1):
event = giambio.Event()
reply = giambio.Event()
print("[parent] Spawning child task")
task = scheduler.create_task(child(event, reply, pause))
task = giambio.spawn(child, event, pause + 2)
start = giambio.clock()
print(f"[parent] Sleeping {pause} second(s) before setting the event")
await giambio.sleep(pause)
await event.set("Hi, my child")
print("[parent] Event set, awaiting reply")
reply = await reply.pause()
print(f"[parent] Child replied: '{reply}'")
await event.set()
print("[parent] Event set, awaiting child")
await task.join()
print("[parent] Child exited")
end = giambio.clock() - start
print(f"[parent] Child exited in {end} seconds")
if __name__ == "__main__":
scheduler = giambio.AsyncScheduler()
scheduler.start(parent(5))
giambio.run(parent, 3)

26
tests/join.py Normal file
View File

@ -0,0 +1,26 @@
import giambio
# A test to see if tasks are properly joined
async def child(sleep: int):
start = giambio.clock()
print(f"[child] Gonna sleep for {sleep} seconds!")
await giambio.sleep(sleep)
end = giambio.clock() - start
print(f"[child] I woke up! Slept for {end} seconds")
async def main():
print("[parent] Spawning child")
task = giambio.spawn(child, 5)
start = giambio.clock()
print("[parent] Child spawned, awaiting completion")
await task.join()
end = giambio.clock() - start
print(f"[parent] Child exited in {end} seconds")
if __name__ == "__main__":
giambio.run(main)

View File

@ -1,9 +1,11 @@
import giambio
import traceback
from giambio.socket import AsyncSocket
import socket
import logging
import sys
sched = giambio.AsyncScheduler()
# A test to check for asynchronous I/O
logging.basicConfig(
level=20, format="[%(levelname)s] %(asctime)s %(message)s", datefmt="%d/%m/%Y %p"
@ -15,21 +17,26 @@ async def server(address: tuple):
sock.bind(address)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.listen(5)
asock = sched.wrap_socket(sock)
asock = giambio.wrap_socket(sock) # We make the socket an async socket
logging.info(f"Echo server serving asynchronously at {address}")
while True:
conn, addr = await asock.accept()
logging.info(f"{addr} connected")
task = sched.create_task(echo_handler(conn, addr))
task = giambio.spawn(echo_handler, conn, addr)
# await task.join() # TODO: Joining I/O tasks seems broken
async def echo_handler(sock: AsyncSocket, addr: tuple):
async with sock:
await sock.send_all(b"Welcome to the server pal!\n")
await sock.send_all(b"Welcome to the server pal, feel free to send me something!\n")
while True:
data = await sock.receive(1000)
await sock.send_all(b"-> ")
data = await sock.receive(1024)
if not data:
break
elif data == b"raise\n":
await sock.send_all(b"I'm dead dude\n")
raise TypeError("Oh, no, I'm gonna die!")
to_send_back = data
data = data.decode("utf-8").encode("unicode_escape")
logging.info(f"Got: '{data.decode('utf-8')}' from {addr}")
@ -40,6 +47,8 @@ async def echo_handler(sock: AsyncSocket, addr: tuple):
if __name__ == "__main__":
try:
sched.start(server(("", 25001)))
except KeyboardInterrupt: # Exceptions propagate!
print("Exiting...")
giambio.run(server, ("", 1501))
except BaseException as error: # Exceptions propagate!
print(f"Exiting due to a {type(error).__name__}: '{error}'", end=" ")
print("traceback below (or above, or in the middle, idk async is weird)")
traceback.print_exception(*sys.exc_info())