""" aiosched: Yet another Python async scheduler Copyright (C) 2022 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https:www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import warnings from enum import Enum, auto from typing import Coroutine, Any from dataclasses import dataclass, field class TaskState(Enum): """ An enumeration of task states """ # Task has been created and is # ready to run INIT: int = auto() # Task is executing synchronous code RUN: int = auto() # Task is waiting on an I/O resource IO: int = auto() # Task is sleeping or waiting on an # event PAUSED: int = auto() # Task has exited with an exception CRASHED: int = auto() # Task has been cancelled (either # explicitly or implicitly) CANCELLED: int = auto() # Task has finished executing normally FINISHED: int = auto() @dataclass class Task: """ A simple wrapper around a coroutine object """ # The name of the task. Usually this equals self.coroutine.__name__, # but it may fall back to repr(self.coroutine) name: str # The underlying coroutine object to wrap coroutine: Coroutine # This attribute will be None unless the task raised an error exc: BaseException | None = None # The return value of the coroutine result: Any | None = None # Task status state: int = TaskState.INIT # This attribute counts how many times the task's run() method has been called steps: int = 0 # Simple optimization to improve the selector's efficiency. Stores the task's last # I/O operation as well as a reference to the file descriptor it was performed on last_io: tuple[int, Any] | None = None # All the tasks waiting on this task's completion joiners: set["Task"] = field(default_factory=set) # Whether this task has a pending cancellation scheduled. This allows us to delay # cancellation delivery as soon as the task calls another loop primitive pending_cancellation: bool = False # The time when the task was put on the waiting queue paused_when: float = 0.0 # The next deadline, in terms of the absolute clock of the loop, associated to the task next_deadline: float = 0.0 # Is this task within a context? This is needed to fix a bug that would occur when # the event loop tries to raise the exception caused by first task that kicked the # loop even if that context already ignored said error context: "TaskContext" = None # We propagate exception only at the first call to wait() propagate: bool = True def run(self, what: Any | None = None): """ Simple abstraction layer over a coroutine's send method. Does nothing if the task has already exited :param what: The object that has to be sent to the coroutine, defaults to None :type what: Any, optional """ if self.done(): return return self.coroutine.send(what) def throw(self, err: BaseException): """ Simple abstraction layer over a coroutine's throw method. Does nothing if the task has already exited :param err: The exception that has to be raised inside the task :type err: BaseException """ if self.done(): return return self.coroutine.throw(err) def __hash__(self): """ Implements hash(self) """ return hash(self.coroutine) def done(self): """ Returns True if the task is not running, False otherwise """ return self.state in [ TaskState.CANCELLED, TaskState.CRASHED, TaskState.FINISHED, ] def __del__(self): """ Task destructor """ if not self.done(): warnings.warn(f"task '{self.name}' was destroyed, but it has not completed yet") if self.last_io: warnings.warn(f"task '{self.name}' was destroyed, but it has pending I/O")