diff --git a/.idea/aiosched.iml b/.idea/aiosched.iml new file mode 100644 index 0000000..74d515a --- /dev/null +++ b/.idea/aiosched.iml @@ -0,0 +1,10 @@ + + + + + + + + + + \ No newline at end of file diff --git a/aiosched/__init__.py b/aiosched/__init__.py new file mode 100644 index 0000000..f151b51 --- /dev/null +++ b/aiosched/__init__.py @@ -0,0 +1,23 @@ +""" +aiosched: I'm bored and I'm making an async event loop again + +Copyright (C) 2020 nocturn9x + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https:www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from aiosched.runtime import run, get_event_loop, new_event_loop, clock +from aiosched.internals.syscalls import spawn, wait, sleep, cancel +import aiosched.task +import aiosched.errors + +__all__ = ["run", "get_event_loop", "new_event_loop", "spawn", "wait", "sleep", "task", "errors", "cancel"] diff --git a/aiosched/errors.py b/aiosched/errors.py new file mode 100644 index 0000000..b0b7307 --- /dev/null +++ b/aiosched/errors.py @@ -0,0 +1,62 @@ +""" +aiosched: I'm bored and I'm making an async event loop again + +Copyright (C) 2020 nocturn9x + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https:www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from aiosched.task import Task + + +class SchedulerError(Exception): + """ + A generic scheduler error + """ + + +class InternalError(SchedulerError): + """ + Internal exception + """ + + +class ResourceBusy(SchedulerError): + """ + Exception that is raised when a resource is + accessed by more than one task at a time + """ + + +class ResourceClosed(SchedulerError): + """ + Raised when I/O is attempted on a closed + resource + """ + + +class TimedOutError(SchedulerError): + """ + This is raised if a timeout expires + """ + + task: Task + + +class Cancelled(BaseException): + """ + A cancellation exception. + Inherits from BaseException as + it is not meant to be caught + """ + + task: Task diff --git a/aiosched/internals/__init__.py b/aiosched/internals/__init__.py new file mode 100644 index 0000000..9571994 --- /dev/null +++ b/aiosched/internals/__init__.py @@ -0,0 +1,17 @@ +""" +aiosched: I'm bored and I'm making an async event loop again + +Copyright (C) 2020 nocturn9x + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https:www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" diff --git a/aiosched/internals/queues.py b/aiosched/internals/queues.py new file mode 100644 index 0000000..597e18a --- /dev/null +++ b/aiosched/internals/queues.py @@ -0,0 +1,173 @@ +""" +aiosched: I'm bored and I'm making an async event loop again + +Copyright (C) 2020 nocturn9x + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https:www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from typing import Callable, Any +from aiosched.task import Task, TaskState +from heapq import heappush, heappop, heapify + + +class TimeQueue: + """ + An abstraction layer over a heap queue based on time. This is where + paused tasks will be put when they are not running + + :param clock: The same monotonic clock that was passed to the thread-local event loop. + It is important for the queue to be synchronized with the loop as this allows + the sleeping mechanism to work reliably + """ + + def __init__(self, clock: Callable[[], float]): + """ + Object constructor + """ + + self.clock = clock + # The sequence float handles the race condition + # of two tasks with identical deadlines, acting + # as a tiebreaker + self.sequence = 0 + self.container: list[tuple[float, int, Task, dict[str, Any]]] = [] + + def __len__(self): + """ + Returns len(self) + """ + + return len(self.container) + + def __contains__(self, item: Task): + """ + Implements item in self. This method behaves + as if the queue only contained tasks and ignores + their timeouts and tiebreakers + """ + + for i in self.container: + if i[2] == item: + return True + return False + + def index(self, item: Task): + """ + Returns the index of the given item in the list + or -1 if it is not present + """ + + for i, e in enumerate(self.container): + if e[2] == item: + return i + return -1 + + def discard(self, item: Task): + """ + Discards an item from the queue and + calls heapify(self.container) to keep + the heap invariant if an element is removed. + This method does nothing if the item is not + in the queue, but note that in this case the + operation would still take O(n) iterations + to complete + + :param item: The item to be discarded + """ + + idx = self.index(item) + if idx != -1: + self.container.pop(idx) + heapify(self.container) + + def get_closest_deadline(self) -> float: + """ + Returns the closest deadline that is meant to expire + or raises IndexError if the queue is empty + """ + + if not self: + raise IndexError("TimeQueue is empty") + return self.container[0][0] + + def __iter__(self): + """ + Implements iter(self) + """ + + return self + + def __next__(self): + """ + Implements next(self) + """ + + try: + return self.get() + except IndexError: + raise StopIteration from None + + def __getitem__(self, item: int): + """ + Implements self[n] + """ + + return self.container.__getitem__(item) + + def __bool__(self): + """ + Implements bool(self) + """ + + return bool(self.container) + + def __repr__(self): + """ + Implements repr(self) and str(self) + """ + + return f"TimeQueue({self.container}, clock={self.clock})" + + def put(self, task: Task, delay: float, metadata: dict[str, Any] | None = None): + """ + Pushes a task onto the queue together with its + delay and optional metadata + + :param task: The task that is meant to sleep + :type task: :class: Task + :param delay: The delay associated with the task + :type delay: float + :param metadata: A dictionary representing additional + task metadata. Defaults to None + :type metadata: dict[str, Any], optional + """ + + time = self.clock() + task.paused_when = time + task.state = TaskState.PAUSED + task.next_deadline = task.paused_when + delay + heappush(self.container, (time + delay, self.sequence, task, metadata)) + self.sequence += 1 + + def get(self) -> tuple[Task, dict[str, Any] | None]: + """ + Gets the first task that is meant to run along + with its metadata + + :raises: IndexError if the queue is empty + """ + + if not self.container: + raise IndexError("get from empty TimeQueue") + _, __, task, meta = heappop(self.container) + return task, meta diff --git a/aiosched/internals/syscalls.py b/aiosched/internals/syscalls.py new file mode 100644 index 0000000..10748b4 --- /dev/null +++ b/aiosched/internals/syscalls.py @@ -0,0 +1,172 @@ +""" +aiosched: I'm bored and I'm making an async event loop again + +Copyright (C) 2020 nocturn9x + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https:www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import inspect +from types import coroutine +from typing import Any, Callable, Coroutine +from aiosched.task import Task, TaskState +from aiosched.errors import SchedulerError +from selectors import EVENT_READ, EVENT_WRITE + + +@coroutine +def syscall(method: str, *args, **kwargs) -> Any | None: + """ + Lowest-level primitive to interact with the event loop: + calls a loop method with the provided arguments. This + function should not be used directly, but through abstraction + layers. All positional and keyword arguments are passed to + the method itself and its return value is provided once the + loop yields control back to us + + :param method: The loop method to call + :type method: str + :returns: The result of the method call, if any + """ + + result = yield method, args, kwargs + return result + + +async def spawn(func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs) -> Task: + """ + Spawns a task from a coroutine and returns it. The coroutine + is put on the running cute and is executed as soon as possible. + Any positional and keyword arguments are passed along to the coroutine + + :param func: The coroutine function to instantiate. Note that this should NOT + be a coroutine object: the arguments to the coroutine should be passed to + spawn as well + :return: + """ + + if inspect.iscoroutine(func): + raise TypeError( + "Looks like you tried to call spawn(your_func(arg1, arg2, ...)), that is wrong!" + "\nWhat you wanna do, instead, is this: spawn(your_func, arg1, arg2, ...)" + ) + elif inspect.iscoroutinefunction(func): + return await syscall("spawn", func, *args, **kwargs) + else: + raise TypeError("func must be a coroutine function") + + +async def sleep(delay: int | float): + """ + Puts the calling task to sleep for the + given amount of time. If the delay is equal + to zero, this call acts as a checkpoint to + perform task switching and is useful to release + pressure from the scheduler in highly concurrent + environments + + :param delay: The amount of time (in seconds) that + the task has to be put to sleep for. Must be + greater than zero + :type delay: int | float + """ + + await syscall("sleep", delay) + + +async def checkpoint(): + """ + Shorthand for sleep(0) + """ + + await sleep(0) + + +async def suspend(): + """ + Suspends the current task. The task is not + rescheduled until some other event (for example + a timer, an event or an I/O operation) reschedules + it + """ + + await syscall("suspend") + + +async def wait(task: Task) -> Any | None: + """ + Waits for the completion of a + given task and returns its + return value. Can be called + multiple times by multiple tasks. + Raises an error if the task has + completed already. Please note that + exceptions are propagated, too + + :param task: The task to wait for + :type task: :class: Task + :returns: The task's return value, if any + """ + + if task.done(): + raise SchedulerError(f"task {task.name!r} has completed already") + await syscall("wait", task) + if task.exc: + raise task.exc + return task.result + + +async def cancel(task: Task): + """ + Cancels the given task. Note that + cancellations may not happen immediately + if the task is blocked in an uninterruptible + state + + :param task: The task to wait for + :type task: :class: Task + """ + + await syscall("cancel", task) + if task.state != TaskState.CANCELLED: + raise SchedulerError(f"task {task.name!r} ignored cancellation") + + +async def closing(stream): + """ + Notifies the event loop that the + given stream is about to be closed, + causing all callers waiting on it + to error out with an exception instead + of blocking forever + """ + + await syscall("notify_closing", stream) + + +async def wait_readable(stream): + """ + Waits until the given stream is + readable + """ + + await syscall("perform_io", stream, EVENT_READ) + + +async def wait_writable(stream): + """ + Waits until the given stream is + writable + """ + + await syscall("perform_io", stream, EVENT_WRITE) diff --git a/aiosched/kernel.py b/aiosched/kernel.py new file mode 100644 index 0000000..1c8e30c --- /dev/null +++ b/aiosched/kernel.py @@ -0,0 +1,492 @@ +""" +aiosched: I'm bored and I'm making an async event loop again + +Copyright (C) 2020 nocturn9x + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https:www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import itertools +from collections import deque +from functools import partial +from aiosched.task import Task, TaskState +from timeit import default_timer +from aiosched.internals.queues import TimeQueue +from aiosched.util.debugging import BaseDebugger +from typing import Callable, Any, Coroutine +from aiosched.errors import InternalError, ResourceBusy, Cancelled, ResourceClosed +from selectors import DefaultSelector, BaseSelector + + +class FIFOKernel: + """ + An asynchronous event loop implementation with a FIFO + scheduling policy. + + :param clock: The function used to keep track of time. Defaults to timeit.default_timer + :param debugger: A subclass of aiosched.util.BaseDebugger or None if no debugging output is desired + :type debugger: :class: aiosched.util.debugging.BaseDebugger, optional + :param selector: The selector to use for I/O multiplexing, defaults to selectors.DefaultSelector + :type selector: :class: selectors.DefaultSelector + """ + + def __init__( + self, + clock: Callable[[], float] = default_timer, + debugger: BaseDebugger | None = None, + selector: BaseSelector = DefaultSelector(), + ): + """ + Public constructor + """ + + self.clock = clock + if debugger and not issubclass(type(debugger), BaseDebugger): + raise InternalError( + "The debugger must be a subclass of aiosched.util.debugging.BaseDebugger" + ) + # The debugger object. If it is none we create a dummy object that immediately returns an empty + # lambda which in turn returns None every time we access any of its attributes to avoid lots of + # if self.debugger clauses + self.debugger = ( + debugger + or type( + "DumbDebugger", + (object,), + {"__getattr__": lambda *_: lambda *_: None}, + )() + ) + # Abstraction layer over low-level OS + # primitives for asynchronous I/O + self.selector: BaseSelector = selector + # Tasks that are ready to run + self.run_ready: deque[Task] = deque() + # Tasks that are paused and waiting + # for some deadline to expire + self.paused: TimeQueue = TimeQueue(self.clock) + # Data that is to be sent back to coroutines + self.data: dict[Task, Any] = {} + # The currently running task + self.current_task: Task | None = None + + def __repr__(self): + """ + Returns repr(self) + """ + + fields = { + "debugger", + "run_ready", + "selector", + "clock", + "data", + "paused", + "current_task", + } + data = ", ".join( + name + "=" + str(value) + for name, value in zip(fields, (getattr(self, field) for field in fields)) + ) + return f"{type(self).__name__}({data})" + + def done(self) -> bool: + """ + Returns whether the loop has no more work + to do + """ + + return not any([self.paused, self.run_ready, self.selector.get_map()]) + + def close(self, force: bool = False): + """ + Closes the event loop. If force equals False, + which is the default, raises an InternalError + exception. If force equals True, cancels all + tasks. + + :param force: + :return: + """ + + if not self.done() and not force: + raise InternalError("cannot shut down a running event loop") + for task in self.all(): + self.cancel(task) + + def all(self) -> Task: + """ + Yields all the tasks the event loop is keeping track of + """ + + for task in itertools.chain(self.run_ready, self.paused): + task: Task + yield task + + def shutdown(self): + """ + Shuts down the event loop + """ + + for task in self.all(): + self.io_release_task(task) + self.selector.close() + self.close() + + def wait_io(self): + """ + Waits for I/O and implements part of the sleeping mechanism + for the event loop + """ + + before_time = self.clock() # Used for the debugger + timeout = 0.0 + if self.run_ready: + # If there is work to do immediately (tasks to run) we + # can't wait + timeout = 0.0 + elif self.paused: + # If there are asleep tasks or deadlines, wait until the closest date + timeout = self.paused.get_closest_deadline() + self.debugger.before_io(timeout) + io_ready = self.selector.select(timeout) + # Get sockets that are ready and schedule their tasks + for key, _ in io_ready: + self.run_ready.extend(key.data) # Resource ready? Schedule its tasks + self.debugger.after_io(self.clock() - before_time) + + def awake_tasks(self): + """ + Reschedules paused tasks if their deadline + has elapsed + """ + + while self.paused and self.paused.get_closest_deadline() <= self.clock(): + # Reschedules tasks when their deadline has elapsed + task, _ = self.paused.get() + slept = self.clock() - task.paused_when + self.run_ready.append(task) + task.paused_when = 0 + task.next_deadline = 0 + self.debugger.after_sleep(task, slept) + + def reschedule_running(self): + """ + Reschedules the currently running task + """ + + if self.current_task: + self.run_ready.append(self.current_task) + else: + raise InternalError("aiosched is not running") + + def suspend(self): + """ + Suspends execution of the current task. This is basically + a do-nothing method, since it will not reschedule the task + before returning. The task will stay suspended as long as + something else outside the loop reschedules it + """ + + self.current_task.state = TaskState.PAUSED + + def run_task_step(self): + """ + Runs a single step for the current task. + A step ends when the task awaits any of + our primitives or async methods. + + Note that this method does NOT catch any + exception arising from tasks, nor does it + take StopIteration or CancelledError into + account: that's the job for run()! + """ + + # Sets the currently running task + self.current_task = self.run_ready.popleft() + while self.current_task.done(): + # We need to make sure we don't try to execute + # exited tasks that are on the running queue + self.current_task = self.run_ready.popleft() + self.debugger.before_task_step(self.current_task) + # Some debugging and internal chatter here + self.current_task.state = TaskState.RUN + self.current_task.steps += 1 + if self.current_task.pending_cancellation: + # We perform the deferred cancellation + # if it was previously scheduled + self.cancel(self.current_task) + else: + # Run a single step with the calculation (i.e. until a yield + # somewhere) + method, args, kwargs = self.current_task.run(self.data.get(self.current_task)) + self.data.pop(self.current_task, None) + if not hasattr(self, method) and not callable(getattr(self, method)): + # This if block is meant to be triggered by other async + # libraries, which most likely have different trap names and behaviors + # compared to us. If you get this exception, and you're 100% sure you're + # not mixing async primitives from other libraries, then it's a bug! + raise InternalError( + "Uh oh! Something very bad just happened, did you try to mix primitives from other async libraries?" + ) from None + # Sneaky method call, thanks to David Beazley for this ;) + getattr(self, method)(*args, **kwargs) + self.debugger.after_task_step(self.current_task) + + def run(self): + """ + The event loop's runner function. This method drives + execution for the entire framework and orchestrates I/O, + events, sleeping, cancellations and deadlines, but the + actual functionality for all of that is implemented in + object wrappers. This keeps the size of this module to + a minimum while allowing anyone to replace it with their + own, as long as the system calls required by higher-level + object wrappers are implemented. If you want to add features + to the library, don't add them here, but take inspiration + from the current API (i.e. not depending on any implementation + detail from the loop aside from system calls) + """ + + while True: + if self.done(): + # If we're done, which means there are + # both no paused tasks and no running tasks, we + # simply tear us down and return to self.start + self.shutdown() + break + elif not self.run_ready: + # If there are no actively running tasks, we start by + # checking for I/O. This method will wait for I/O until + # the closest deadline to avoid starving sleeping tasks + # or missing deadlines + if self.selector.get_map(): + self.wait_io() + if self.paused: + # Next we check for deadlines + self.awake_tasks() + else: + # Otherwise, while there are tasks ready to run, we run them! + self.handle_task_run(self.run_task_step) + + def start(self, func: Callable[..., Coroutine[Any, Any, Any]], *args, loop: bool = True) -> Any: + """ + Starts the event loop from a synchronous context. If the loop parameter + is false, the event loop will not start listening for events + automatically and the dispatching is on the users' shoulders + """ + + entry_point = Task(func.__name__ or str(func), func(*args)) + self.run_ready.append(entry_point) + self.debugger.on_start() + if loop: + try: + self.run() + finally: + self.debugger.on_exit() + if entry_point.exc: + raise entry_point.exc + return entry_point.result + + def io_release(self, resource): + """ + Releases the given resource from our + selector + :param resource: The resource to be released + """ + + if self.selector.get_map() and resource in self.selector.get_map(): + self.selector.unregister(resource) + + def io_release_task(self, task: Task): + """ + Calls self.io_release in a loop + for each I/O resource the given task owns + """ + + for key in dict(self.selector.get_map()).values(): + if task in key.data: + key.data.remove(task) + if not key.data: + self.selector.unregister(key.fileobj) + task.last_io = () + + def notify_closing(self, stream): + """ + Notifies paused tasks that a stream + is about to be closed. The stream + itself is not touched and must be + closed by the caller + """ + + for k in filter( + lambda o: o.fileobj == stream, + dict(self.selector.get_map()).values(), + ): + for task in k.data: + self.handle_task_run(partial(task.throw, ResourceClosed("stream has been closed")), task) + + def cancel(self, task: Task): + """ + Schedules the task to be cancelled later + or does so straight away if it is safe to do so + """ + + self.reschedule_running() + if task.done(): + return + match task.state: + case TaskState.IO: + self.io_release_task(task) + case TaskState.PAUSED: + self.paused.discard(task) + case TaskState.INIT: + return + self.handle_task_run(partial(task.throw, Cancelled(task)), task) + if task.state == TaskState.CANCELLED: + self.debugger.after_cancel(task) + else: + task.pending_cancellation = True + + def handle_task_run(self, func: Callable, task: Task | None = None): + """ + Convenience method for handling various exceptions + from tasks + """ + + try: + func() + except StopIteration as ret: + # We re-define it because we call run_task_step + # with this method and that changes the current + # task + task = task or self.current_task + # At the end of the day, coroutines are generator functions with + # some tricky behaviors, and this is one of them. When a coroutine + # hits a return statement (either explicit or implicit), it raises + # a StopIteration exception, which has an attribute named value that + # represents the return value of the coroutine, if it has one. Of course + # this exception is not an error, and we should happily keep going after it: + # most of this code below is just useful for internal/debugging purposes + task.state = TaskState.FINISHED + task.result = ret.value + self.wait(task) + except Cancelled: + # When a task needs to be cancelled, aiosched tries to do it gracefully + # first: if the task is paused in either I/O or sleeping, that's perfect. + # But we also need to cancel a task if it was not sleeping or waiting on + # any I/O because it could never do so (therefore blocking everything + # forever). So, when cancellation can't be done right away, we schedule + # it for the next execution step of the task. aiosched will also make sure + # to re-raise cancellations at every checkpoint until the task lets the + # exception propagate into us, because we *really* want the task to be + # cancelled + task = task or self.current_task + task.state = TaskState.CANCELLED + task.pending_cancellation = False + self.wait(task) + except BaseException as err: + # Any other exception is caught here + task = task or self.current_task + task.exc = err + task.state = TaskState.CRASHED + self.wait(task) + + def sleep(self, seconds: int | float): + """ + Puts the current task to sleep for a given amount of seconds + """ + + if seconds: + self.debugger.before_sleep(self.current_task, seconds) + self.paused.put(self.current_task, seconds) + else: + # When we're called with a timeout of 0, this method acts as a checkpoint + # that allows aiosched to kick in and to its job without pausing the task's + # execution for too long. It is recommended to put a couple of checkpoints + # like these in your code if you see degraded concurrent performance in parts + # of your code that block the loop + self.reschedule_running() + + def wait(self, task: Task): + """ + Makes the current task wait for completion of the given one + """ + + if task.done(): + self.run_ready.extend(task.joiners) + task.joiners = {} + else: + task.joiners.add(self.current_task) + + def spawn(self, func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs): + """ + Spawns a task from a coroutine function. All positional and keyword arguments + besides the coroutine function itself are passed to the newly created coroutine + """ + + task = Task(func.__name__ or repr(func), func(*args, **kwargs)) + self.data[self.current_task] = task + self.run_ready.append(task) + self.reschedule_running() + + def perform_io(self, resource, evt_type: int): + """ + Registers the given resource inside our selector to perform I/O multiplexing + + :param resource: The resource on which a read or write operation + has to be performed + :param evt_type: The type of event to perform on the given + socket, either selectors.EVENT_READ or selectors.EVENT_WRITE + :type evt_type: int + """ + + self.current_task.state = TaskState.IO + if self.current_task.last_io: + # Since, most of the time, tasks will perform multiple + # I/O operations on a given resource, unregistering them + # every time isn't a sensible approach. A quick and + # easy optimization to address this problem is to + # store the last I/O operation that the task performed + # together with the resource itself, inside the task + # object. If the task then tries to perform the same + # operation on the same resource again, then this method + # returns immediately as it is already being watched by + # the selector. If the resource is the same, but the + # event type has changed, then we modify the resource's + # associated event. Only if the resource is different from + # the last one used, then this method will register a new + # one + if self.current_task.last_io == (evt_type, resource): + # Socket is already listening for that event! + return + elif self.current_task.last_io[1] == resource: + # If the event to listen for has changed we just modify it + self.selector.modify(resource, evt_type, self.current_task) + self.current_task.last_io = (evt_type, resource) + elif not self.current_task.last_io or self.current_task.last_io[1] != resource: + # The task has either registered a new socket or is doing + # I/O for the first time. In both cases, we register a new socket + self.current_task.last_io = evt_type, resource + try: + self.selector.register(resource, evt_type, [self.current_task]) + except KeyError: + # The resource is already registered doing something else: we try + # to see if we can modify the event + key = self.selector.get_key(resource) + if evt_type != key.events: + self.selector.modify( + resource, evt_type | key.events, key.data + [self.current_task] + ) + # If we get here, two tasks are trying to read or write on the same resource at the same time + raise ResourceBusy( + "The given resource is being read from/written to from another task" + ) from None diff --git a/aiosched/runtime.py b/aiosched/runtime.py new file mode 100644 index 0000000..67764fe --- /dev/null +++ b/aiosched/runtime.py @@ -0,0 +1,86 @@ +""" +aiosched: I'm bored and I'm making an async event loop again + +Copyright (C) 2020 nocturn9x + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https:www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import inspect +from threading import local +from timeit import default_timer +from aiosched.kernel import FIFOKernel +from aiosched.errors import SchedulerError +from aiosched.util.debugging import BaseDebugger +from typing import Coroutine, Callable, Any + + +local_storage = local() + + +def get_event_loop(): + """ + Returns the event loop associated to the current + thread + """ + + try: + return local_storage.loop + except AttributeError: + raise SchedulerError("loop is not running") from None + + +def new_event_loop(clock_function: Callable, debugger: BaseDebugger | None = None): + """ + Associates a new event loop to the current thread + and deactivates the old one. This should not be + called explicitly unless you know what you're doing. + If an event loop is currently set, and it is running, + a SchedulerError exception is raised + """ + + try: + loop = get_event_loop() + except SchedulerError: + local_storage.loop = FIFOKernel(clock_function, debugger) + else: + if not loop.done(): + raise SchedulerError("cannot change event loop while running") + else: + loop.close() + local_storage.loop = FIFOKernel(clock_function, debugger) + + +def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]], debugger: BaseDebugger | None = None, *args, **kwargs): + """ + Starts the event loop from a synchronous entry point + """ + + if inspect.iscoroutine(func): + raise SchedulerError( + "Looks like you tried to call aiosched.run(your_func(arg1, arg2, ...)), that is wrong!" + "\nWhat you wanna do, instead, is this: aiosched.run(your_func, arg1, arg2, ...)" + ) + elif not inspect.iscoroutinefunction(func): + raise SchedulerError("aiosched.run() requires an async function as parameter!") + new_event_loop(kwargs.get("clock", default_timer), debugger) + get_event_loop().start(func, *args) + + +def clock() -> float: + """ + Returns the current clock time of the thread-local event + loop + """ + + return get_event_loop().clock() diff --git a/aiosched/task.py b/aiosched/task.py new file mode 100644 index 0000000..4394dc9 --- /dev/null +++ b/aiosched/task.py @@ -0,0 +1,133 @@ +""" +aiosched: I'm bored and I'm making an async event loop again + +Copyright (C) 2020 nocturn9x + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https:www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import warnings +from enum import Enum, auto +from typing import Coroutine, Any +from dataclasses import dataclass, field + + +class TaskState(Enum): + """ + An enumeration of task states + """ + + # Task has been created and is + # ready to run + INIT: int = auto() + # Task is executing synchronous code + RUN: int = auto() + # Task is waiting on an I/O resource + IO: int = auto() + # Task is sleeping or waiting on an + # event + PAUSED: int = auto() + # Task has exited with an exception + CRASHED: int = auto() + # Task has been cancelled (either + # explicitly or implicitly) + CANCELLED: int = auto() + # Task has finished executing normally + FINISHED: int = auto() + + +@dataclass +class Task: + + """ + A simple wrapper around a coroutine object + """ + + # The name of the task. Usually this equals self.coroutine.__name__, + # but it may fall back to repr(self.coroutine) + name: str + # The underlying coroutine object to wrap + coroutine: Coroutine + # This attribute will be None unless the task raised an error + exc: BaseException | None = None + # The return value of the coroutine + result: Any | None = None + # Task status + state: int = TaskState.INIT + # This attribute counts how many times the task's run() method has been called + steps: int = 0 + # Simple optimization to improve the selector's efficiency. Stores the task's last + # I/O operation as well as a reference to the file descriptor it was performed on + last_io: tuple[int, Any] | None = None + # All the tasks waiting on this task's completion + joiners: set["Task"] = field(default_factory=set) + # Whether this task has a pending cancellation scheduled. This allows us to delay + # cancellation delivery as soon as the task calls another loop primitive + pending_cancellation: bool = False + # The time when the task was put on the waiting queue + paused_when: float = 0.0 + # The next deadline, in terms of the absolute clock of the loop, associated to the task + next_deadline: float = 0.0 + + def run(self, what: Any | None = None): + """ + Simple abstraction layer over a coroutine's send method + + :param what: The object that has to be sent to the coroutine, + defaults to None + :type what: Any, optional + """ + + return self.coroutine.send(what) + + def throw(self, err: BaseException): + """ + Simple abstraction layer over a coroutine's throw method + + :param err: The exception that has to be raised inside + the task + :type err: BaseException + """ + + self.exc = err + return self.coroutine.throw(err) + + def __hash__(self): + """ + Implements hash(self) + """ + + return hash(self.coroutine) + + def done(self): + """ + Returns True if the task is not running, + False otherwise + """ + + return self.state in [ + TaskState.CANCELLED, + TaskState.CRASHED, + TaskState.FINISHED, + ] + + def __del__(self): + """ + Task destructor + """ + + try: + self.coroutine.close() + except RuntimeError: + pass # TODO: This is kinda bad + if self.last_io: + warnings.warn(f"task '{self.name}' was destroyed, but has pending I/O") diff --git a/aiosched/util/__init__.py b/aiosched/util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/aiosched/util/debugging.py b/aiosched/util/debugging.py new file mode 100644 index 0000000..c751c74 --- /dev/null +++ b/aiosched/util/debugging.py @@ -0,0 +1,194 @@ +from abc import ABC, abstractmethod +from aiosched.task import Task + + +class BaseDebugger(ABC): + """ + The base for all debugger objects + """ + + @abstractmethod + def on_start(self): + """ + This method is called when the event + loop starts executing + """ + + return NotImplemented + + @abstractmethod + def on_exit(self): + """ + This method is called when the event + loop exits entirely (all tasks completed) + """ + + return NotImplemented + + @abstractmethod + def on_task_schedule(self, task: Task, delay: float): + """ + This method is called when a new task is + scheduled (not spawned) + + :param task: The Task object representing a + aiosched Task and wrapping a coroutine + :type task: :class: aiosched.task.Task + :param delay: The delay, in seconds, after which + the task will start executing + :type delay: float + """ + + return NotImplemented + + @abstractmethod + def on_task_spawn(self, task: Task): + """ + This method is called when a new task is + spawned + + :param task: The Task object representing a + aiosched Task and wrapping a coroutine + :type task: :class: aiosched.task.Task + """ + + return NotImplemented + + @abstractmethod + def on_task_exit(self, task: Task): + """ + This method is called when a task exits + + :param task: The Task object representing an + aiosched Task and wrapping a coroutine + :type task: :class: aiosched.task.Task + """ + + return NotImplemented + + @abstractmethod + def before_task_step(self, task: Task): + """ + This method is called right before + calling a task's run() method + + :param task: The Task object representing an + aiosched Task and wrapping a coroutine + :type task: :class: aiosched.task.Task + """ + + return NotImplemented + + @abstractmethod + def after_task_step(self, task: Task): + """ + This method is called right after + calling a task's run() method + + :param task: The Task object representing an + aiosched Task and wrapping a coroutine + :type task: :class: aiosched.task.Task + """ + + return NotImplemented + + @abstractmethod + def before_sleep(self, task: Task, seconds: float): + """ + This method is called before a task goes + to sleep + + :param task: The Task object representing an + aiosched Task and wrapping a coroutine + :type task: :class: aiosched.task.Task + :param seconds: The amount of seconds the + task wants to sleep + :type seconds: int + """ + + return NotImplemented + + @abstractmethod + def after_sleep(self, task: Task, seconds: float): + """ + This method is called after a tasks + awakes from sleeping + + :param task: The Task object representing an + aiosched Task and wrapping a coroutine + :type task: :class: aiosched.task.Task + :param seconds: The amount of seconds the + task actually slept + :type seconds: float + """ + + return NotImplemented + + @abstractmethod + def before_io(self, timeout: float): + """ + This method is called right before + the event loop checks for I/O events + + :param timeout: The max. amount of seconds + that the loop will hang when using the select() + system call + :type timeout: float + """ + + return NotImplemented + + @abstractmethod + def after_io(self, timeout: float): + """ + This method is called right after + the event loop has checked for I/O events + + :param timeout: The actual amount of seconds + that the loop has hung when using the select() + system call + :type timeout: float + """ + + return NotImplemented + + @abstractmethod + def before_cancel(self, task: Task): + """ + This method is called right before a task + gets cancelled + + :param task: The Task object representing a + aiosched Task and wrapping a coroutine + :type task: :class: aiosched.task.Task + """ + + return NotImplemented + + @abstractmethod + def after_cancel(self, task: Task) -> object: + """ + This method is called right after a task + gets cancelled + + :param task: The Task object representing a + aiosched Task and wrapping a coroutine + :type task: :class: aiosched.task.Task + """ + + return NotImplemented + + @abstractmethod + def on_exception_raised(self, task: Task, exc: BaseException): + """ + This method is called right after a task + has raised an exception + + :param task: The Task object representing a + aiosched Task and wrapping a coroutine + :type task: :class: aiosched.task.Task + :param exc: The exception that was raised + :type exc: BaseException + """ + + return NotImplemented diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..a2b1c00 --- /dev/null +++ b/setup.py @@ -0,0 +1,23 @@ +import setuptools + + +if __name__ == "__main__": + with open("README.md", "r") as readme: + long_description = readme.read() + setuptools.setup( + name="aiosched", + version="0.1.0", + author="Nocturn9x", + author_email="hackhab@gmail.com", + description="An asynchronous scheduler", + long_description=long_description, + long_description_content_type="text/markdown", + url="https://git.nocturn9x.space/nocturn9x/aiosched", + packages=setuptools.find_packages(), + classifiers=[ + "Programming Language :: Python :: 3", + "Operating System :: OS Independent", + "License :: OSI Approved :: Apache License 2.0", + ], + python_requires=">=3.8", + ) diff --git a/tests/cancel.py b/tests/cancel.py new file mode 100644 index 0000000..ba94ec0 --- /dev/null +++ b/tests/cancel.py @@ -0,0 +1,36 @@ +import random +import aiosched +from debugger import Debugger + + +async def child(name: str, n: int): + before = aiosched.clock() + print(f"[child {name}] Sleeping for {n} seconds") + try: + await aiosched.sleep(n) + except aiosched.errors.Cancelled: + print(f"[child {name}] Oh no, I've been cancelled!") + raise # We re-raise, or things break + print(f"[child {name}] Done! Slept for {aiosched.clock() - before:.2f} seconds") + + +async def main(children: list[tuple[str, int]]): + tasks: list[aiosched.task.Task] = [] + print("[main] Spawning children") + for name, delay in children: + tasks.append(await aiosched.spawn(child, name, delay)) + print(f"[main] Spawned {len(tasks)} children") + print(f"[main] Cancelling a random child") + cancelled = random.choice(tasks) + await aiosched.cancel(cancelled) + tasks.remove(cancelled) + print(f"[main] Waiting for {len(tasks)} children") + before = aiosched.clock() + for i, task in enumerate(tasks): + print(f"[main] Waiting for child #{i + 1}") + await aiosched.wait(task) + print(f"[main] Child #{i + 1} has exited") + print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds") + + +aiosched.run(main, None, [("first", 1), ("second", 2), ("third", 3)]) diff --git a/tests/catch.py b/tests/catch.py new file mode 100644 index 0000000..91b9dd8 --- /dev/null +++ b/tests/catch.py @@ -0,0 +1,25 @@ +import aiosched +from debugger import Debugger + + +async def child(name: str, n: int): + before = aiosched.clock() + print(f"[child {name}] Sleeping for {n} seconds") + await aiosched.sleep(n) + print(f"[child {name}] Done! Slept for {aiosched.clock() - before:.2f} seconds") + raise TypeError("waa") + + +async def main(n: int): + print("[main] Spawning child") + task = await aiosched.spawn(child, "raise", n) + print("[main] Waiting for child") + before = aiosched.clock() + try: + await aiosched.wait(task) + except Exception as err: + print(f"[main] Child raised an exception -> {type(err).__name__}: {err}") + print(f"[main] Child exited in {aiosched.clock() - before:.2f} seconds") + + +aiosched.run(main, None, 5) diff --git a/tests/debugger.py b/tests/debugger.py new file mode 100644 index 0000000..57c7c81 --- /dev/null +++ b/tests/debugger.py @@ -0,0 +1,51 @@ +from aiosched.util.debugging import BaseDebugger + + +class Debugger(BaseDebugger): + """ + A simple debugger for aiosched + """ + + def on_start(self): + print("## Started running") + + def on_exit(self): + print("## Finished running") + + def on_task_schedule(self, task, delay: int): + print(f">> A task named '{task.name}' was scheduled to run in {delay:.2f} seconds") + + def on_task_spawn(self, task): + print(f">> A task named '{task.name}' was spawned") + + def on_task_exit(self, task): + print(f"<< Task '{task.name}' exited") + + def before_task_step(self, task): + print(f"-> About to run a step for '{task.name}'") + + def after_task_step(self, task): + print(f"<- Ran a step for '{task.name}'") + + def before_sleep(self, task, seconds): + print(f"# About to put '{task.name}' to sleep for {seconds:.2f} seconds") + + def after_sleep(self, task, seconds): + print(f"# Task '{task.name}' slept for {seconds:.2f} seconds") + + def before_io(self, timeout): + if timeout is None: + timeout = float("inf") + print(f"!! About to check for I/O for up to {timeout:.2f} seconds") + + def after_io(self, timeout): + print(f"!! Done I/O check (waited for {timeout:.2f} seconds)") + + def before_cancel(self, task): + print(f"// About to cancel '{task.name}'") + + def after_cancel(self, task): + print(f"// Cancelled '{task.name}'") + + def on_exception_raised(self, task, exc): + print(f"== '{task.name}' raised {repr(exc)}") \ No newline at end of file diff --git a/tests/wait.py b/tests/wait.py new file mode 100644 index 0000000..e2e84e5 --- /dev/null +++ b/tests/wait.py @@ -0,0 +1,27 @@ +import aiosched +from debugger import Debugger + + +async def child(name: str, n: int): + before = aiosched.clock() + print(f"[child {name}] Sleeping for {n} seconds") + await aiosched.sleep(n) + print(f"[child {name}] Done! Slept for {aiosched.clock() - before:.2f} seconds") + + +async def main(children: list[tuple[str, int]]): + tasks: list[aiosched.task.Task] = [] + print("[main] Spawning children") + for name, delay in children: + tasks.append(await aiosched.spawn(child, name, delay)) + print(f"[main] Spawned {len(tasks)} children") + before = aiosched.clock() + print(f"[main] Waiting for {len(tasks)} children") + for i, task in enumerate(tasks): + print(f"[main] Waiting for child #{i + 1}") + await aiosched.wait(task) + print(f"[main] Child #{i + 1} has exited") + print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds") + + +aiosched.run(main, Debugger(), [("first", 1), ("second", 2), ("third", 3)])