Fixed events + Added some TODOs

This commit is contained in:
nocturn9x 2020-11-16 23:06:54 +01:00
parent 10c1b33e20
commit 497ef45307
9 changed files with 101 additions and 102 deletions

View File

@ -18,16 +18,14 @@ __author__ = "Nocturn9x aka Isgiambyy"
__version__ = (1, 0, 0)
from .exceptions import GiambioError, AlreadyJoinedError, CancelledError
from . import exceptions
from .traps import sleep, current_task
from .objects import Event
from .run import run, clock, wrap_socket, create_pool, get_event_loop, new_event_loop
__all__ = [
"GiambioError",
"AlreadyJoinedError",
"CancelledError",
"exceptions",
"sleep",
"Event",
"run",

View File

@ -1,4 +1,6 @@
"""
Higher-level context managers for async pools
Copyright (C) 2020 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
@ -61,6 +63,7 @@ class TaskManager:
for task in self.tasks:
try:
await task.join()
except BaseException:
except BaseException as e:
self.tasks.remove(task)
for to_cancel in self.tasks:
await to_cancel.cancel()
await to_cancel.cancel()

View File

@ -1,4 +1,6 @@
"""
The main runtime environment for giambio
Copyright (C) 2020 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
@ -22,18 +24,15 @@ from timeit import default_timer
from .objects import Task, TimeQueue
from socket import SOL_SOCKET, SO_ERROR
from .traps import want_read, want_write
from collections import defaultdict, deque
from collections import deque
from .socket import AsyncSocket, WantWrite, WantRead
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from .exceptions import (
AlreadyJoinedError,
CancelledError,
ResourceBusy,
GiambioError
)
# The main runtime environment for giambio
class AsyncScheduler:
"""
@ -62,8 +61,6 @@ class AsyncScheduler:
self.paused = TimeQueue(self.clock)
# All active Event objects
self.events = set()
# Coroutines waiting on event objects
self.event_waiting = defaultdict(list)
# Data to send back to a trap
self.to_send = None
# Have we ever ran?
@ -74,7 +71,10 @@ class AsyncScheduler:
Returns True if there is work to do
"""
if self.selector.get_map() or any([self.paused, self.tasks, self.event_waiting]):
if self.selector.get_map() or any([self.paused,
self.tasks,
self.events
]):
return False
return True
@ -83,6 +83,7 @@ class AsyncScheduler:
Shuts down the event loop
"""
# TODO: See if other teardown is required (massive join()?)
self.selector.close()
def run(self):
@ -106,9 +107,9 @@ class AsyncScheduler:
if self.selector.get_map():
# The next step is checking for I/O
self.check_io()
if self.event_waiting:
if self.events:
# Try to awake event-waiting tasks
self.trigger_events()
self.check_events()
# While there are tasks to run
while self.tasks:
# Sets the currently running task
@ -132,6 +133,7 @@ class AsyncScheduler:
self.current_task.status = "cancelled"
self.current_task.cancelled = True
self.current_task.cancel_pending = False
self.join() # TODO: Investigate if a call to join() is needed
except StopIteration as ret:
# Coroutine ends
self.current_task.status = "end"
@ -150,8 +152,8 @@ class AsyncScheduler:
as tasks are independent
"""
# TODO: Do we need anything else?
self.current_task.throw(CancelledError)
self.current_task.coroutine.close()
def get_running(self):
"""
@ -161,16 +163,17 @@ class AsyncScheduler:
self.tasks.append(self.current_task)
self.to_send = self.current_task
def trigger_events(self):
def check_events(self):
"""
Checks for ready or expired events and triggers them
"""
for event, tasks in self.event_waiting.copy().items():
for event in self.events.copy():
if event.set:
event.event_caught = True
self.tasks.extend(tasks + [event.notifier])
self.event_waiting.pop(event)
event.waiters
self.tasks.extend(event.waiters)
self.events.remove(event)
def awake_sleeping(self):
"""
@ -213,15 +216,6 @@ class AsyncScheduler:
if entry.exc:
raise entry.exc from None
def reschedule_parent(self):
"""
Reschedules the parent task of the
currently running task, if any
"""
if parent := self.current_task.parent:
self.tasks.append(parent)
def reschedule_joinee(self):
"""
Reschedules the joinee(s) task of the
@ -238,11 +232,12 @@ class AsyncScheduler:
child = self.current_task
child.joined = True
if child.parent:
child.waiters.append(child.parent)
if child.finished:
self.reschedule_joinee()
self.reschedule_parent()
elif child.exc:
raise child.exc
... # TODO: Handle exceptions
def sleep(self, seconds: int or float):
"""
@ -258,7 +253,8 @@ class AsyncScheduler:
# TODO: More generic I/O rather than just sockets
def want_read(self, sock: socket.socket):
"""
Handler for the 'want_read' event, registers the socket inside the selector to perform I/0 multiplexing
Handler for the 'want_read' event, registers the socket inside the
selector to perform I/0 multiplexing
"""
self.current_task.status = "I/O"
@ -277,7 +273,8 @@ class AsyncScheduler:
def want_write(self, sock: socket.socket):
"""
Handler for the 'want_write' event, registers the socket inside the selector to perform I/0 multiplexing
Handler for the 'want_write' event, registers the socket inside the
selector to perform I/0 multiplexing
"""
self.current_task.status = "I/O"
@ -286,7 +283,7 @@ class AsyncScheduler:
# Socket is already scheduled!
return
else:
# modify() causes issues
# TODO: Inspect why modify() causes issues
self.selector.unregister(sock)
self.current_task.last_io = "WRITE", sock
try:
@ -299,27 +296,23 @@ class AsyncScheduler:
Sets an event
"""
event.notifier = self.current_task
event.set = True
self.events.add(event)
event.waiters.append(self.current_task)
event.set = True
self.reschedule_joinee()
def event_wait(self, event):
"""
Waits for an event
Pauses the current task on an event
"""
if event in self.events:
event.waiting -= 1
if event.waiting <= 0:
return self.events.remove(event)
else:
return
else:
self.event_waiting[event].append(self.current_task)
event.waiters.append(self.current_task)
def cancel(self):
"""
Handler for the 'cancel' event, schedules the task to be cancelled later
or does so straight away if it is safe to do so
"""
if self.current_task.status in ("I/O", "sleep"):
@ -337,8 +330,8 @@ class AsyncScheduler:
async def read_sock(self, sock: socket.socket, buffer: int):
"""
Reads from a socket asynchronously, waiting until the resource is available and returning up to buffer bytes
from the socket
Reads from a socket asynchronously, waiting until the resource is
available and returning up to buffer bytes from the socket
"""
try:
@ -349,8 +342,8 @@ class AsyncScheduler:
async def accept_sock(self, sock: socket.socket):
"""
Accepts a socket connection asynchronously, waiting until the resource is available and returning the
result of the accept() call
Accepts a socket connection asynchronously, waiting until the resource
is available and returning the result of the accept() call
"""
try:

View File

@ -1,4 +1,6 @@
"""
Exceptions for giambio
Copyright (C) 2020 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
@ -16,36 +18,35 @@ limitations under the License.
class GiambioError(Exception):
"""Base class for gaimbio exceptions"""
"""
Base class for giambio exceptions
"""
pass
class AlreadyJoinedError(GiambioError):
pass
...
class CancelledError(BaseException):
"""Exception raised by the giambio._layers.Task.cancel() method"""
"""
Exception raised by the giambio.objects.Task.cancel() method
to terminate a child task. This should NOT be catched, or
at least it should be re-raised and never ignored
"""
def __repr__(self):
return "giambio.exceptions.CancelledError"
...
class ResourceBusy(GiambioError):
"""Exception that is raised when a resource is accessed by more than
one task at a time"""
"""
Exception that is raised when a resource is accessed by more than
one task at a time
"""
pass
class BrokenPipeError(GiambioError):
"""Wrapper around the broken pipe socket.error"""
pass
...
class ResourceClosed(GiambioError):
"""Raised when I/O is attempted on a closed fd"""
"""
Raised when I/O is attempted on a closed resource
"""
pass
...

View File

@ -1,4 +1,6 @@
"""
Various object wrappers and abstraction layers
Copyright (C) 2020 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
@ -61,15 +63,17 @@ class Task:
Joins the task
"""
return await join(self)
res = await join(self)
if self.exc:
raise self.exc
return res
async def cancel(self):
"""
Cancels the task
"""
if not self.exc and not self.cancelled and not self.finished:
await cancel(self)
await cancel(self)
def __del__(self):
self.coroutine.close()
@ -86,11 +90,10 @@ class Event:
"""
self.set = False
self.waiters = []
self.event_caught = False
self.timeout = None
self.waiting = 0
async def activate(self):
async def trigger(self):
"""
Sets the event, waking up all tasks that called
pause() on us
@ -100,12 +103,11 @@ class Event:
raise GiambioError("The event has already been set")
await event_set(self)
async def pause(self):
async def wait(self):
"""
Waits until the event is set
"""
self.waiting += 1
await event_wait(self)

View File

@ -1,4 +1,6 @@
"""
Helper methods and public API
Copyright (C) 2020 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -1,5 +1,4 @@
"""
Basic abstraction layer for giambio asynchronous sockets
Copyright (C) 2020 nocturn9x
@ -22,6 +21,7 @@ import socket
from .exceptions import ResourceClosed
from .traps import sleep
# Stolen from curio
try:
from ssl import SSLWantReadError, SSLWantWriteError
@ -34,18 +34,18 @@ except ImportError:
class AsyncSocket(object):
"""
Abstraction layer for asynchronous sockets
Abstraction layer for asynchronous TCP sockets
"""
def __init__(self, sock: socket.socket, loop):
self.sock = sock
self.sock.setblocking(False)
self.loop = loop
self._closed = False
self.sock.setblocking(False)
async def receive(self, max_size: int):
"""
Receives up to max_size from a socket asynchronously
Receives up to max_size bytes from a socket asynchronously
"""
if self._closed:

View File

@ -1,4 +1,10 @@
"""
Implementation for all giambio traps, which are hooks
into the event loop and allow it to switch tasks.
These coroutines are the one and only way to interact
with the event loop from the user's perspective, and
the entire library is based on them
Copyright (C) 2020 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
@ -14,12 +20,6 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
# Implementation for all giambio traps, which are hooks
# into the event loop and allow it to switch tasks
# These coroutines are the one and only way to interact
# with the event loop from the user's perspective, and
# the entire library is based on these traps
import types
@ -95,7 +95,7 @@ async def cancel(task):
async def want_read(stream):
"""
Notifies the event loop that a task that wants to read from the given
Notifies the event loop that a task wants to read from the given
resource
:param stream: The resource that needs to be read
@ -106,7 +106,7 @@ async def want_read(stream):
async def want_write(stream):
"""
Notifies the event loop that a task that wants to read from the given
Notifies the event loop that a task wants to write on the given
resource
:param stream: The resource that needs to be written
@ -128,7 +128,7 @@ async def event_set(event):
async def event_wait(event):
"""
Notifies the event loop that the current task has to wait
for given event to trigger
for the given event to trigger
"""
await create_trap("event_wait", event)

View File

@ -4,10 +4,10 @@ import giambio
# A test for events
async def child(notifier: giambio.Event, pause: int):
async def child(ev: giambio.Event, pause: int):
print("[child] Child is alive! Going to wait until notified")
start_total = giambio.clock()
await notifier.pause()
await ev.wait()
end_pause = giambio.clock() - start_total
print(f"[child] Parent set the event, exiting in {pause} seconds")
start_sleep = giambio.clock()
@ -18,15 +18,15 @@ async def child(notifier: giambio.Event, pause: int):
async def parent(pause: int = 1):
event = giambio.Event()
print("[parent] Spawning child task")
task = giambio.spawn(child, event, pause + 2)
start = giambio.clock()
print(f"[parent] Sleeping {pause} second(s) before setting the event")
await giambio.sleep(pause)
await event.set()
print("[parent] Event set, awaiting child")
await task.join()
async with giambio.create_pool() as pool:
event = giambio.Event()
print("[parent] Spawning child task")
pool.spawn(child, event, pause + 2)
start = giambio.clock()
print(f"[parent] Sleeping {pause} second(s) before setting the event")
await giambio.sleep(pause)
await event.trigger()
print("[parent] Event set, awaiting child")
end = giambio.clock() - start
print(f"[parent] Child exited in {end} seconds")