""" aiosched: I'm bored and I'm making an async event loop again Copyright (C) 2020 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https:www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import itertools from collections import deque from functools import partial from aiosched.task import Task, TaskState from timeit import default_timer from aiosched.internals.queues import TimeQueue from aiosched.util.debugging import BaseDebugger from typing import Callable, Any, Coroutine from aiosched.errors import InternalError, ResourceBusy, Cancelled, ResourceClosed from selectors import DefaultSelector, BaseSelector class FIFOKernel: """ An asynchronous event loop implementation with a FIFO scheduling policy. :param clock: The function used to keep track of time. Defaults to timeit.default_timer :param debugger: A subclass of aiosched.util.BaseDebugger or None if no debugging output is desired :type debugger: :class: aiosched.util.debugging.BaseDebugger, optional :param selector: The selector to use for I/O multiplexing, defaults to selectors.DefaultSelector :type selector: :class: selectors.DefaultSelector """ def __init__( self, clock: Callable[[], float] = default_timer, debugger: BaseDebugger | None = None, selector: BaseSelector = DefaultSelector(), ): """ Public constructor """ self.clock = clock if debugger and not issubclass(type(debugger), BaseDebugger): raise InternalError( "The debugger must be a subclass of aiosched.util.debugging.BaseDebugger" ) # The debugger object. If it is none we create a dummy object that immediately returns an empty # lambda which in turn returns None every time we access any of its attributes to avoid lots of # if self.debugger clauses self.debugger = ( debugger or type( "DumbDebugger", (object,), {"__getattr__": lambda *_: lambda *_: None}, )() ) # Abstraction layer over low-level OS # primitives for asynchronous I/O self.selector: BaseSelector = selector # Tasks that are ready to run self.run_ready: deque[Task] = deque() # Tasks that are paused and waiting # for some deadline to expire self.paused: TimeQueue = TimeQueue(self.clock) # Data that is to be sent back to coroutines self.data: dict[Task, Any] = {} # The currently running task self.current_task: Task | None = None def __repr__(self): """ Returns repr(self) """ fields = { "debugger", "run_ready", "selector", "clock", "data", "paused", "current_task", } data = ", ".join( name + "=" + str(value) for name, value in zip(fields, (getattr(self, field) for field in fields)) ) return f"{type(self).__name__}({data})" def done(self) -> bool: """ Returns whether the loop has no more work to do """ return not any([self.paused, self.run_ready, self.selector.get_map()]) def close(self, force: bool = False): """ Closes the event loop. If force equals False, which is the default, raises an InternalError exception. If force equals True, cancels all tasks. :param force: :return: """ if not self.done() and not force: raise InternalError("cannot shut down a running event loop") for task in self.all(): self.cancel(task) def all(self) -> Task: """ Yields all the tasks the event loop is keeping track of """ for task in itertools.chain(self.run_ready, self.paused): task: Task yield task def shutdown(self): """ Shuts down the event loop """ for task in self.all(): self.io_release_task(task) self.selector.close() self.close() def wait_io(self): """ Waits for I/O and implements part of the sleeping mechanism for the event loop """ before_time = self.clock() # Used for the debugger timeout = 0.0 if self.run_ready: # If there is work to do immediately (tasks to run) we # can't wait timeout = 0.0 elif self.paused: # If there are asleep tasks or deadlines, wait until the closest date timeout = self.paused.get_closest_deadline() self.debugger.before_io(timeout) io_ready = self.selector.select(timeout) # Get sockets that are ready and schedule their tasks for key, _ in io_ready: self.run_ready.extend(key.data) # Resource ready? Schedule its tasks self.debugger.after_io(self.clock() - before_time) def awake_tasks(self): """ Reschedules paused tasks if their deadline has elapsed """ while self.paused and self.paused.get_closest_deadline() <= self.clock(): # Reschedules tasks when their deadline has elapsed task, _ = self.paused.get() slept = self.clock() - task.paused_when self.run_ready.append(task) task.paused_when = 0 task.next_deadline = 0 self.debugger.after_sleep(task, slept) def reschedule_running(self): """ Reschedules the currently running task """ if self.current_task: self.run_ready.append(self.current_task) else: raise InternalError("aiosched is not running") def suspend(self): """ Suspends execution of the current task. This is basically a do-nothing method, since it will not reschedule the task before returning. The task will stay suspended as long as something else outside the loop reschedules it """ self.current_task.state = TaskState.PAUSED def run_task_step(self): """ Runs a single step for the current task. A step ends when the task awaits any of our primitives or async methods. Note that this method does NOT catch any exception arising from tasks, nor does it take StopIteration or CancelledError into account: that's the job for run()! """ # Sets the currently running task self.current_task = self.run_ready.popleft() while self.current_task.done(): # We need to make sure we don't try to execute # exited tasks that are on the running queue self.current_task = self.run_ready.popleft() self.debugger.before_task_step(self.current_task) # Some debugging and internal chatter here self.current_task.state = TaskState.RUN self.current_task.steps += 1 if self.current_task.pending_cancellation: # We perform the deferred cancellation # if it was previously scheduled self.cancel(self.current_task) else: # Run a single step with the calculation (i.e. until a yield # somewhere) method, args, kwargs = self.current_task.run(self.data.get(self.current_task)) self.data.pop(self.current_task, None) if not hasattr(self, method) and not callable(getattr(self, method)): # This if block is meant to be triggered by other async # libraries, which most likely have different trap names and behaviors # compared to us. If you get this exception, and you're 100% sure you're # not mixing async primitives from other libraries, then it's a bug! raise InternalError( "Uh oh! Something very bad just happened, did you try to mix primitives from other async libraries?" ) from None # Sneaky method call, thanks to David Beazley for this ;) getattr(self, method)(*args, **kwargs) self.debugger.after_task_step(self.current_task) def run(self): """ The event loop's runner function. This method drives execution for the entire framework and orchestrates I/O, events, sleeping, cancellations and deadlines, but the actual functionality for all of that is implemented in object wrappers. This keeps the size of this module to a minimum while allowing anyone to replace it with their own, as long as the system calls required by higher-level object wrappers are implemented. If you want to add features to the library, don't add them here, but take inspiration from the current API (i.e. not depending on any implementation detail from the loop aside from system calls) """ while True: if self.done(): # If we're done, which means there are # both no paused tasks and no running tasks, we # simply tear us down and return to self.start self.shutdown() break elif not self.run_ready: # If there are no actively running tasks, we start by # checking for I/O. This method will wait for I/O until # the closest deadline to avoid starving sleeping tasks # or missing deadlines if self.selector.get_map(): self.wait_io() if self.paused: # Next we check for deadlines self.awake_tasks() else: # Otherwise, while there are tasks ready to run, we run them! self.handle_task_run(self.run_task_step) def start(self, func: Callable[..., Coroutine[Any, Any, Any]], *args, loop: bool = True) -> Any: """ Starts the event loop from a synchronous context. If the loop parameter is false, the event loop will not start listening for events automatically and the dispatching is on the users' shoulders """ entry_point = Task(func.__name__ or str(func), func(*args)) self.run_ready.append(entry_point) self.debugger.on_start() if loop: try: self.run() finally: self.debugger.on_exit() if entry_point.exc: raise entry_point.exc return entry_point.result def io_release(self, resource): """ Releases the given resource from our selector :param resource: The resource to be released """ if self.selector.get_map() and resource in self.selector.get_map(): self.selector.unregister(resource) def io_release_task(self, task: Task): """ Calls self.io_release in a loop for each I/O resource the given task owns """ for key in dict(self.selector.get_map()).values(): if task in key.data: key.data.remove(task) if not key.data: self.selector.unregister(key.fileobj) task.last_io = () def notify_closing(self, stream): """ Notifies paused tasks that a stream is about to be closed. The stream itself is not touched and must be closed by the caller """ for k in filter( lambda o: o.fileobj == stream, dict(self.selector.get_map()).values(), ): for task in k.data: self.handle_task_run(partial(task.throw, ResourceClosed("stream has been closed")), task) def cancel(self, task: Task): """ Schedules the task to be cancelled later or does so straight away if it is safe to do so """ self.reschedule_running() if task.done(): return match task.state: case TaskState.IO: self.io_release_task(task) case TaskState.PAUSED: self.paused.discard(task) case TaskState.INIT: return self.handle_task_run(partial(task.throw, Cancelled(task)), task) if task.state == TaskState.CANCELLED: self.debugger.after_cancel(task) else: task.pending_cancellation = True def handle_task_run(self, func: Callable, task: Task | None = None): """ Convenience method for handling various exceptions from tasks """ try: func() except StopIteration as ret: # We re-define it because we call run_task_step # with this method and that changes the current # task task = task or self.current_task # At the end of the day, coroutines are generator functions with # some tricky behaviors, and this is one of them. When a coroutine # hits a return statement (either explicit or implicit), it raises # a StopIteration exception, which has an attribute named value that # represents the return value of the coroutine, if it has one. Of course # this exception is not an error, and we should happily keep going after it: # most of this code below is just useful for internal/debugging purposes task.state = TaskState.FINISHED task.result = ret.value self.wait(task) except Cancelled: # When a task needs to be cancelled, aiosched tries to do it gracefully # first: if the task is paused in either I/O or sleeping, that's perfect. # But we also need to cancel a task if it was not sleeping or waiting on # any I/O because it could never do so (therefore blocking everything # forever). So, when cancellation can't be done right away, we schedule # it for the next execution step of the task. aiosched will also make sure # to re-raise cancellations at every checkpoint until the task lets the # exception propagate into us, because we *really* want the task to be # cancelled task = task or self.current_task task.state = TaskState.CANCELLED task.pending_cancellation = False self.wait(task) except BaseException as err: # Any other exception is caught here task = task or self.current_task task.exc = err task.state = TaskState.CRASHED self.wait(task) def sleep(self, seconds: int | float): """ Puts the current task to sleep for a given amount of seconds """ if seconds: self.debugger.before_sleep(self.current_task, seconds) self.paused.put(self.current_task, seconds) else: # When we're called with a timeout of 0, this method acts as a checkpoint # that allows aiosched to kick in and to its job without pausing the task's # execution for too long. It is recommended to put a couple of checkpoints # like these in your code if you see degraded concurrent performance in parts # of your code that block the loop self.reschedule_running() def wait(self, task: Task): """ Makes the current task wait for completion of the given one """ if task.done(): self.run_ready.extend(task.joiners) task.joiners = {} else: task.joiners.add(self.current_task) def spawn(self, func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs): """ Spawns a task from a coroutine function. All positional and keyword arguments besides the coroutine function itself are passed to the newly created coroutine """ task = Task(func.__name__ or repr(func), func(*args, **kwargs)) self.data[self.current_task] = task self.run_ready.append(task) self.reschedule_running() def perform_io(self, resource, evt_type: int): """ Registers the given resource inside our selector to perform I/O multiplexing :param resource: The resource on which a read or write operation has to be performed :param evt_type: The type of event to perform on the given socket, either selectors.EVENT_READ or selectors.EVENT_WRITE :type evt_type: int """ self.current_task.state = TaskState.IO if self.current_task.last_io: # Since, most of the time, tasks will perform multiple # I/O operations on a given resource, unregistering them # every time isn't a sensible approach. A quick and # easy optimization to address this problem is to # store the last I/O operation that the task performed # together with the resource itself, inside the task # object. If the task then tries to perform the same # operation on the same resource again, then this method # returns immediately as it is already being watched by # the selector. If the resource is the same, but the # event type has changed, then we modify the resource's # associated event. Only if the resource is different from # the last one used, then this method will register a new # one if self.current_task.last_io == (evt_type, resource): # Socket is already listening for that event! return elif self.current_task.last_io[1] == resource: # If the event to listen for has changed we just modify it self.selector.modify(resource, evt_type, self.current_task) self.current_task.last_io = (evt_type, resource) elif not self.current_task.last_io or self.current_task.last_io[1] != resource: # The task has either registered a new socket or is doing # I/O for the first time. In both cases, we register a new socket self.current_task.last_io = evt_type, resource try: self.selector.register(resource, evt_type, [self.current_task]) except KeyError: # The resource is already registered doing something else: we try # to see if we can modify the event key = self.selector.get_key(resource) if evt_type != key.events: self.selector.modify( resource, evt_type | key.events, key.data + [self.current_task] ) # If we get here, two tasks are trying to read or write on the same resource at the same time raise ResourceBusy( "The given resource is being read from/written to from another task" ) from None