Added some documentation, a test for timeouts and fixed some bugs with I/O

This commit is contained in:
nocturn9x 2020-12-20 15:58:53 +01:00
parent c8a1008646
commit 5b403703db
8 changed files with 565 additions and 201 deletions

View File

@ -1,5 +1,5 @@
"""
Asynchronous Python made easy (and friendly)
Giambio: Asynchronous Python made easy (and friendly)
Copyright (C) 2020 nocturn9x
@ -16,19 +16,22 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
__author__ = "Nocturn9x aka Isgiambyy"
__version__ = (1, 0, 0)
__author__ = "Nocturn9x"
__version__ = (0, 0, 1)
from . import exceptions, socket
from . import exceptions, socket, context, core
from .socket import wrap_socket
from .traps import sleep, current_task
from .objects import Event
from .run import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout
from .util import debug
__all__ = [
"exceptions",
"core",
"context",
"sleep",
"Event",
"run",

View File

@ -1,5 +1,5 @@
"""
Higher-level context managers for async pools
Higher-level context manager(s) for async pools
Copyright (C) 2020 nocturn9x
@ -16,38 +16,50 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
import giambio
import types
from giambio.objects import Task
from giambio.core import AsyncScheduler
from typing import List
class TaskManager:
"""
An asynchronous context manager for giambio
:param loop: The event loop bound to this pool. Most of the times
it's the return value from giambio.get_event_loop()
:type loop: :class: AsyncScheduler
:param timeout: The pool's timeout length in seconds, if any, defaults to None
:type timeout: float, optional
"""
def __init__(self, loop: AsyncScheduler, timeout: float = None) -> None:
def __init__(self, loop: "giambio.core.AsyncScheduler", timeout: float = None) -> None:
"""
Object constructor
"""
self.loop = loop
self.tasks = [] # We store a reference to all tasks in this pool, even the paused ones!
self.cancelled = False
self.started = self.loop.clock()
# The event loop associated with this pool
self.loop: "giambio.core.AsyncScheduler" = loop
# All the tasks that belong to this pool
self.tasks: List[giambio.objects.Task] = []
# Whether we have been cancelled or not
self.cancelled: bool = False
# The clock time of when we started running, used for
# timeouts expiration
self.started: float = self.loop.clock()
# The pool's timeout (in seconds)
if timeout:
self.timeout = self.started + timeout
self.timeout: float = self.started + timeout
else:
self.timeout = None
self.timed_out = False
self.timeout: None = None
# Whether our timeout expired or not
self.timed_out: bool = False
def spawn(self, func: types.FunctionType, *args):
def spawn(self, func: types.FunctionType, *args) -> "giambio.objects.Task":
"""
Spawns a child task
"""
task = Task(func(*args), func.__name__ or str(func), self)
task = giambio.objects.Task(func.__name__ or str(func), func(*args), self)
task.joiners = [self.loop.current_task]
task.next_deadline = self.timeout or 0.0
self.loop.tasks.append(task)
@ -55,13 +67,13 @@ class TaskManager:
self.tasks.append(task)
return task
def spawn_after(self, func: types.FunctionType, n: int, *args):
def spawn_after(self, func: types.FunctionType, n: int, *args) -> "giambio.objects.Task":
"""
Schedules a task for execution after n seconds
"""
assert n >= 0, "The time delay can't be negative"
task = Task(func(*args), func.__name__ or str(func), self)
task = giambio.objects.Task(func.__name__ or str(func), func(*args), self)
task.joiners = [self.loop.current_task]
task.next_deadline = self.timeout or 0.0
task.sleep_start = self.loop.clock()
@ -71,23 +83,39 @@ class TaskManager:
return task
async def __aenter__(self):
"""
Implements the asynchronous context manager interface,
marking the pool as started and returning itself
"""
return self
async def __aexit__(self, exc_type: Exception, exc: Exception, tb):
"""
Implements the asynchronous context manager interface, joining
all the tasks spawned inside the pool
"""
for task in self.tasks:
# This forces Python to stop at the
# This forces the interpreter to stop at the
# end of the block and wait for all
# children to exit
await task.join()
async def cancel(self):
"""
Cancels the whole block
Cancels the pool entirely, iterating over all
the pool's tasks and cancelling them
"""
# TODO: This breaks, somehow, investigation needed
for task in self.tasks:
await task.cancel()
def done(self):
def done(self) -> bool:
"""
Returns True if all the tasks inside the
pool have exited, False otherwise
"""
return all([task.done() for task in self.tasks])

View File

