This repository has been archived on 2023-05-12. You can view files and clone it, but cannot push or open issues or pull requests.
aiosched/aiosched/context.py

113 lines
3.1 KiB
Python

"""
aiosched: Yet another Python async scheduler
Copyright (C) 2022 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from aiosched.task import Task
from aiosched.internals.syscalls import spawn, wait, cancel
from typing import Any, Coroutine, Callable
class TaskContext(Task):
    """
    An asynchronous task context that automatically waits
    for all tasks spawned within it. A TaskContext object
    behaves like a task and is handled as a single unit
    inside the event loop
    """

    def __init__(self) -> None:
        """
        Object constructor
        """

        # All the tasks that belong to this context. This
        # includes any inner contexts contained within this
        # one
        self.tasks: list[Task | "TaskContext"] = []
        # Whether we have been cancelled or not
        self.cancelled: bool = False
        # NOTE(review): the two positional arguments are presumably the
        # task's name and its (absent) coroutine -- confirm against
        # Task.__init__
        super().__init__(f"TaskContext object at {hex(id(self))}", None)

    async def spawn(
        self, func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs
    ) -> Task:
        """
        Spawns a child task and registers it with this
        context, so that it is waited for when the context
        exits

        :param func: The async function to spawn. Any extra
            positional and keyword arguments are forwarded to
            the spawn primitive together with it
        :returns: The object returned by the spawn primitive
        """

        task = await spawn(func, *args, **kwargs)
        self.tasks.append(task)
        return task

    async def __aenter__(self):
        """
        Implements the asynchronous context manager interface
        """

        return self

    async def __aexit__(
        self,
        exc_type: "type[BaseException] | None",
        exc: "BaseException | None",
        tb,
    ) -> None:
        """
        Implements the asynchronous context manager interface, waiting
        for all the tasks spawned inside the context. If waiting on
        a task raises, the remaining tasks are cancelled and the
        error is propagated
        """

        # This forces the interpreter to stop at the
        # end of the block and wait for all
        # children to exit. We always wait on the head
        # of the list instead of iterating over it with
        # a for loop: removing items from a list while
        # iterating over it makes the iterator skip
        # elements, which would leave some children
        # un-awaited
        while self.tasks:
            task = self.tasks[0]
            try:
                await wait(task)
            except BaseException:
                self._forget(task)
                await self.cancel()
                raise
            self._forget(task)

    def _forget(self, task: Task) -> None:
        """
        Removes the given task from the context, if
        it is still registered with it
        """

        # cancel() replaces self.tasks with an empty list,
        # so the task may already be gone by the time the
        # wait on it returns
        if task in self.tasks:
            self.tasks.remove(task)

    async def cancel(self) -> None:
        """
        Cancels the entire context, iterating over all
        of its tasks and cancelling them
        """

        for task in self.tasks:
            await cancel(task)
        self.cancelled = True
        self.tasks = []

    def done(self) -> bool:
        """
        Returns True if all the tasks inside the
        context have exited, False otherwise
        """

        return all(task.done() for task in self.tasks)

    def __del__(self):
        """
        Context destructor
        """

        for task in self.tasks:
            task.__del__()

    def __repr__(self):
        """
        Implements repr(self)
        """

        return f"TaskContext({self.tasks})"