diff --git a/giambio/__init__.py b/giambio/__init__.py index d04344f..062e70e 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -1,5 +1,5 @@ """ -Asynchronous Python made easy (and friendly) +Giambio: Asynchronous Python made easy (and friendly) Copyright (C) 2020 nocturn9x @@ -16,19 +16,22 @@ See the License for the specific language governing permissions and limitations under the License. """ -__author__ = "Nocturn9x aka Isgiambyy" -__version__ = (1, 0, 0) +__author__ = "Nocturn9x" +__version__ = (0, 0, 1) -from . import exceptions, socket +from . import exceptions, socket, context, core from .socket import wrap_socket from .traps import sleep, current_task from .objects import Event from .run import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout from .util import debug + __all__ = [ "exceptions", + "core", + "context", "sleep", "Event", "run", diff --git a/giambio/context.py b/giambio/context.py index dd195d0..d696e2f 100644 --- a/giambio/context.py +++ b/giambio/context.py @@ -1,5 +1,5 @@ """ -Higher-level context managers for async pools +Higher-level context manager(s) for async pools Copyright (C) 2020 nocturn9x @@ -16,38 +16,50 @@ See the License for the specific language governing permissions and limitations under the License. """ - +import giambio import types -from giambio.objects import Task -from giambio.core import AsyncScheduler +from typing import List class TaskManager: """ An asynchronous context manager for giambio + + :param loop: The event loop bound to this pool. 
Most of the times + it's the return value from giambio.get_event_loop() + :type loop: :class: AsyncScheduler + :param timeout: The pool's timeout length in seconds, if any, defaults to None + :type timeout: float, optional """ - def __init__(self, loop: AsyncScheduler, timeout: float = None) -> None: + def __init__(self, loop: "giambio.core.AsyncScheduler", timeout: float = None) -> None: """ Object constructor """ - self.loop = loop - self.tasks = [] # We store a reference to all tasks in this pool, even the paused ones! - self.cancelled = False - self.started = self.loop.clock() + # The event loop associated with this pool + self.loop: "giambio.core.AsyncScheduler" = loop + # All the tasks that belong to this pool + self.tasks: List[giambio.objects.Task] = [] + # Whether we have been cancelled or not + self.cancelled: bool = False + # The clock time of when we started running, used for + # timeouts expiration + self.started: float = self.loop.clock() + # The pool's timeout (in seconds) if timeout: - self.timeout = self.started + timeout + self.timeout: float = self.started + timeout else: - self.timeout = None - self.timed_out = False + self.timeout: None = None + # Whether our timeout expired or not + self.timed_out: bool = False - def spawn(self, func: types.FunctionType, *args): + def spawn(self, func: types.FunctionType, *args) -> "giambio.objects.Task": """ Spawns a child task """ - task = Task(func(*args), func.__name__ or str(func), self) + task = giambio.objects.Task(func.__name__ or str(func), func(*args), self) task.joiners = [self.loop.current_task] task.next_deadline = self.timeout or 0.0 self.loop.tasks.append(task) @@ -55,13 +67,13 @@ class TaskManager: self.tasks.append(task) return task - def spawn_after(self, func: types.FunctionType, n: int, *args): + def spawn_after(self, func: types.FunctionType, n: int, *args) -> "giambio.objects.Task": """ Schedules a task for execution after n seconds """ assert n >= 0, "The time delay can't be negative" - 
task = Task(func(*args), func.__name__ or str(func), self) + task = giambio.objects.Task(func.__name__ or str(func), func(*args), self) task.joiners = [self.loop.current_task] task.next_deadline = self.timeout or 0.0 task.sleep_start = self.loop.clock() @@ -71,23 +83,39 @@ class TaskManager: return task async def __aenter__(self): + """ + Implements the asynchronous context manager interface, + marking the pool as started and returning itself + """ + return self async def __aexit__(self, exc_type: Exception, exc: Exception, tb): + """ + Implements the asynchronous context manager interface, joining + all the tasks spawned inside the pool + """ + for task in self.tasks: - # This forces Python to stop at the + # This forces the interpreter to stop at the # end of the block and wait for all # children to exit await task.join() async def cancel(self): """ - Cancels the whole block + Cancels the pool entirely, iterating over all + the pool's tasks and cancelling them """ # TODO: This breaks, somehow, investigation needed for task in self.tasks: await task.cancel() - def done(self): + def done(self) -> bool: + """ + Returns True if all the tasks inside the + pool have exited, False otherwise + """ + return all([task.done() for task in self.tasks]) diff --git a/giambio/core.py b/giambio/core.py index f7fe30d..b1f23db 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -19,11 +19,13 @@ limitations under the License. 
# Import libraries and internal resources import types import socket -from timeit import default_timer -from giambio.objects import Task, TimeQueue, DeadlinesQueue -from giambio.traps import want_read, want_write -from giambio.util.debug import BaseDebugger from itertools import chain +from timeit import default_timer +from giambio.context import TaskManager +from typing import List, Optional, Set, Any +from giambio.util.debug import BaseDebugger +from giambio.traps import want_read, want_write +from giambio.objects import Task, TimeQueue, DeadlinesQueue, Event from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE from giambio.exceptions import (InternalError, CancelledError, @@ -34,7 +36,7 @@ from giambio.exceptions import (InternalError, # TODO: Take into account SSLWantReadError and SSLWantWriteError IOInterrupt = (BlockingIOError, InterruptedError) -# TODO: Right now this value is pretty much arbitrary, we need some euristic testing to choose a sensible default +# TODO: Right now this value is pretty much arbitrary, we need some testing to choose a sensible default IO_SKIP_LIMIT = 5 @@ -46,6 +48,13 @@ class AsyncScheduler: with its calculations. An attempt to fix the threaded model has been made without making the API unnecessarily complicated. A few examples are tasks cancellation and exception propagation. + + :param clock: A callable returning monotonically increasing values at each call, + defaults to timeit.default_timer + :type clock: :class: types.FunctionType + :param debugger: A subclass of giambio.util.BaseDebugger or None if no debugging output + is desired, defaults to None + :type debugger: :class: giambio.util.BaseDebugger """ def __init__(self, clock: types.FunctionType = default_timer, debugger: BaseDebugger = None): @@ -54,37 +63,38 @@ class AsyncScheduler: """ # The debugger object. 
If it is none we create a dummy object that immediately returns an empty - # lambda every time you access any of its attributes to avoid lots of if self.debugger clauses + # lambda which in turn returns None every time we access any of its attributes to avoid lots of + # if self.debugger clauses if debugger: assert issubclass(type(debugger), BaseDebugger), "The debugger must be a subclass of giambio.util.BaseDebugger" self.debugger = debugger or type("DumbDebugger", (object, ), {"__getattr__": lambda *args: lambda *arg: None})() # Tasks that are ready to run - self.tasks = [] + self.tasks: List[Task] = [] # Selector object to perform I/O multiplexing - self.selector = DefaultSelector() + self.selector: DefaultSelector = DefaultSelector() # This will always point to the currently running coroutine (Task object) - self.current_task = None + self.current_task: Optional[Task] = None # Monotonic clock to keep track of elapsed time reliably - self.clock = clock + self.clock: types.FunctionType = clock # Tasks that are asleep - self.paused = TimeQueue(self.clock) + self.paused: TimeQueue = TimeQueue(self.clock) # All active Event objects - self.events = set() + self.events: Set[Event] = set() # Data to send back to a trap - self.to_send = None + self.to_send: Optional[Any] = None # Have we ever ran? - self.has_ran = False + self.has_ran: bool = False # The current pool - self.current_pool = None + self.current_pool: Optional[TaskManager] = None # How many times we skipped I/O checks to let a task run. 
# We limit the number of times we skip such checks to avoid # I/O starvation in highly concurrent systems - self.io_skip = 0 + self.io_skip: int = 0 # A heap queue of deadlines to be checked - self.deadlines = DeadlinesQueue() + self.deadlines: DeadlinesQueue = DeadlinesQueue() - def done(self): + def done(self) -> bool: """ Returns True if there is no work to do """ @@ -136,7 +146,13 @@ class AsyncScheduler: while self.tasks: # Sets the currently running task self.current_task = self.tasks.pop(0) - # Sets the current pool (for nested pools) + if self.current_task.done(): + # Since ensure_discard only checks for paused tasks, + # we still need to make sure we don't try to execute + # exited tasks that are on the running queue + continue + # Sets the current pool: we need this to take nested pools + # into account and behave accordingly self.current_pool = self.current_task.pool if self.current_pool and self.current_pool.timeout and not self.current_pool.timed_out: # Stores deadlines for tasks (deadlines are pool-specific). @@ -144,7 +160,7 @@ class AsyncScheduler: # a deadline for the same pool twice. This makes the timeouts # model less flexible, because one can't change the timeout # after it is set, but it makes the implementation easier. 
- self.deadlines.put(self.current_pool.timeout, self.current_pool) + self.deadlines.put(self.current_pool) self.debugger.before_task_step(self.current_task) if self.current_task.cancel_pending: # We perform the deferred cancellation @@ -191,24 +207,26 @@ class AsyncScheduler: self.join(self.current_task) except BaseException as err: # TODO: We might want to do a bit more complex traceback hacking to remove any extra - # frames from the exception call stack, but for now removing at least the first one - # seems a sensible approach (it's us catching it so we don't care about that) + # frames from the exception call stack, but for now removing at least the first one + # seems a sensible approach (it's us catching it so we don't care about that) self.current_task.exc = err self.current_task.exc.__traceback__ = self.current_task.exc.__traceback__.tb_next self.current_task.status = "crashed" self.debugger.on_exception_raised(self.current_task, err) self.join(self.current_task) + self.ensure_discard(self.current_task) def do_cancel(self, task: Task): """ Performs task cancellation by throwing CancelledError inside the given - task in order to stop it from running. 
The loop continues to execute - as tasks are independent + task in order to stop it from running + + :param task: The task to cancel + :type task: :class: Task """ - if not task.cancelled and not task.exc: - self.debugger.before_cancel(task) - task.throw(CancelledError()) + self.debugger.before_cancel(task) + task.throw(CancelledError()) def get_current(self): """ @@ -224,15 +242,20 @@ class AsyncScheduler: inside the correct pool if its timeout expired """ - while self.deadlines and self.deadlines[0][0] <= self.clock(): - _, __, pool = self.deadlines.get() + while self.deadlines and self.deadlines.get_closest_deadline() <= self.clock(): + pool = self.deadlines.get() pool.timed_out = True - self.cancel_all_from_current_pool(pool) + self.cancel_pool(pool) + # Since we already know that exceptions will behave correctly + # (heck, half of the code in here only serves that purpose) + # all we do here is just raise an exception as if the current + # task raised it and let our machinery deal with the rest raise TooSlowError() def check_events(self): """ - Checks for ready or expired events and triggers them + Checks for ready/expired events and triggers them by + rescheduling all the tasks that called wait() on them """ for event in self.events.copy(): @@ -252,23 +275,24 @@ class AsyncScheduler: has elapsed """ - while self.paused and self.paused[0][0] <= self.clock(): + while self.paused and self.paused.get_closest_deadline() <= self.clock(): # Reschedules tasks when their deadline has elapsed task = self.paused.get() - if not task.done(): - slept = self.clock() - task.sleep_start - task.sleep_start = 0.0 - self.tasks.append(task) - self.debugger.after_sleep(task, slept) + slept = self.clock() - task.sleep_start + task.sleep_start = 0.0 + self.tasks.append(task) + self.debugger.after_sleep(task, slept) def check_io(self): """ - Checks for I/O and implements the sleeping mechanism + Checks for I/O and implements part of the sleeping mechanism for the event loop """ 
before_time = self.clock() # Used for the debugger if self.tasks or self.events: + # If there is work to do immediately we prefer to + # do that first unless some conditions are met, see below self.io_skip += 1 if self.io_skip == IO_SKIP_LIMIT: # We can't skip every time there's some task ready @@ -284,18 +308,19 @@ class AsyncScheduler: # If there are asleep tasks or deadlines, wait until the closest date if not self.deadlines: # If there are no deadlines just wait until the first task wakeup - timeout = min([max(0.0, self.paused[0][0] - self.clock())]) + timeout = min([max(0.0, self.paused.get_closest_deadline() - self.clock())]) elif not self.paused: # If there are no sleeping tasks just wait until the first deadline - timeout = min([max(0.0, self.deadlines[0][0] - self.clock())]) + timeout = min([max(0.0, self.deadlines.get_closest_deadline() - self.clock())]) else: - # If there are both deadlines AND sleeping tasks scheduled we calculate + # If there are both deadlines AND sleeping tasks scheduled, we calculate # the absolute closest deadline among the two sets and use that as a timeout clock = self.clock() - timeout = min([max(0.0, self.paused[0][0] - clock), self.deadlines[0][0] - clock]) + timeout = min([max(0.0, self.paused.get_closest_deadline() - clock), + self.deadlines.get_closest_deadline() - clock]) else: # If there is *only* I/O, we wait a fixed amount of time - timeout = 86400 # Thanks trio :D + timeout = 86400 # Stolen from trio :D self.debugger.before_io(timeout) io_ready = self.selector.select(timeout) # Get sockets that are ready and schedule their tasks @@ -308,7 +333,7 @@ class AsyncScheduler: Starts the event loop from a sync context """ - entry = Task(func(*args), func.__name__ or str(func), None) + entry = Task(func.__name__ or str(func), func(*args), None) self.tasks.append(entry) self.debugger.on_start() self.run() @@ -317,27 +342,14 @@ class AsyncScheduler: if entry.exc: raise entry.exc - def reschedule_joiners(self, task: Task): + def 
cancel_pool(self, pool: TaskManager): """ - Reschedules the parent(s) of the - given task, if any + Cancels all tasks in the given pool + + :param pool: The pool to be cancelled + :type pool: :class: TaskManager """ - for t in task.joiners: - if t not in self.tasks: - # Since a task can be the parent - # of multiple children, we need to - # make sure we reschedule it only - # once, otherwise a RuntimeError will - # occur - self.tasks.append(t) - - def cancel_all_from_current_pool(self, pool=None): - """ - Cancels all tasks in the current pool (or the given one) - """ - - pool = pool or self.current_pool if pool: for to_cancel in pool.tasks: self.cancel(to_cancel) @@ -361,24 +373,24 @@ class AsyncScheduler: def get_asleep_tasks(self): """ - Yields all tasks currently sleeping + Yields all tasks that are currently sleeping """ for asleep in self.paused.container: - yield asleep[2] + yield asleep[2] # Deadline, tiebreaker, task - def get_io_tasks(self) -> set: + def get_io_tasks(self): """ - Yields all tasks waiting on I/O resources + Yields all tasks currently waiting on I/O resources """ for k in self.selector.get_map().values(): yield k.data - def get_all_tasks(self) -> set: + def get_all_tasks(self): """ Returns a generator yielding all tasks which the loop is currently - keeping track of. This includes both running and paused tasks. + keeping track of: this includes both running and paused tasks. A paused task is a task which is either waiting on an I/O resource, sleeping, or waiting on an event to be triggered """ @@ -390,25 +402,29 @@ class AsyncScheduler: def ensure_discard(self, task: Task): """ - This method ensures that tasks that need to be cancelled are not - rescheduled further. This will act upon paused tasks only + Ensures that tasks that need to be cancelled are not + rescheduled further. 
This method exists because tasks might + be cancelled in a context where it's not obvious which Task + object must be discarded and not rescheduled at the next iteration """ + # TODO: Do we need else ifs or ifs? The question arises because + # tasks might be doing I/O and other stuff too even if not at the same time if task in self.paused: self.paused.discard(task) - elif self.selector.get_map(): - for key in self.selector.get_map().values(): + if self.selector.get_map(): + for key in dict(self.selector.get_map()).values(): if key.data == task: - self.selector.unregister(task) - elif task in self.get_event_tasks(): + self.selector.unregister(key.fileobj) + if task in self.get_event_tasks(): for evt in self.events: if task in evt.waiters: evt.waiters.remove(task) - def cancel_all(self): + def cancel_all(self) -> bool: """ - Cancels ALL tasks, this method is called as a result - of self.close() + Cancels ALL tasks as returned by self.get_all_tasks() and returns + whether all tasks exited or not """ for to_cancel in self.get_all_tasks(): @@ -419,7 +435,7 @@ class AsyncScheduler: """ Closes the event loop, terminating all tasks inside it and tearing down any extra machinery. - If ensure_done equals False, the loop will cancel *ALL* + If ensure_done equals False, the loop will cancel ALL running and scheduled tasks and then tear itself down. 
If ensure_done equals True, which is the default behavior, this method will raise a GiambioError if the loop hasn't @@ -432,6 +448,21 @@ class AsyncScheduler: raise GiambioError("event loop not terminated, call this method with ensure_done=False to forcefully exit") self.shutdown() + def reschedule_joiners(self, task: Task): + """ + Reschedules the parent(s) of the + given task, if any + """ + + for t in task.joiners: + if t not in self.tasks: + # Since a task can be the parent + # of multiple children, we need to + # make sure we reschedule it only + # once, otherwise a RuntimeError will + # occur + self.tasks.append(t) + def join(self, task: Task): """ Joins a task to its callers (implicitly, the parent @@ -442,28 +473,36 @@ class AsyncScheduler: task.joined = True if task.finished or task.cancelled: if self.current_pool and self.current_pool.done() or not self.current_pool: + # If the current pool has finished executing or we're at the first parent + # task that kicked the loop, we can safely reschedule the parent(s) self.reschedule_joiners(task) elif task.exc: - if self.cancel_all_from_current_pool(): - # This will reschedule the parent - # only if all the tasks inside it - # have finished executing, either + if self.cancel_pool(self.current_pool): + # This will reschedule the parent(s) + # only if all the tasks inside the current + # pool have finished executing, either # by cancellation, an exception # or just returned self.reschedule_joiners(task) def sleep(self, seconds: int or float): """ - Puts the caller to sleep for a given amount of seconds + Puts the current task to sleep for a given amount of seconds """ self.debugger.before_sleep(self.current_task, seconds) - if seconds: # if seconds == 0, this acts as a switch! 
+ if seconds: self.current_task.status = "sleep" self.current_task.sleep_start = self.clock() self.paused.put(self.current_task, seconds) self.current_task.next_deadline = self.current_task.sleep_start + seconds else: + # When we're called with a timeout of 0 (the type checking is done + # way before this point) this method acts as a checkpoint that allows + # giambio to kick in and to its job without pausing the task's execution + # for too long. It is recommended to put a couple of checkpoints like these + # in your code if you see degraded concurrent performance in parts of your code + # that block the loop self.tasks.append(self.current_task) def cancel(self, task: Task): @@ -492,8 +531,7 @@ class AsyncScheduler: # if for the next execution step of the task. Giambio will also make sure # to re-raise cancellations at every checkpoint until the task lets the # exception propagate into us, because we *really* want the task to be - # cancelled, and since asking kindly didn't work we have to use some - # force :) + # cancelled task.status = "cancelled" task.cancelled = True task.cancel_pending = False @@ -504,11 +542,19 @@ class AsyncScheduler: # defer this operation for later (check run() for more info) task.cancel_pending = True # Cancellation is deferred - def event_set(self, event): + def event_set(self, event: Event): """ Sets an event + + :param event: The event object to trigger + :type event: :class: Event """ + # When an event is set, we store the event object + # for later, set its attribute and reschedule the + # task that called this method. 
All tasks waiting + # on this event object will be waken up on the next + # iteration self.events.add(event) event.set = True self.tasks.append(self.current_task) @@ -516,39 +562,70 @@ class AsyncScheduler: def event_wait(self, event): """ Pauses the current task on an event + + :param event: The event object to pause upon + :type event: :class: Event """ event.waiters.append(self.current_task) # Since we don't reschedule the task, it will # not execute until check_events is called - # TODO: More generic I/O rather than just sockets (threads) - def read_or_write(self, sock: socket.socket, evt_type: str): + def register_sock(self, sock: socket.socket, evt_type: str): """ Registers the given socket inside the selector to perform I/0 multiplexing + + :param sock: The socket on which a read or write operation + has to be performed + :type sock: socket.socket + :param evt_type: The type of event to perform on the given + socket, either "read" or "write" + :type evt_type: str """ self.current_task.status = "io" - if self.current_task.last_io: - if self.current_task.last_io == (evt_type, sock): - # Socket is already scheduled! - return - # TODO: Inspect why modify() causes issues - self.selector.unregister(sock) - self.current_task.last_io = evt_type, sock evt = EVENT_READ if evt_type == "read" else EVENT_WRITE - try: - self.selector.register(sock, evt, self.current_task) - except KeyError: - # The socket is already registered doing something else - raise ResourceBusy("The given resource is busy!") from None + if self.current_task.last_io: + # Since most of the times tasks will perform multiple + # I/O operations on a given socket, unregistering them + # every time isn't a sensible approach. A quick and + # easy optimization to address this problem is to + # store the last I/O operation that the task performed + # together with the resource itself, inside the task + # object. 
If the task wants to perform the same + # operation on the same socket again, then this method + # returns immediately as the socket is already being + # watched by the selector. If the resource is the same, + # but the event has changed, then we modify the resource's + # associated event. Only if the resource is different from + # the last used one this method will register a new socket + if self.current_task.last_io == (evt_type, sock): + # Socket is already listening for that event! + return + elif self.current_task.last_io[1] == sock: + # If the event to listen for has changed we just modify it + self.selector.modify(sock, evt, self.current_task) + self.current_task.last_io = (evt_type, sock) + else: + # Otherwise we register the new socket in our selector + self.current_task.last_io = evt_type, sock + try: + self.selector.register(sock, evt, self.current_task) + except KeyError: + # The socket is already registered doing something else + raise ResourceBusy("The given socket is being read/written by another task") from None # noinspection PyMethodMayBeStatic async def read_sock(self, sock: socket.socket, buffer: int): """ Reads from a socket asynchronously, waiting until the resource is available and returning up to buffer bytes from the socket + + :param sock: The socket that must be read + :type sock: socket.socket + :param buffer: The maximum amount of bytes that will be read + :type buffer: int """ await want_read(sock) @@ -559,24 +636,23 @@ class AsyncScheduler: """ Accepts a socket connection asynchronously, waiting until the resource is available and returning the result of the sock.accept() call + + :param sock: The socket that must be accepted + :type sock: socket.socket """ - # TODO: Is this ok? - # This does not feel right because the loop will only - # exit when the socket has been accepted, preventing other - # stuff from running - while True: - try: - return sock.accept() - except IOInterrupt: # Do we need this exception thingy everywhere? 
- # Some methods have never errored out, but this did and doing - # so seemed to fix the issue, needs investigation - await want_read(sock) + await want_read(sock) + return sock.accept() # noinspection PyMethodMayBeStatic async def sock_sendall(self, sock: socket.socket, data: bytes): """ - Sends all the passed bytes trough a socket asynchronously + Sends all the passed bytes trough the given socket, asynchronously + + :param sock: The socket that must be written + :type sock: socket.socket + :param data: The bytes to send across the socket + :type data: bytes """ while data: @@ -584,10 +660,12 @@ class AsyncScheduler: sent_no = sock.send(data) data = data[sent_no:] - # TODO: This method seems to cause issues async def close_sock(self, sock: socket.socket): """ - Closes a socket asynchronously + Closes the given socket asynchronously + + :param sock: The socket that must be closed + :type sock: socket.socket """ await want_write(sock) @@ -596,10 +674,17 @@ class AsyncScheduler: self.current_task.last_io = () # noinspection PyMethodMayBeStatic - async def connect_sock(self, sock: socket.socket, addr: tuple): + async def connect_sock(self, sock: socket.socket, address_tuple: tuple): """ - Connects a socket asynchronously + Connects a socket asynchronously to a given endpoint + + :param sock: The socket that must to be connected + :type sock: socket.socket + :param address_tuple: A tuple in the same form as the one + passed to socket.socket.connect with an address as a string + and a port as an integer + :type address_tuple: tuple """ await want_write(sock) - return sock.connect(addr) + return sock.connect(address_tuple) diff --git a/giambio/objects.py b/giambio/objects.py index 2550720..23c8938 100644 --- a/giambio/objects.py +++ b/giambio/objects.py @@ -1,5 +1,5 @@ """ -Various object wrappers and abstraction layers +Various object wrappers and abstraction layers for internal use Copyright (C) 2020 nocturn9x @@ -16,11 +16,10 @@ See the License for the specific 
language governing permissions and limitations under the License. """ -from giambio.traps import join, cancel, event_set, event_wait -from heapq import heappop, heappush, heapify -from giambio.exceptions import GiambioError +import giambio from dataclasses import dataclass, field -import typing +from heapq import heappop, heappush, heapify +from typing import Union, Coroutine, List, Tuple @dataclass @@ -30,25 +29,58 @@ class Task: A simple wrapper around a coroutine object """ - coroutine: typing.Coroutine + # The name of the task. Usually this equals self.coroutine.__name__, + # but in some cases it falls back to repr(self.coroutine) name: str - pool: "giambio.context.TaskManager" + # The underlying coroutine object to wrap around a giambio task + coroutine: Coroutine + # The async pool that spawned this task. The one and only task that hasn't + # an associated pool is the main entry point which is not available externally + pool: Union["giambio.context.TaskManager", None] = None + # Whether the task has been cancelled or not. This is True both when the task is + # explicitly cancelled via its cancel() method or when it is cancelled as a result + # of an exception in another task in the same pool cancelled: bool = False + # This attribute will be None unless the task raised an error exc: BaseException = None + # The return value of the coroutine result: object = None + # This attribute signals that the task has exited normally (returned) finished: bool = False + # This attribute represents what the task is doing and is updated in real + # time by the event loop, internally. 
Possible values for this are "init"-- + # when the task has been created but not started running yet--, "run"-- when + # the task is running synchronous code--, "io"-- when the task is waiting on + # an I/O resource--, "sleep"-- when the task is either asleep or waiting on an event + # and "crashed"-- when the task has exited because of an exception status: str = "init" + # This attribute counts how many times the task's run() method has been called steps: int = 0 + # Simple optimization to improve the selector's efficiency. Check AsyncScheduler.register_sock + # inside giambio.core to know more about it last_io: tuple = () + # All the tasks waiting on this task's completion joiners: list = field(default_factory=list) + # Whether this task has been waited for completion or not. The one and only task + # that will have this attribute set to False is the main program entry point, since + # the loop will implicitly wait for anything else to complete before returning joined: bool = False + # Whether this task has a pending cancellation scheduled. 
Check AsyncScheduler.cancel + # inside giambio.core to know more about this attribute cancel_pending: bool = False + # Absolute clock time that represents the date at which the task started sleeping, + # mainly used for internal purposes and debugging sleep_start: float = 0.0 + # The next deadline, in terms of the absolute clock of the loop, associated to the task next_deadline: float = 0.0 def run(self, what: object = None): """ Simple abstraction layer over coroutines' ``send`` method + + :param what: The object that has to be sent to the coroutine, + defaults to None + :type what: object, optional """ return self.coroutine.send(what) @@ -56,16 +88,22 @@ class Task: def throw(self, err: Exception): """ Simple abstraction layer over coroutines ``throw`` method + + :param err: The exception that has to be raised inside + the task + :type err: Exception """ return self.coroutine.throw(err) async def join(self): """ - Joins the task + Pauses the caller until the task has finished running. 
+ Any return value is passed to the caller and exceptions + are propagated as well """ - res = await join(self) + res = await giambio.traps.join(self) if self.exc: raise self.exc return res @@ -75,12 +113,21 @@ class Task: Cancels the task """ - await cancel(self) + await giambio.traps.cancel(self) def __hash__(self): + """ + Implements hash(self) + """ + return hash(self.coroutine) def done(self): + """ + Returns True if the task is not running, + False otherwise + """ + return self.exc or self.finished or self.cancelled @@ -101,25 +148,29 @@ class Event: async def trigger(self): """ Sets the event, waking up all tasks that called - pause() on us + pause() on it """ if self.set: # This is set by the event loop internally - raise GiambioError("The event has already been set") - await event_set(self) + raise giambio.exceptions.GiambioError("The event has already been set") + await giambio.traps.event_set(self) async def wait(self): """ Waits until the event is set """ - await event_wait(self) + await giambio.traps.event_wait(self) class TimeQueue: """ An abstraction layer over a heap queue based on time. This is where sleeping tasks will be put when they are not running + + :param clock: The same monotonic clock that was passed to the thread-local event loop. + It is important for the queue to be synchronized with the loop as this allows + the sleeping mechanism to work reliably """ def __init__(self, clock): @@ -128,61 +179,129 @@ class TimeQueue: """ self.clock = clock - self.sequence = 0 # The sequence number handles the race condition - # of two tasks with identical deadlines acting + # of two tasks with identical deadlines, acting # as a tie breaker - self.container = [] + self.sequence = 0 + self.container: List[Tuple[float, int, Task]] = [] def __contains__(self, item): + """ + Implements item in self. 
This method behaves
+        as if the queue only contained tasks and ignores
+        their timeouts and tiebreakers
+        """
+
         for i in self.container:
             if i[2] == item:
                 return True
         return False
 
-    def discard(self, item):
+    def index(self, item):
+        """
+        Returns the (deadline, tiebreak, item) entry for the given item
+        or -1 if it is not present
+        """
+
         for i in self.container:
             if i[2] == item:
-                self.container.remove(i)
-                heapify(self.container)
-                return
+                return i
+        return -1
+
+    def discard(self, item):
+        """
+        Discards an item from the queue and
+        calls heapify(self.container) to keep
+        the heap invariant if an element is removed.
+        This method does nothing if the item is not
+        in the queue, but note that in this case the
+        operation would still take O(n) iterations
+        to complete
+
+        :param item: The item to be discarded
+        """
+
+        idx = self.index(item)
+        if idx != -1:
+            self.container.remove(idx)
+            heapify(self.container)
+
+    def get_closest_deadline(self) -> float:
+        """
+        Returns the closest deadline that is meant to expire
+        or raises IndexError if the queue is empty
+        """
+
+        if not self:
+            raise IndexError("TimeQueue is empty")
+        return self.container[0][0]
 
     def __iter__(self):
+        """
+        Implements iter(self)
+        """
+
         return self
 
     def __next__(self):
+        """
+        Implements next(self)
+        """
+
         try:
             return self.get()
         except IndexError:
             raise StopIteration from None
 
     def __getitem__(self, item):
+        """
+        Implements self[n]
+        """
+
         return self.container.__getitem__(item)
 
     def __bool__(self):
+        """
+        Implements bool(self)
+        """
+
         return bool(self.container)
 
     def __repr__(self):
+        """
+        Implements repr(self) and str(self)
+        """
+
         return f"TimeQueue({self.container}, clock={self.clock})"
 
-    def put(self, item, amount: float):
+    def put(self, task: Task, amount: float):
         """
-        Pushes an item onto the queue with its unique
-        time amount and ID
+        Pushes a task onto the queue together with its
+        sleep amount
+
+        :param task: The task that is meant to sleep
+        :type task: :class: Task
+        :param amount: The amount of 
time, in seconds, that the + task should sleep for + :type amount: float """ - heappush(self.container, (self.clock() + amount, self.sequence, item)) + heappush(self.container, (self.clock() + amount, self.sequence, task)) self.sequence += 1 - def get(self): + def get(self) -> Task: """ Gets the first task that is meant to run + + :raises: IndexError if the queue is empty """ + if not self.container: + raise IndexError("get from empty TimeQueue") return heappop(self.container)[2] -class DeadlinesQueue(TimeQueue): +class DeadlinesQueue: """ An ordered queue for storing tasks deadlines """ @@ -192,42 +311,122 @@ class DeadlinesQueue(TimeQueue): Object constructor """ - super().__init__(None) self.pools = set() + self.container: List[Tuple[float, int, giambio.context.TaskManager]] = [] + self.sequence = 0 def __contains__(self, item): - return super().__contains__(item) + """ + Implements item in self. This method behaves + as if the queue only contained tasks and ignores + their timeouts and tiebreakers + """ + + for i in self.container: + if i[2] == item: + return True + return False + + def index(self, item): + """ + Returns the index of the given item in the list + or -1 if it is not present + """ + + for i in self.container: + if i[2] == item: + return i + return -1 + + def discard(self, item): + """ + Discards an item from the queue and + calls heapify(self.container) to keep + the heap invariant if an element is removed. 
This method does nothing if the item is not
+        in the queue, but note that in this case the
+        operation would still take O(n) iterations
+        to complete
+
+        :param item: The item to be discarded
+        """
+
+        idx = self.index(item)
+        if idx != -1:
+            self.container.remove(idx)
+            heapify(self.container)
+
+    def get_closest_deadline(self) -> float:
+        """
+        Returns the closest deadline that is meant to expire
+        or raises IndexError if the queue is empty
+        """
+
+        if not self:
+            raise IndexError("DeadlinesQueue is empty")
+        return self.container[0][0]
 
     def __iter__(self):
-        return super().__iter__()
+        """
+        Implements iter(self)
+        """
+
+        return self
 
     def __next__(self):
-        return super().__next__()
+        """
+        Implements next(self)
+        """
+
+        try:
+            return self.get()
+        except IndexError:
+            raise StopIteration from None
 
     def __getitem__(self, item):
-        return super().__getitem__(item)
+        """
+        Implements self[n]
+        """
+
+        return self.container.__getitem__(item)
 
     def __bool__(self):
-        return super().__bool__()
+        """
+        Implements bool(self)
+        """
+
+        return bool(self.container)
 
     def __repr__(self):
+        """
+        Implements repr(self) and str(self)
+        """
+
         return f"DeadlinesQueue({self.container})"
 
-    def put(self, amount: float, pool):
+    def put(self, pool: "giambio.context.TaskManager"):
         """
-        Pushes a deadline (timeout) onto the queue with its associated
-        pool
+        Pushes a pool with its deadline onto the queue. 
The + timeout amount will be inferred from the pool object + itself + + :param pool: The pool object to store """ if pool not in self.pools: self.pools.add(pool) - heappush(self.container, (amount, self.sequence, pool)) + heappush(self.container, (pool.timeout, self.sequence, pool)) + self.sequence += 1 - def get(self): + def get(self) -> "giambio.context.TaskManager": """ - Gets the first task that is meant to run + Gets the first pool that is meant to expire + + :raises: IndexError if the queue is empty """ + if not self.container: + raise IndexError("get from empty DeadlinesQueue") d = heappop(self.container) self.pools.discard(d[2]) - return d + return d[2] diff --git a/giambio/traps.py b/giambio/traps.py index 0c40e7a..94ed2ce 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -101,7 +101,7 @@ async def want_read(stream): :param stream: The resource that needs to be read """ - await create_trap("read_or_write", stream, "read") + await create_trap("register_sock", stream, "read") async def want_write(stream): @@ -112,7 +112,7 @@ async def want_write(stream): :param stream: The resource that needs to be written """ - await create_trap("read_or_write", stream, "write") + await create_trap("register_sock", stream, "write") async def event_set(event): diff --git a/tests/cancel.py b/tests/cancel.py index e1a90c9..d76e54a 100644 --- a/tests/cancel.py +++ b/tests/cancel.py @@ -25,4 +25,4 @@ async def main(): if __name__ == "__main__": - giambio.run(main) + giambio.run(main, debugger=Debugger()) diff --git a/tests/server.py b/tests/server.py index 0a193a9..aee89f0 100644 --- a/tests/server.py +++ b/tests/server.py @@ -8,36 +8,52 @@ import sys # A test to check for asynchronous I/O -async def serve(address: tuple): +async def serve(bind_address: tuple): + """ + Serves asynchronously forever + + :param bind_address: The address to bind the server to represented as a tuple + (address, port) where address is a string and port is an integer + """ + sock = 
socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(address) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(bind_address) sock.listen(5) async_sock = giambio.wrap_socket(sock) # We make the socket an async socket - logging.info(f"Serving asynchronously at {address[0]}:{address[1]}") + logging.info(f"Serving asynchronously at {bind_address[0]}:{bind_address[1]}") async with giambio.create_pool() as pool: - conn, address_tuple = await async_sock.accept() - logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected") - pool.spawn(handler, conn, address_tuple) + while True: + conn, address_tuple = await async_sock.accept() + logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected") + pool.spawn(handler, conn, address_tuple) -async def handler(sock: AsyncSocket, addr: tuple): - address = f"{addr[0]}:{addr[1]}" - async with sock: +async def handler(sock: AsyncSocket, client_address: tuple): + """ + Handles a single client connection + + :param sock: The giambio.socket.AsyncSocket object connected + to the client + :type sock: :class: giambio.socket.AsyncSocket + :param client_address: The client's address represented as a tuple + (address, port) where address is a string and port is an integer + :type client_address: tuple + """ + + address = f"{client_address[0]}:{client_address[1]}" + async with sock: # Closes the socket automatically await sock.send_all(b"Welcome to the server pal, feel free to send me something!\n") while True: await sock.send_all(b"-> ") data = await sock.receive(1024) if not data: break - elif data == b"raise\n": + elif data == b"exit\n": await sock.send_all(b"I'm dead dude\n") raise TypeError("Oh, no, I'm gonna die!") - to_send_back = data - data = data.decode("utf-8").encode("unicode_escape") - logging.info(f"Got: '{data.decode('utf-8')}' from {address}") - await sock.send_all(b"Got: " + to_send_back) - logging.info(f"Echoed back '{data.decode('utf-8')}' to {address}") + logging.info(f"Got: {data!r} 
from {address}")
+        await sock.send_all(b"Got: " + data)
+        logging.info(f"Echoed back {data!r} to {address}")
     logging.info(f"Connection from {address} closed")
 
 
@@ -45,7 +61,7 @@ if __name__ == "__main__":
     port = int(sys.argv[1]) if len(sys.argv) > 1 else 1500
     logging.basicConfig(level=20, format="[%(levelname)s] %(asctime)s %(message)s", datefmt="%d/%m/%Y %p")
     try:
-        giambio.run(serve, ("localhost", port), debugger=Debugger())
+        giambio.run(serve, ("localhost", port), debugger=None)
     except (Exception, KeyboardInterrupt) as error:  # Exceptions propagate!
         if isinstance(error, KeyboardInterrupt):
             logging.info("Ctrl+C detected, exiting")
diff --git a/tests/timeout.py b/tests/timeout.py
new file mode 100644
index 0000000..ca646e8
--- /dev/null
+++ b/tests/timeout.py
@@ -0,0 +1,33 @@
+import giambio
+from debugger import Debugger
+
+
+async def child():
+    print("[child] Child spawned!! Sleeping for 5 seconds")
+    await giambio.sleep(5)
+    print("[child] Had a nice nap!")
+
+
+async def child1():
+    print("[child 1] Child spawned!! Sleeping for 10 seconds")
+    await giambio.sleep(10)
+    print("[child 1] Had a nice nap!")
+
+
+async def main():
+    start = giambio.clock()
+    try:
+        async with giambio.with_timeout(6) as pool:
+            # TODO: We need to consider the inner part of
+            # the with block as an implicit task, otherwise
+            # timeouts and cancellations won't work properly!
+            pool.spawn(child)  # This will complete
+            pool.spawn(child1)  # This will not
+            print("[main] Children spawned, awaiting completion")
+    except giambio.exceptions.TooSlowError:
+        print("[main] One or more children have timed out!")
+    print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
+
+
+if __name__ == "__main__":
+    giambio.run(main, debugger=Debugger())