@ -19,11 +19,13 @@ limitations under the License.
# Import libraries and internal resources
import types
import socket
from timeit import default_timer
from giambio.objects import Task, TimeQueue, DeadlinesQueue
from giambio.traps import want_read, want_write
from giambio.util.debug import BaseDebugger
from itertools import chain
from timeit import default_timer
from giambio.context import TaskManager
from typing import List, Optional, Set, Any
from giambio.util.debug import BaseDebugger
from giambio.traps import want_read, want_write
from giambio.objects import Task, TimeQueue, DeadlinesQueue, Event
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from giambio.exceptions import (InternalError,
CancelledError,
@ -34,7 +36,7 @@ from giambio.exceptions import (InternalError,
# TODO: Take into account SSLWantReadError and SSLWantWriteError
IOInterrupt = (BlockingIOError, InterruptedError)
# TODO: Right now this value is pretty much arbitrary, we need some euristic testing to choose a sensible default
# TODO: Right now this value is pretty much arbitrary, we need some testing to choose a sensible default
IO_SKIP_LIMIT = 5
@ -46,6 +48,13 @@ class AsyncScheduler:
with its calculations. An attempt to fix the threaded model has been made
without making the API unnecessarily complicated. A few examples are tasks
cancellation and exception propagation.
:param clock: A callable returning monotonically increasing values at each call,
defaults to timeit.default_timer
:type clock: :class: types.FunctionType
:param debugger: A subclass of giambio.util.BaseDebugger or None if no debugging output
is desired, defaults to None
:type debugger: :class: giambio.util.BaseDebugger
"""
def __init__(self, clock: types.FunctionType = default_timer, debugger: BaseDebugger = None):
@ -54,37 +63,38 @@ class AsyncScheduler:
"""
# The debugger object. If it is none we create a dummy object that immediately returns an empty
# lambda every time you access any of its attributes to avoid lots of if self.debugger clauses
# lambda which in turn returns None every time we access any of its attributes to avoid lots of
# if self.debugger clauses
if debugger:
assert issubclass(type(debugger),
BaseDebugger), "The debugger must be a subclass of giambio.util.BaseDebugger"
self.debugger = debugger or type("DumbDebugger", (object, ), {"__getattr__": lambda *args: lambda *arg: None})()
# Tasks that are ready to run
self.tasks = []
self.tasks: List[Task] = []
# Selector object to perform I/O multiplexing
self.selector = DefaultSelector()
self.selector: DefaultSelector = DefaultSelector()
# This will always point to the currently running coroutine (Task object)
self.current_task = None
self.current_task: Optional[Task] = None
# Monotonic clock to keep track of elapsed time reliably
self.clock = clock
self.clock: types.FunctionType = clock
# Tasks that are asleep
self.paused = TimeQueue(self.clock)
self.paused: TimeQueue = TimeQueue(self.clock)
# All active Event objects
self.events = set()
self.events: Set[Event] = set()
# Data to send back to a trap
self.to_send = None
self.to_send: Optional[Any] = None
# Have we ever ran?
self.has_ran = False
self.has_ran: bool = False
# The current pool
self.current_pool = None
self.current_pool: Optional[TaskManager] = None
# How many times we skipped I/O checks to let a task run.
# We limit the number of times we skip such checks to avoid
# I/O starvation in highly concurrent systems
self.io_skip = 0
self.io_skip: int = 0
# A heap queue of deadlines to be checked
self.deadlines = DeadlinesQueue()
self.deadlines: DeadlinesQueue = DeadlinesQueue()
def done(self):
def done(self) -> bool:
"""
Returns True if there is no work to do
"""
@ -136,7 +146,13 @@ class AsyncScheduler:
while self.tasks:
# Sets the currently running task
self.current_task = self.tasks.pop(0)
# Sets the current pool (for nested pools)
if self.current_task.done():
# Since ensure_discard only checks for paused tasks,
# we still need to make sure we don't try to execute
# exited tasks that are on the running queue
continue
# Sets the current pool: we need this to take nested pools
# into account and behave accordingly
self.current_pool = self.current_task.pool
if self.current_pool and self.current_pool.timeout and not self.current_pool.timed_out:
# Stores deadlines for tasks (deadlines are pool-specific).
@ -144,7 +160,7 @@ class AsyncScheduler:
# a deadline for the same pool twice. This makes the timeouts
# model less flexible, because one can't change the timeout
# after it is set, but it makes the implementation easier.
self.deadlines.put(self.current_pool.timeout, self.current_pool)
self.deadlines.put(self.current_pool)
self.debugger.before_task_step(self.current_task)
if self.current_task.cancel_pending:
# We perform the deferred cancellation
@ -191,24 +207,26 @@ class AsyncScheduler:
self.join(self.current_task)
except BaseException as err:
# TODO: We might want to do a bit more complex traceback hacking to remove any extra
# frames from the exception call stack, but for now removing at least the first one
# seems a sensible approach (it's us catching it so we don't care about that)
# frames from the exception call stack, but for now removing at least the first one
# seems a sensible approach (it's us catching it so we don't care about that)
self.current_task.exc = err
self.current_task.exc.__traceback__ = self.current_task.exc.__traceback__.tb_next
self.current_task.status = "crashed"
self.debugger.on_exception_raised(self.current_task, err)
self.join(self.current_task)
self.ensure_discard(self.current_task)
def do_cancel(self, task: Task):
"""
Performs task cancellation by throwing CancelledError inside the given
task in order to stop it from running. The loop continues to execute
as tasks are independent
task in order to stop it from running
:param task: The task to cancel
:type task: :class: Task
"""
if not task.cancelled and not task.exc:
self.debugger.before_cancel(task)
task.throw(CancelledError())
self.debugger.before_cancel(task)
task.throw(CancelledError())
def get_current(self):
"""
@ -224,15 +242,20 @@ class AsyncScheduler:
inside the correct pool if its timeout expired
"""
while self.deadlines and self.deadlines[0][0] <= self.clock():
_, __, pool = self.deadlines.get()
while self.deadlines and self.deadlines.get_closest_deadline() <= self.clock():
pool = self.deadlines.get()
pool.timed_out = True
self.cancel_all_from_current_pool(pool)
self.cancel_pool(pool)
# Since we already know that exceptions will behave correctly
# (heck, half of the code in here only serves that purpose)
# all we do here is just raise an exception as if the current
# task raised it and let our machinery deal with the rest
raise TooSlowError()
def check_events(self):
"""
Checks for ready or expired events and triggers them
Checks for ready/expired events and triggers them by
rescheduling all the tasks that called wait() on them
"""
for event in self.events.copy():
@ -252,23 +275,24 @@ class AsyncScheduler:
has elapsed
"""
while self.paused and self.paused[0][0] <= self.clock():
while self.paused and self.paused.get_closest_deadline() <= self.clock():
# Reschedules tasks when their deadline has elapsed
task = self.paused.get()
if not task.done():
slept = self.clock() - task.sleep_start
task.sleep_start = 0.0
self.tasks.append(task)
self.debugger.after_sleep(task, slept)
slept = self.clock() - task.sleep_start
task.sleep_start = 0.0
self.tasks.append(task)
self.debugger.after_sleep(task, slept)
def check_io(self):
"""
Checks for I/O and implements the sleeping mechanism
Checks for I/O and implements part of the sleeping mechanism
for the event loop
"""
before_time = self.clock() # Used for the debugger
if self.tasks or self.events:
# If there is work to do immediately we prefer to
# do that first unless some conditions are met, see below
self.io_skip += 1
if self.io_skip == IO_SKIP_LIMIT:
# We can't skip every time there's some task ready
@ -284,18 +308,19 @@ class AsyncScheduler:
# If there are asleep tasks or deadlines, wait until the closest date
if not self.deadlines:
# If there are no deadlines just wait until the first task wakeup
timeout = min([max(0.0, self.paused[0][0] - self.clock())])
timeout = min([max(0.0, self.paused.get_closest_deadline() - self.clock())])
elif not self.paused:
# If there are no sleeping tasks just wait until the first deadline
timeout = min([max(0.0, self.deadlines[0][0] - self.clock())])
timeout = min([max(0.0, self.deadlines.get_closest_deadline() - self.clock())])
else:
# If there are both deadlines AND sleeping tasks scheduled we calculate
# If there are both deadlines AND sleeping tasks scheduled, we calculate
# the absolute closest deadline among the two sets and use that as a timeout
clock = self.clock()
timeout = min([max(0.0, self.paused[0][0] - clock), self.deadlines[0][0] - clock])
timeout = min([max(0.0, self.paused.get_closest_deadline() - clock),
self.deadlines.get_closest_deadline() - clock])
else:
# If there is *only* I/O, we wait a fixed amount of time
timeout = 86400 # Thanks trio :D
timeout = 86400 # Stolen from trio :D
self.debugger.before_io(timeout)
io_ready = self.selector.select(timeout)
# Get sockets that are ready and schedule their tasks
@ -308,7 +333,7 @@ class AsyncScheduler:
Starts the event loop from a sync context
"""
entry = Task(func(*args), func.__name__ or str(func), None)
entry = Task(func.__name__ or str(func), func(*args), None)
self.tasks.append(entry)
self.debugger.on_start()
self.run()
@ -317,27 +342,14 @@ class AsyncScheduler:
if entry.exc:
raise entry.exc
def reschedule_joiners(self, task: Task):
def cancel_pool(self, pool: TaskManager):
"""
Reschedules the parent(s) of the
given task, if any
Cancels all tasks in the given pool
:param pool: The pool to be cancelled
:type pool: :class: TaskManager
"""
for t in task.joiners:
if t not in self.tasks:
# Since a task can be the parent
# of multiple children, we need to
# make sure we reschedule it only
# once, otherwise a RuntimeError will
# occur
self.tasks.append(t)
def cancel_all_from_current_pool(self, pool=None):
"""
Cancels all tasks in the current pool (or the given one)
"""
pool = pool or self.current_pool
if pool:
for to_cancel in pool.tasks:
self.cancel(to_cancel)
@ -361,24 +373,24 @@ class AsyncScheduler:
def get_asleep_tasks(self):
"""
Yields all tasks currently sleeping
Yields all tasks that are currently sleeping
"""
for asleep in self.paused.container:
yield asleep[2]
yield asleep[2] # Deadline, tiebreaker, task
def get_io_tasks(self) -> set:
def get_io_tasks(self):
"""
Yields all tasks waiting on I/O resources
Yields all tasks currently waiting on I/O resources
"""
for k in self.selector.get_map().values():
yield k.data
def get_all_tasks(self) -> set:
def get_all_tasks(self):
"""
Returns a generator yielding all tasks which the loop is currently
keeping track of. This includes both running and paused tasks.
keeping track of: this includes both running and paused tasks.
A paused task is a task which is either waiting on an I/O resource,
sleeping, or waiting on an event to be triggered
"""
@ -390,25 +402,29 @@ class AsyncScheduler:
def ensure_discard(self, task: Task):
"""
This method ensures that tasks that need to be cancelled are not
rescheduled further. This will act upon paused tasks only
Ensures that tasks that need to be cancelled are not
rescheduled further. This method exists because tasks might
be cancelled in a context where it's not obvious which Task
object must be discarded and not rescheduled at the next iteration
"""
# TODO: Do we need else ifs or ifs? The question arises because
# tasks might be doing I/O and other stuff too even if not at the same time
if task in self.paused:
self.paused.discard(task)
elif self.selector.get_map():
for key in self.selector.get_map().values():
if self.selector.get_map():
for key in dict(self.selector.get_map()).values():
if key.data == task:
self.selector.unregister(task)
elif task in self.get_event_tasks():
self.selector.unregister(key.fileobj)
if task in self.get_event_tasks():
for evt in self.events:
if task in evt.waiters:
evt.waiters.remove(task)
def cancel_all(self):
def cancel_all(self) -> bool:
"""
Cancels ALL tasks, this method is called as a result
of self.close()
Cancels ALL tasks as returned by self.get_all_tasks() and returns
whether all tasks exited or not
"""
for to_cancel in self.get_all_tasks():
@ -419,7 +435,7 @@ class AsyncScheduler:
"""
Closes the event loop, terminating all tasks
inside it and tearing down any extra machinery.
If ensure_done equals False, the loop will cancel *ALL*
If ensure_done equals False, the loop will cancel ALL
running and scheduled tasks and then tear itself down.
If ensure_done equals True, which is the default behavior,
this method will raise a GiambioError if the loop hasn't
@ -432,6 +448,21 @@ class AsyncScheduler:
raise GiambioError("event loop not terminated, call this method with ensure_done=False to forcefully exit")
self.shutdown()
def reschedule_joiners(self, task: Task):
"""
Reschedules the parent(s) of the
given task, if any
"""
for t in task.joiners:
if t not in self.tasks:
# Since a task can be the parent
# of multiple children, we need to
# make sure we reschedule it only
# once, otherwise a RuntimeError will
# occur
self.tasks.append(t)
def join(self, task: Task):
"""
Joins a task to its callers (implicitly, the parent
@ -442,28 +473,36 @@ class AsyncScheduler:
task.joined = True
if task.finished or task.cancelled:
if self.current_pool and self.current_pool.done() or not self.current_pool:
# If the current pool has finished executing or we're at the first parent
# task that kicked the loop, we can safely reschedule the parent(s)
self.reschedule_joiners(task)
elif task.exc:
if self.cancel_all_from_current_pool():
# This will reschedule the parent
# only if all the tasks inside it
# have finished executing, either
if self.cancel_pool(self.current_pool):
# This will reschedule the parent(s)
# only if all the tasks inside the current
# pool have finished executing, either
# by cancellation, an exception
# or just returned
self.reschedule_joiners(task)
def sleep(self, seconds: int or float):
"""
Puts the caller to sleep for a given amount of seconds
Puts the current task to sleep for a given amount of seconds
"""
self.debugger.before_sleep(self.current_task, seconds)
if seconds: # if seconds == 0, this acts as a switch!
if seconds:
self.current_task.status = "sleep"
self.current_task.sleep_start = self.clock()
self.paused.put(self.current_task, seconds)
self.current_task.next_deadline = self.current_task.sleep_start + seconds
else:
# When we're called with a timeout of 0 (the type checking is done
# way before this point) this method acts as a checkpoint that allows
# giambio to kick in and to its job without pausing the task's execution
# for too long. It is recommended to put a couple of checkpoints like these
# in your code if you see degraded concurrent performance in parts of your code
# that block the loop
self.tasks.append(self.current_task)
def cancel(self, task: Task):
@ -492,8 +531,7 @@ class AsyncScheduler:
# if for the next execution step of the task. Giambio will also make sure
# to re-raise cancellations at every checkpoint until the task lets the
# exception propagate into us, because we *really* want the task to be
# cancelled, and since asking kindly didn't work we have to use some
# force :)
# cancelled
task.status = "cancelled"
task.cancelled = True
task.cancel_pending = False
@ -504,11 +542,19 @@ class AsyncScheduler:
# defer this operation for later (check run() for more info)
task.cancel_pending = True # Cancellation is deferred
def event_set(self, event):
def event_set(self, event: Event):
"""
Sets an event
:param event: The event object to trigger
:type event: :class: Event
"""
# When an event is set, we store the event object
# for later, set its attribute and reschedule the
# task that called this method. All tasks waiting
# on this event object will be waken up on the next
# iteration
self.events.add(event)
event.set = True
self.tasks.append(self.current_task)
@ -516,39 +562,70 @@ class AsyncScheduler:
def event_wait(self, event):
"""
Pauses the current task on an event
:param event: The event object to pause upon
:type event: :class: Event
"""
event.waiters.append(self.current_task)
# Since we don't reschedule the task, it will
# not execute until check_events is called
# TODO: More generic I/O rather than just sockets (threads)
def read_or_write(self, sock: socket.socket, evt_type: str):
def register_sock(self, sock: socket.socket, evt_type: str):
"""
Registers the given socket inside the
selector to perform I/0 multiplexing
:param sock: The socket on which a read or write operation
has to be performed
:type sock: socket.socket
:param evt_type: The type of event to perform on the given
socket, either "read" or "write"
:type evt_type: str
"""
self.current_task.status = "io"
if self.current_task.last_io:
if self.current_task.last_io == (evt_type, sock):
# Socket is already scheduled!
return
# TODO: Inspect why modify() causes issues
self.selector.unregister(sock)
self.current_task.last_io = evt_type, sock
evt = EVENT_READ if evt_type == "read" else EVENT_WRITE
try:
self.selector.register(sock, evt, self.current_task)
except KeyError:
# The socket is already registered doing something else
raise ResourceBusy("The given resource is busy!") from None
if self.current_task.last_io:
# Since most of the times tasks will perform multiple
# I/O operations on a given socket, unregistering them
# every time isn't a sensible approach. A quick and
# easy optimization to address this problem is to
# store the last I/O operation that the task performed
# together with the resource itself, inside the task
# object. If the task wants to perform the same
# operation on the same socket again, then this method
# returns immediately as the socket is already being
# watched by the selector. If the resource is the same,
# but the event has changed, then we modify the resource's
# associated event. Only if the resource is different from
# the last used one this method will register a new socket
if self.current_task.last_io == (evt_type, sock):
# Socket is already listening for that event!
return
elif self.current_task.last_io[1] == sock:
# If the event to listen for has changed we just modify it
self.selector.modify(sock, evt, self.current_task)
self.current_task.last_io = (evt_type, sock)
else:
# Otherwise we register the new socket in our selector
self.current_task.last_io = evt_type, sock
try:
self.selector.register(sock, evt, self.current_task)
except KeyError:
# The socket is already registered doing something else
raise ResourceBusy("The given socket is being read/written by another task") from None
# noinspection PyMethodMayBeStatic
async def read_sock(self, sock: socket.socket, buffer: int):
"""
Reads from a socket asynchronously, waiting until the resource is
available and returning up to buffer bytes from the socket
:param sock: The socket that must be read
:type sock: socket.socket
:param buffer: The maximum amount of bytes that will be read
:type buffer: int
"""
await want_read(sock)
@ -559,24 +636,23 @@ class AsyncScheduler:
"""
Accepts a socket connection asynchronously, waiting until the resource
is available and returning the result of the sock.accept() call
:param sock: The socket that must be accepted
:type sock: socket.socket
"""
# TODO: Is this ok?
# This does not feel right because the loop will only
# exit when the socket has been accepted, preventing other
# stuff from running
while True:
try:
return sock.accept()
except IOInterrupt: # Do we need this exception thingy everywhere?
# Some methods have never errored out, but this did and doing
# so seemed to fix the issue, needs investigation
await want_read(sock)
await want_read(sock)
return sock.accept()
# noinspection PyMethodMayBeStatic
async def sock_sendall(self, sock: socket.socket, data: bytes):
"""
Sends all the passed bytes trough a socket asynchronously
Sends all the passed bytes trough the given socket, asynchronously
:param sock: The socket that must be written
:type sock: socket.socket
:param data: The bytes to send across the socket
:type data: bytes
"""
while data:
@ -584,10 +660,12 @@ class AsyncScheduler:
sent_no = sock.send(data)
data = data[sent_no:]
# TODO: This method seems to cause issues
async def close_sock(self, sock: socket.socket):
"""
Closes a socket asynchronously
Closes the given socket asynchronously
:param sock: The socket that must be closed
:type sock: socket.socket
"""
await want_write(sock)
@ -596,10 +674,17 @@ class AsyncScheduler:
self.current_task.last_io = ()
# noinspection PyMethodMayBeStatic
async def connect_sock(self, sock: socket.socket, addr: tuple):
async def connect_sock(self, sock: socket.socket, address_tuple: tuple):
"""
Connects a socket asynchronously
Connects a socket asynchronously to a given endpoint
:param sock: The socket that must to be connected
:type sock: socket.socket
:param address_tuple: A tuple in the same form as the one
passed to socket.socket.connect with an address as a string
and a port as an integer
:type address_tuple: tuple
"""
await want_write(sock)
return sock.connect(addr)
return sock.connect(address_tuple)

View File

@ -1,5 +1,5 @@
"""
Various object wrappers and abstraction layers
Various object wrappers and abstraction layers for internal use
Copyright (C) 2020 nocturn9x
@ -16,11 +16,10 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
from giambio.traps import join, cancel, event_set, event_wait
from heapq import heappop, heappush, heapify
from giambio.exceptions import GiambioError
import giambio
from dataclasses import dataclass, field
import typing
from heapq import heappop, heappush, heapify
from typing import Union, Coroutine, List, Tuple
@dataclass
@ -30,25 +29,58 @@ class Task:
A simple wrapper around a coroutine object
"""
coroutine: typing.Coroutine
# The name of the task. Usually this equals self.coroutine.__name__,
# but in some cases it falls back to repr(self.coroutine)
name: str
pool: "giambio.context.TaskManager"
# The underlying coroutine object to wrap around a giambio task
coroutine: Coroutine
# The async pool that spawned this task. The one and only task that hasn't
# an associated pool is the main entry point which is not available externally
pool: Union["giambio.context.TaskManager", None] = None
# Whether the task has been cancelled or not. This is True both when the task is
# explicitly cancelled via its cancel() method or when it is cancelled as a result
# of an exception in another task in the same pool
cancelled: bool = False
# This attribute will be None unless the task raised an error
exc: BaseException = None
# The return value of the coroutine
result: object = None
# This attribute signals that the task has exited normally (returned)
finished: bool = False
# This attribute represents what the task is doing and is updated in real
# time by the event loop, internally. Possible values for this are "init"--
# when the task has been created but not started running yet--, "run"-- when
# the task is running synchronous code--, "io"-- when the task is waiting on
# an I/O resource--, "sleep"-- when the task is either asleep or waiting on an event
# and "crashed"-- when the task has exited because of an exception
status: str = "init"
# This attribute counts how many times the task's run() method has been called
steps: int = 0
# Simple optimization to improve the selector's efficiency. Check AsyncScheduler.register_sock
# inside giambio.core to know more about it
last_io: tuple = ()
# All the tasks waiting on this task's completion
joiners: list = field(default_factory=list)
# Whether this task has been waited for completion or not. The one and only task
# that will have this attribute set to False is the main program entry point, since
# the loop will implicitly wait for anything else to complete before returning
joined: bool = False
# Whether this task has a pending cancellation scheduled. Check AsyncScheduler.cancel
# inside giambio.core to know more about this attribute
cancel_pending: bool = False
# Absolute clock time that represents the date at which the task started sleeping,
# mainly used for internal purposes and debugging
sleep_start: float = 0.0
# The next deadline, in terms of the absolute clock of the loop, associated to the task
next_deadline: float = 0.0
def run(self, what: object = None):
"""
Simple abstraction layer over coroutines' ``send`` method
:param what: The object that has to be sent to the coroutine,
defaults to None
:type what: object, optional
"""
return self.coroutine.send(what)
@ -56,16 +88,22 @@ class Task:
def throw(self, err: Exception):
"""
Simple abstraction layer over coroutines ``throw`` method
:param err: The exception that has to be raised inside
the task
:type err: Exception
"""
return self.coroutine.throw(err)
async def join(self):
"""
Joins the task
Pauses the caller until the task has finished running.
Any return value is passed to the caller and exceptions
are propagated as well
"""
res = await join(self)
res = await giambio.traps.join(self)
if self.exc:
raise self.exc
return res
@ -75,12 +113,21 @@ class Task:
Cancels the task
"""
await cancel(self)
await giambio.traps.cancel(self)
def __hash__(self):
"""
Implements hash(self)
"""
return hash(self.coroutine)
def done(self):
"""
Returns True if the task is not running,
False otherwise
"""
return self.exc or self.finished or self.cancelled
@ -101,25 +148,29 @@ class Event:
async def trigger(self):
"""
Sets the event, waking up all tasks that called
pause() on us
pause() on it
"""
if self.set: # This is set by the event loop internally
raise GiambioError("The event has already been set")
await event_set(self)
raise giambio.exceptions.GiambioError("The event has already been set")
await giambio.traps.event_set(self)
async def wait(self):
"""
Waits until the event is set
"""
await event_wait(self)
await giambio.traps.event_wait(self)
class TimeQueue:
"""
An abstraction layer over a heap queue based on time. This is where
sleeping tasks will be put when they are not running
:param clock: The same monotonic clock that was passed to the thread-local event loop.
It is important for the queue to be synchronized with the loop as this allows
the sleeping mechanism to work reliably
"""
def __init__(self, clock):
@ -128,61 +179,129 @@ class TimeQueue:
"""
self.clock = clock
self.sequence = 0
# The sequence number handles the race condition
# of two tasks with identical deadlines acting
# of two tasks with identical deadlines, acting
# as a tie breaker
self.container = []
self.sequence = 0
self.container: List[Tuple[float, int, Task]] = []
def __contains__(self, item):
"""
Implements item in self. This method behaves
as if the queue only contained tasks and ignores
their timeouts and tiebreakers
"""
for i in self.container:
if i[2] == item:
return True
return False
def discard(self, item):
def index(self, item):
"""
Returns the index of the given item in the list
or -1 if it is not present
"""
for i in self.container:
if i[2] == item:
self.container.remove(i)
heapify(self.container)
return
return i
return -1
def discard(self, item):
"""
Discards an item from the queue and
calls heapify(self.container) to keep
the heap invariant if an element is removed.
This method does nothing if the item is not
in the queue, but note that in this case the
operation would still take O(n) iterations
to complete
:param item: The item to be discarded
"""
idx = self.index(item)
if idx != 1:
self.container.remove(idx)
heapify(self.container)
def get_closest_deadline(self) -> float:
"""
Returns the closest deadline that is meant to expire
or raises IndexError if the queue is empty
"""
if not self:
raise IndexError("TimeQueue is empty")
return self.container[0][0]
def __iter__(self):
"""
Implements iter(self)
"""
return self
def __next__(self):
"""
Implements next(self)
"""
try:
return self.get()
except IndexError:
raise StopIteration from None
def __getitem__(self, item):
"""
Implements self[n]
"""
return self.container.__getitem__(item)
def __bool__(self):
"""
Implements bool(self)
"""
return bool(self.container)
def __repr__(self):
"""
Implements repr(self) and str(self)
"""
return f"TimeQueue({self.container}, clock={self.clock})"
def put(self, item, amount: float):
def put(self, task: Task, amount: float):
"""
Pushes an item onto the queue with its unique
time amount and ID
Pushes a task onto the queue together with its
sleep amount
:param task: The task that is meant to sleep
:type task: :class: Task
:param amount: The amount of time, in seconds, that the
task should sleep for
:type amount: float
"""
heappush(self.container, (self.clock() + amount, self.sequence, item))
heappush(self.container, (self.clock() + amount, self.sequence, task))
self.sequence += 1
def get(self):
def get(self) -> Task:
"""
Gets the first task that is meant to run
:raises: IndexError if the queue is empty
"""
if not self.container:
raise IndexError("get from empty TimeQueue")
return heappop(self.container)[2]
class DeadlinesQueue(TimeQueue):
class DeadlinesQueue:
"""
An ordered queue for storing tasks deadlines
"""
@ -192,42 +311,122 @@ class DeadlinesQueue(TimeQueue):
Object constructor
"""
super().__init__(None)
self.pools = set()
self.container: List[Tuple[float, int, giambio.context.TaskManager]] = []
self.sequence = 0
def __contains__(self, item):
return super().__contains__(item)
"""
Implements item in self. This method behaves
as if the queue only contained tasks and ignores
their timeouts and tiebreakers
"""
for i in self.container:
if i[2] == item:
return True
return False
def index(self, item):
"""
Returns the index of the given item in the list
or -1 if it is not present
"""
for i in self.container:
if i[2] == item:
return i
return -1
def discard(self, item):
"""
Discards an item from the queue and
calls heapify(self.container) to keep
the heap invariant if an element is removed.
This method does nothing if the item is not
in the queue, but note that in this case the
operation would still take O(n) iterations
to complete
:param item: The item to be discarded
"""
idx = self.index(item)
if idx != 1:
self.container.remove(idx)
heapify(self.container)
def get_closest_deadline(self) -> float:
"""
Returns the closest deadline that is meant to expire
or raises IndexError if the queue is empty
"""
if not self:
raise IndexError("DeadlinesQueue is empty")
return self.container[0][0]
def __iter__(self):
return super().__iter__()
"""
Implements iter(self)
"""
return self
def __next__(self):
return super().__next__()
"""
Implements next(self)
"""
try:
return self.get()
except IndexError:
raise StopIteration from None
def __getitem__(self, item):
return super().__getitem__(item)
"""
Implements self[n]
"""
return self.container.__getitem__(item)
def __bool__(self):
return super().__bool__()
"""
Implements bool(self)
"""
return bool(self.container)
def __repr__(self):
"""
Implements repr(self) and str(self)
"""
return f"DeadlinesQueue({self.container})"
def put(self, amount: float, pool):
def put(self, pool: "giambio.context.TaskManager"):
"""
Pushes a deadline (timeout) onto the queue with its associated
pool
Pushes a pool with its deadline onto the queue. The
timeout amount will be inferred from the pool object
itself
:param pool: The pool object to store
"""
if pool not in self.pools:
self.pools.add(pool)
heappush(self.container, (amount, self.sequence, pool))
heappush(self.container, (pool.timeout, self.sequence, pool))
self.sequence += 1
def get(self):
def get(self) -> "giambio.context.TaskManager":
"""
Gets the first task that is meant to run
Gets the first pool that is meant to expire
:raises: IndexError if the queue is empty
"""
if not self.container:
raise IndexError("get from empty DeadlinesQueue")
d = heappop(self.container)
self.pools.discard(d[2])
return d
return d[2]

View File

@ -101,7 +101,7 @@ async def want_read(stream):
:param stream: The resource that needs to be read
"""
await create_trap("read_or_write", stream, "read")
await create_trap("register_sock", stream, "read")
async def want_write(stream):
@ -112,7 +112,7 @@ async def want_write(stream):
:param stream: The resource that needs to be written
"""
await create_trap("read_or_write", stream, "write")
await create_trap("register_sock", stream, "write")
async def event_set(event):

View File

@ -25,4 +25,4 @@ async def main():
if __name__ == "__main__":
giambio.run(main)
giambio.run(main, debugger=Debugger())

View File

@ -8,36 +8,52 @@ import sys
# A test to check for asynchronous I/O
async def serve(address: tuple):
async def serve(bind_address: tuple):
"""
Serves asynchronously forever
:param bind_address: The address to bind the server to represented as a tuple
(address, port) where address is a string and port is an integer
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(address)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(bind_address)
sock.listen(5)
async_sock = giambio.wrap_socket(sock) # We make the socket an async socket
logging.info(f"Serving asynchronously at {address[0]}:{address[1]}")
logging.info(f"Serving asynchronously at {bind_address[0]}:{bind_address[1]}")
async with giambio.create_pool() as pool:
conn, address_tuple = await async_sock.accept()
logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected")
pool.spawn(handler, conn, address_tuple)
while True:
conn, address_tuple = await async_sock.accept()
logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected")
pool.spawn(handler, conn, address_tuple)
async def handler(sock: AsyncSocket, addr: tuple):
address = f"{addr[0]}:{addr[1]}"
async with sock:
async def handler(sock: AsyncSocket, client_address: tuple):
"""
Handles a single client connection
:param sock: The giambio.socket.AsyncSocket object connected
to the client
:type sock: :class: giambio.socket.AsyncSocket
:param client_address: The client's address represented as a tuple
(address, port) where address is a string and port is an integer
:type client_address: tuple
"""
address = f"{client_address[0]}:{client_address[1]}"
async with sock: # Closes the socket automatically
await sock.send_all(b"Welcome to the server pal, feel free to send me something!\n")
while True:
await sock.send_all(b"-> ")
data = await sock.receive(1024)
if not data:
break
elif data == b"raise\n":
elif data == b"exit\n":
await sock.send_all(b"I'm dead dude\n")
raise TypeError("Oh, no, I'm gonna die!")
to_send_back = data
data = data.decode("utf-8").encode("unicode_escape")
logging.info(f"Got: '{data.decode('utf-8')}' from {address}")
await sock.send_all(b"Got: " + to_send_back)
logging.info(f"Echoed back '{data.decode('utf-8')}' to {address}")
logging.info(f"Got: {data!r} from {address}")
await sock.send_all(b"Got: " + data)
logging.info(f"Echoed back {data!r} to {address}")
logging.info(f"Connection from {address} closed")
@ -45,7 +61,7 @@ if __name__ == "__main__":
port = int(sys.argv[1]) if len(sys.argv) > 1 else 1500
logging.basicConfig(level=20, format="[%(levelname)s] %(asctime)s %(message)s", datefmt="%d/%m/%Y %p")
try:
giambio.run(serve, ("localhost", port), debugger=Debugger())
giambio.run(serve, ("localhost", port), debugger=None)
except (Exception, KeyboardInterrupt) as error: # Exceptions propagate!
if isinstance(error, KeyboardInterrupt):
logging.info("Ctrl+C detected, exiting")

33
tests/timeout.py Normal file
View File

@ -0,0 +1,33 @@
import giambio
from debugger import Debugger
async def child():
print("[child] Child spawned!! Sleeping for 5 seconds")
await giambio.sleep(5)
print("[child] Had a nice nap!")
async def child1():
print("[child 1] Child spawned!! Sleeping for 10 seconds")
await giambio.sleep(10)
print("[child 1] Had a nice nap!")
async def main():
start = giambio.clock()
try:
async with giambio.with_timeout(6) as pool:
# TODO: We need to consider the inner part of
# the with block as an implicit task, otherwise
# timeouts and cancellations won't work properly!
pool.spawn(child) # This will complete
pool.spawn(child1) # This will not
print("[main] Children spawned, awaiting completion")
except giambio.exceptions.TooSlowError:
print("[main] One or more children have timed out!")
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
if __name__ == "__main__":
giambio.run(main, debugger=())