From d10ae9c55bd4d26ed0a276d6f97268a15555bb11 Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Fri, 28 Apr 2023 16:04:30 +0200 Subject: [PATCH] Added task scopes. Many major fixes --- aiosched/__init__.py | 10 +- aiosched/context.py | 160 ++++++++++++++-------------- aiosched/internals/syscalls.py | 67 +++++++++--- aiosched/kernel.py | 110 +++++++++++++------ aiosched/runtime.py | 32 +++++- aiosched/sync.py | 4 +- aiosched/task.py | 4 +- aiosched/util/debugging.py | 10 +- tests/cancel.py | 4 +- tests/chatroom_server.py | 2 +- tests/context_catch.py | 2 +- tests/context_silent_catch.py | 21 ---- tests/context_timeout.py | 25 ++++- tests/context_wait.py | 2 +- tests/echo_server.py | 2 +- tests/events.py | 2 +- tests/memory_channel.py | 2 +- tests/nested_context_catch_inner.py | 9 +- tests/nested_context_catch_outer.py | 6 +- tests/nested_context_wait.py | 4 +- tests/network_channel.py | 2 +- tests/queue.py | 2 +- tests/raw_catch.py | 1 - tests/socket_ssl.py | 4 +- tests/timeout.py | 0 25 files changed, 287 insertions(+), 200 deletions(-) delete mode 100644 tests/context_silent_catch.py create mode 100644 tests/timeout.py diff --git a/aiosched/__init__.py b/aiosched/__init__.py index 610dba1..2e8800c 100644 --- a/aiosched/__init__.py +++ b/aiosched/__init__.py @@ -15,8 +15,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -from aiosched.runtime import run, get_event_loop, new_event_loop, clock, with_context -from aiosched.internals.syscalls import spawn, wait, sleep, cancel, checkpoint, join +from aiosched.runtime import run, get_event_loop, new_event_loop, clock, create_pool, skip_after, with_timeout +from aiosched.internals.syscalls import spawn, wait, sleep, cancel, checkpoint import aiosched.util import aiosched.task import aiosched.errors @@ -34,7 +34,7 @@ __all__ = [ "task", "errors", "cancel", - "with_context", + "create_pool", "Event", "Queue", "Channel", @@ -42,5 +42,7 @@ __all__ = [ "checkpoint", "NetworkChannel", "socket", - "util" + "util", + "with_timeout", + "skip_after" ] diff --git a/aiosched/context.py b/aiosched/context.py index 1f4ea79..a10ba05 100644 --- a/aiosched/context.py +++ b/aiosched/context.py @@ -20,14 +20,59 @@ from aiosched.internals.syscalls import ( spawn, wait, cancel, - join, + set_context, + close_context, current_task, - sleep + sleep, + throw, + set_scope, + close_scope, + get_current_scope ) +from aiosched.sync import Event from typing import Any, Coroutine, Callable -class TaskContext: +class TaskScope: + def __init__(self, timeout: int | float = 0.0, silent: bool = False): + self.timeout = timeout + self.silent = silent + self.inner: TaskScope | None = None + self.outer: TaskScope | None = None + self.pools: list[TaskPool] = list() + self.waiter: Task | None = None + self.entry_point: Task | None = None + self.timed_out: bool = False + + async def _timeout_worker(self): + await sleep(self.timeout) + for pool in self.pools: + if not pool.done(): + self.timed_out = True + await pool.cancel() + if pool.entry_point is not self.entry_point: + await cancel(pool.entry_point, block=True) + if not self.entry_point.done(): + self.timed_out = True + # raise TimeoutError("timed out") + await throw(self.entry_point, TimeoutError("timed out")) + + async def __aenter__(self): + self.entry_point = await current_task() + await set_scope(self) + if self.timeout: + self.waiter = await spawn(self._timeout_worker) + return self + + async def __aexit__(self, exc_type: type, exception: Exception, tb): + await close_scope(self) + if not self.waiter.done(): + await cancel(self.waiter, block=True) + if exception is not None: + return self.silent + + +class TaskPool: """ An asynchronous context manager that automatically waits for all tasks spawned within it and cancels itself when @@ -35,7 +80,7 @@ class TaskContext: cancel inner ones if an exception is raised inside them """ - def __init__(self, silent: bool = False, gather: bool = True, timeout: int | float = 0.0) -> None: + def __init__(self, gather: bool = True) -> None: """ Object constructor """ @@ -44,29 +89,17 @@ class TaskContext: self.tasks: list[Task] = [] # Whether we have been cancelled or not self.cancelled: bool = False - # The context's entry point (needed to disguise ourselves as a task ;)) - self.entry_point: Task | TaskContext | None = None - # Do we ignore exceptions? - self.silent: bool = silent + # The context's entry point + self.entry_point: Task | TaskPool | None = None # Do we gather multiple exceptions from # children tasks? self.gather: bool = gather # TODO: Implement - # For how long do we allow tasks inside us - # to run? - self.timeout: int | float = timeout - self.timed_out: bool = False # Have we crashed? self.error: BaseException | None = None - - async def _timeout_worker(self): - await sleep(self.timeout) - if not self.done(): - self.error = TimeoutError("timed out") - self.timed_out = True - for task in self.tasks: - if task is self.entry_point or task.done(): - continue - await cancel(task, block=True) + # Data about inner and outer contexts + self.inner: TaskPool | None = None + self.outer: TaskPool | None = None + self.event: Event = Event() async def spawn( self, func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs @@ -78,7 +111,6 @@ class TaskContext: task = await spawn(func, *args, **kwargs) task.context = self self.tasks.append(task) - await join(task) return task async def __aenter__(self): @@ -87,19 +119,12 @@ class TaskContext: """ self.entry_point = await current_task() + scope = await get_current_scope() + if scope: + scope.pools.append(self) + await set_context(self) return self - def __eq__(self, other): - """ - Implements self == other - """ - - if isinstance(other, TaskContext): - return super().__eq__(other) - elif isinstance(other, Task): - return other == self.entry_point - return False - async def __aexit__(self, exc_type: Exception, exc: Exception, tb): """ Implements the asynchronous context manager interface, waiting @@ -107,36 +132,31 @@ class TaskContext: exceptions """ - if self.timeout: - waiter = await spawn(self._timeout_worker) try: for task in self.tasks: # This forces the interpreter to stop at the # end of the block and wait for all # children to exit - if task is self.entry_point: - # We don't wait on the entry - # point because that's us! - # Besides, even if we tried, - # wait() would raise an error - # to avoid a deadlock - continue await wait(task) - except BaseException as exc: - await self.cancel(False) + if self.inner: + # We wait for inner contexts to terminate + await self.event.wait() + except (Exception, KeyboardInterrupt) as exc: + if not self.cancelled: + await self.cancel() self.error = exc finally: - if self.timeout and not waiter.done(): - await cancel(waiter, block=True) self.entry_point.propagate = True - if self.silent: - return - if self.error: + await close_context(self) + self.entry_point.context = None + if self.outer: + # We reschedule the entry point of the outer + # context once we're done + await self.outer.event.trigger() + if self.error and not self.outer: raise self.error - # Task method wrappers - - async def cancel(self, propagate: bool = True): + async def cancel(self): """ Cancels the entire context, iterating over all of its tasks (which includes inner contexts) @@ -144,20 +164,10 @@ class TaskContext: """ for task in self.tasks: - if task is self.entry_point: - continue - if isinstance(task, Task): - await cancel(task) - else: - task: TaskContext - await task.cancel(propagate) + await cancel(task, block=True) + if self.inner: + await self.inner.cancel() self.cancelled = True - if propagate: - if isinstance(self.entry_point, Task): - await cancel(self.entry_point) - else: - self.entry_point: TaskContext - await self.entry_point.cancel(propagate) def done(self) -> bool: """ @@ -168,28 +178,14 @@ class TaskContext: for task in self.tasks: if not task.done(): return False - return True - - def __hash__(self): - return self.entry_point.__hash__() - - def run(self, what: Any | None = None): - return self.entry_point.run(what) - - def __del__(self): - """ - Context destructor - """ - - for task in self.tasks: - task.__del__() + return self.entry_point.done() def __repr__(self): """ Implements repr(self) """ - result = "TaskContext([" + result = "TaskPool([" for i, task in enumerate(self.tasks): result += repr(task) if i < len(self.tasks) - 1: diff --git a/aiosched/internals/syscalls.py b/aiosched/internals/syscalls.py index e0e0972..e695bde 100644 --- a/aiosched/internals/syscalls.py +++ b/aiosched/internals/syscalls.py @@ -120,18 +120,6 @@ async def current_task() -> Task: return await syscall("get_current_task") -async def join(task: Task): - """ - Tells the event loop that the current task - wants to wait on the given one, but without - waiting for its completion. This is a low - level trap and should not be used on its - own - """ - - await syscall("join", task) - - async def wait(task: Task) -> Any | None: """ Waits for the completion of a @@ -149,10 +137,7 @@ async def wait(task: Task) -> Any | None: :returns: The task's return value, if any """ - if task == await current_task(): - # We don't do an "x is y" check because - # tasks and task contexts can compare equal - # despite having different memory addresses + if task is await current_task(): raise SchedulerError("a task cannot join itself") await syscall("wait", task) if task.exc and task.state != TaskState.CANCELLED and task.propagate: @@ -179,6 +164,8 @@ async def cancel(task: Task, block: bool = False): :type block: bool, optional """ + if task.done(): + return await syscall("cancel", task) if block: await wait(task) @@ -223,3 +210,51 @@ async def io_release(stream): """ await syscall("io_release", stream) + + +async def set_context(ctx): + """ + Sets the current task context + """ + + await syscall("set_context", ctx) + + +async def close_context(ctx): + """ + Closes the current task context + """ + + await syscall("close_context", ctx) + + +async def set_scope(scope): + """ + Sets the current task scope + """ + + await syscall("set_scope", scope) + + +async def close_scope(scope): + """ + Closes the current task scope + """ + + await syscall("close_scope", scope) + + +async def get_current_scope(): + """ + Returns the current task scope + """ + + return await syscall("get_current_scope") + + +async def throw(task, ctx): + """ + Throws the given exception in the given task + """ + + await syscall("throw", task, ctx) diff --git a/aiosched/kernel.py b/aiosched/kernel.py index cad31ce..0502fef 100644 --- a/aiosched/kernel.py +++ b/aiosched/kernel.py @@ -15,7 +15,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -import random import signal import itertools from collections import deque @@ -32,6 +31,7 @@ from aiosched.errors import ( ResourceClosed, ResourceBroken, ) +from aiosched.context import TaskPool, TaskScope from selectors import DefaultSelector, BaseSelector, EVENT_READ, EVENT_WRITE @@ -91,6 +91,10 @@ class FIFOKernel: self._sigint_handled: bool = False # Are we executing any task code? self._running: bool = False + # The current context we're in + self.current_context: TaskPool | None = None + # The current task scope we're in + self.current_scope: TaskScope | None = None def __repr__(self): """ @@ -123,7 +127,7 @@ class FIFOKernel: # We reschedule the current task # immediately no matter what it's # doing so that we process the - # exception immediately + # exception right away self.reschedule_running() def done(self) -> bool: @@ -176,6 +180,7 @@ class FIFOKernel: for task in self.all(): self.io_release_task(task) + self.paused.discard(task) self.selector.close() self.close() @@ -207,12 +212,6 @@ class FIFOKernel: for key, _ in self.selector.select(timeout): key.data: dict[int, Task] for task in key.data.values(): - # We don't reschedule a task that wasn't - # blocking on I/O before: this way if a - # task waits on a socket and then goes to - # sleep, it won't be woken up early if the - # resource becomes available before its - # deadline expires self.run_ready.append(task) # Resource ready? Schedule its task self.debugger.after_io(self.clock() - before_time) @@ -259,6 +258,58 @@ class FIFOKernel: self.current_task.state = TaskState.PAUSED + def set_context(self, ctx: TaskPool): + """ + Sets the current task context + """ + + self.debugger.on_context_creation(ctx) + self.current_task.context = ctx + if not self.current_context: + self.current_context = ctx + else: + self.current_context.inner = ctx + ctx.outer = self.current_context + self.current_context = ctx + self.reschedule_running() + + def close_context(self, ctx: TaskPool): + """ + Closes the given context + """ + + ctx.inner = None + self.debugger.on_context_exit(ctx) + ctx.entry_point.context = None + self.current_context = ctx.outer + self.reschedule_running() + + def set_scope(self, scope: TaskScope): + """ + Sets the current task scope + """ + + if not self.current_scope: + self.current_scope = scope + else: + self.current_scope.inner = scope + scope.outer = self.current_scope + self.current_scope = scope + self.reschedule_running() + + def close_scope(self, scope: TaskScope): + """ + Closes the given scope + """ + + scope.inner = None + self.current_scope = scope.outer + self.reschedule_running() + + def get_current_scope(self): + self.data[self.current_task] = self.current_scope + self.reschedule_running() + def run_task_step(self): """ Runs a single step for the current task. @@ -293,10 +344,6 @@ class FIFOKernel: # We perform the deferred cancellation # if it was previously scheduled self.cancel(self.current_task) - elif exc := self.current_task.pending_exception: - self.current_task.pending_exception = None - self.reschedule_running() - self.current_task.throw(exc) else: # Some debugging and internal chatter here self.current_task.steps += 1 @@ -412,7 +459,7 @@ class FIFOKernel: self.selector.unregister(resource) self.debugger.on_io_unschedule(resource) if resource is self.current_task.last_io[1]: - self.current_task.last_io = () + self.current_task.last_io = None self.reschedule_running() def io_release_task(self, task: Task): @@ -429,7 +476,7 @@ class FIFOKernel: continue self.notify_closing(key.fileobj, broken=True) self.selector.unregister(key.fileobj) - task.last_io = () + task.last_io = None def get_active_io_count(self) -> int: """ @@ -476,12 +523,20 @@ class FIFOKernel: it fails """ + self.paused.discard(task) + self.io_release_task(task) self.handle_errors(partial(task.throw, Cancelled(task)), task) if task.state != TaskState.CANCELLED: task.pending_cancellation = True - else: - self.io_release_task(task) - self.paused.discard(task) + self.run_ready.append(task) + if self.current_task not in self.run_ready: + self.reschedule_running() + + def throw(self, task, error): + self.paused.discard(task) + self.io_release_task(task) + self.handle_errors(partial(task.throw, error), task) + self.run_ready.appendleft(task) self.reschedule_running() def handle_errors(self, func: Callable, task: Task | None = None): @@ -520,6 +575,7 @@ class FIFOKernel: task = task or self.current_task task.state = TaskState.CANCELLED task.pending_cancellation = False + self.io_release_task(self.current_task) self.debugger.after_cancel(task) self.wait(task) except (Exception, KeyboardInterrupt) as err: @@ -527,6 +583,7 @@ class FIFOKernel: task = task or self.current_task task.exc = err task.state = TaskState.CRASHED + self.io_release_task(self.current_task) self.debugger.on_exception_raised(task, err) self.wait(task) @@ -553,25 +610,12 @@ class FIFOKernel: executing """ - if task != self.current_task: + if task is not self.current_task: task.joiners.add(self.current_task) if task.done(): + self.paused.discard(task) self.io_release_task(task) self.run_ready.extend(task.joiners) - for joiner in task.joiners: - joiner.pending_exception = task.exc - - def join(self, task: Task): - """ - Tells the event loop that the current task - wants to wait on the given one, but without - actually waiting for its completion. This is - an internal method and should not be used outside - the kernel machinery - """ - - task.joiners.add(self.current_task) - self.reschedule_running() def spawn(self, func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs): """ @@ -661,4 +705,4 @@ class FIFOKernel: # but having two tasks reading/writing at the # same time can't lead to anything good, better # disallow it - self.current_task.throw(ResourceBusy(f"The resource is being read from/written by another task")) + self.current_task.throw(ResourceBusy(f"The resource is being read from/written to by another task")) diff --git a/aiosched/runtime.py b/aiosched/runtime.py index 665adb1..30d6c60 100644 --- a/aiosched/runtime.py +++ b/aiosched/runtime.py @@ -23,7 +23,7 @@ from aiosched.kernel import FIFOKernel from aiosched.errors import SchedulerError from aiosched.util.debugging import BaseDebugger from typing import Coroutine, Callable, Any -from aiosched.context import TaskContext +from aiosched.context import TaskPool, TaskScope local_storage = local() @@ -84,14 +84,36 @@ def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args, **kwargs): get_event_loop().start(func, *args, **kwargs) -def with_context(*args, **kwargs) -> TaskContext: +def create_pool(*args, **kwargs) -> TaskPool: """ - Creates and returns a new TaskContext + Creates and returns a new TaskPool object. All positional and keyword arguments - are passed to the TaskContext constructor + are passed to the TaskPool constructor """ - return TaskContext(*args, **kwargs) + return TaskPool(*args, **kwargs) + + +def with_timeout(timeout: int | float) -> TaskScope: + """ + Returns a new task scope with the + specified timeout. A TimeoutError + exception is raised if the timeout + expires + """ + + return TaskScope(timeout=timeout) + + +def skip_after(timeout: int | float) -> TaskScope: + """ + Returns a new task scope with the specified + timeout. No exception is raised if the timeout + expires, but the timed_out attribute of the scope + is set accordingly + """ + + return TaskScope(timeout=timeout, silent=True) def clock() -> float: diff --git a/aiosched/sync.py b/aiosched/sync.py index fefd317..4ce6eb4 100644 --- a/aiosched/sync.py +++ b/aiosched/sync.py @@ -321,7 +321,7 @@ class NetworkChannel(Channel): if self.closed: return False - elif self.reader.fileno == -1: + elif self.reader.fileno() == -1: return False else: try: @@ -369,7 +369,7 @@ class Lock: if self.owner is None: raise RuntimeError("lock is not acquired") elif self.owner is not task: - raise RuntimeError("lock can only released by its owner") + raise RuntimeError("lock can only be released by its owner") elif self.tasks: await self.tasks.popleft().trigger() else: diff --git a/aiosched/task.py b/aiosched/task.py index 9e8fc14..28943b6 100644 --- a/aiosched/task.py +++ b/aiosched/task.py @@ -80,11 +80,9 @@ class Task: # Is this task within a context? This is needed to fix a bug that would occur when # the event loop tries to raise the exception caused by first task that kicked the # loop even if that context already ignored said error - context: "TaskContext" = field(default=None, repr=False) + context: "TaskPool" = field(default=None, repr=False) # We propagate exception only at the first call to wait() propagate: bool = True - # Do we have any exceptions pending? - pending_exception: Exception | None = None def run(self, what: Any | None = None): """ diff --git a/aiosched/util/debugging.py b/aiosched/util/debugging.py index a1666d7..8416c71 100644 --- a/aiosched/util/debugging.py +++ b/aiosched/util/debugging.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod from aiosched.task import Task -from aiosched.context import TaskContext +from aiosched.context import TaskPool from selectors import EVENT_READ, EVENT_WRITE @@ -196,28 +196,28 @@ class BaseDebugger(ABC): return NotImplemented @abstractmethod - def on_context_creation(self, ctx: TaskContext): + def on_context_creation(self, ctx: TaskPool): """ This method is called right after a task context is initialized, i.e. when set_context in the event loop is called :param ctx: The context object - :type ctx: TaskContext + :type ctx: TaskPool :return: """ return NotImplemented @abstractmethod - def on_context_exit(self, ctx: TaskContext): + def on_context_exit(self, ctx: TaskPool): """ This method is called right before a task context is closed, i.e. when close_context in the event loop is called :param ctx: The context object - :type ctx: TaskContext + :type ctx: TaskPool :return: """ diff --git a/tests/cancel.py b/tests/cancel.py index 8a1554e..9574b3f 100644 --- a/tests/cancel.py +++ b/tests/cancel.py @@ -21,12 +21,12 @@ async def main(children: list[tuple[str, int]]): print(f"[main] Spawned {len(tasks)} children") print(f"[main] Cancelling a random child") cancelled = random.choice(tasks) - await aiosched.cancel(cancelled) + await aiosched.cancel(cancelled, block=True) tasks.remove(cancelled) print(f"[main] Waiting for {len(tasks)} children") before = aiosched.clock() for i, task in enumerate(tasks): - print(f"[main] Waiting for child #{i + 1}") + print(f"[main] Waiting for child #{i + 1} ({int(task.next_deadline - task.paused_when)})") await aiosched.wait(task) print(f"[main] Child #{i + 1} has exited") print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds") diff --git a/tests/chatroom_server.py b/tests/chatroom_server.py index 63b755c..a612c67 100644 --- a/tests/chatroom_server.py +++ b/tests/chatroom_server.py @@ -20,7 +20,7 @@ async def serve(bind_address: tuple): await sock.bind(bind_address) await sock.listen(5) logging.info(f"Serving asynchronously at {bind_address[0]}:{bind_address[1]}") - async with aiosched.with_context() as ctx: + async with aiosched.create_pool() as ctx: async with sock: while True: try: diff --git a/tests/context_catch.py b/tests/context_catch.py index 2d9f48f..1cc0d6b 100644 --- a/tests/context_catch.py +++ b/tests/context_catch.py @@ -4,7 +4,7 @@ from raw_catch import child_raises async def main(children: list[tuple[str, int]]): try: - async with aiosched.with_context() as ctx: + async with aiosched.create_pool() as ctx: print("[main] Spawning children") for name, delay in children: await ctx.spawn(child_raises, name, delay) diff --git a/tests/context_silent_catch.py b/tests/context_silent_catch.py deleted file mode 100644 index bf198b8..0000000 --- a/tests/context_silent_catch.py +++ /dev/null @@ -1,21 +0,0 @@ -import aiosched -from raw_catch import child - - - -async def main(children: list[tuple[str, int]]): - async with aiosched.with_context(silent=True) as ctx: - print("[main] Spawning children") - for name, delay in children: - await ctx.spawn(child, name, delay) - print("[main] Children spawned") - before = aiosched.clock() - if ctx.exc: - print( - f"[main] Child raised an exception -> {type(ctx.exc).__name__}: {ctx.exc}" - ) - print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds") - - -if __name__ == "__main__": - aiosched.run(main, [("first", 1), ("second", 2), ("third", 3)], debugger=None) diff --git a/tests/context_timeout.py b/tests/context_timeout.py index e510ea0..c96696a 100644 --- a/tests/context_timeout.py +++ b/tests/context_timeout.py @@ -4,11 +4,26 @@ from raw_wait import child async def main(children: list[tuple[str, int]]): print("[main] Spawning children") - async with aiosched.with_context(timeout=4, silent=True) as ctx: - for name, delay in children: - await ctx.spawn(child, name, delay) - print("[main] Children spawned") - before = aiosched.clock() + # Only the first two children will complete + before = aiosched.clock() + # This block will not run longer than 5 seconds + async with aiosched.skip_after(5): + async with aiosched.create_pool() as pool: + for name, delay in children: + await pool.spawn(child, name, delay) + print("[main] Children spawned") + # The timeout doesn't apply just to child tasks, + # but rather to the entire indented block! This + # means that even things that are awaited instead + # of spawned will get cancelled when the timeout + # expires. This only works because we created a + # task scope that encompasses this whole block! + await aiosched.sleep(50) + print("This will never be printed") + # When using skip_after, no exception is raised when a timeout + # expires. If you want to handle an exception, you can use with_timeout() + # instead: when the timeout expires, a TimeoutError exception will be raised + # instead. print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds") diff --git a/tests/context_wait.py b/tests/context_wait.py index 2c3d140..ee6aa6a 100644 --- a/tests/context_wait.py +++ b/tests/context_wait.py @@ -4,7 +4,7 @@ from raw_wait import child async def main(children: list[tuple[str, int]]): print("[main] Spawning children") - async with aiosched.with_context() as ctx: + async with aiosched.create_pool() as ctx: for name, delay in children: await ctx.spawn(child, name, delay) print("[main] Children spawned") diff --git a/tests/echo_server.py b/tests/echo_server.py index 560711b..a471b1f 100644 --- a/tests/echo_server.py +++ b/tests/echo_server.py @@ -18,7 +18,7 @@ async def serve(bind_address: tuple): await sock.bind(bind_address) await sock.listen(5) logging.info(f"Serving asynchronously at {bind_address[0]}:{bind_address[1]}") - async with aiosched.with_context() as ctx: + async with aiosched.create_pool() as ctx: async with sock: while True: try: diff --git a/tests/events.py b/tests/events.py index 391c3be..313b652 100644 --- a/tests/events.py +++ b/tests/events.py @@ -18,7 +18,7 @@ async def child(ev: aiosched.Event, pause: int): async def parent(pause: int = 1): - async with aiosched.with_context() as ctx: + async with aiosched.create_pool() as ctx: event = aiosched.Event() print("[parent] Spawning child task") await ctx.spawn(child, event, pause + 2) diff --git a/tests/memory_channel.py b/tests/memory_channel.py index f5c2375..3a36bf7 100644 --- a/tests/memory_channel.py +++ b/tests/memory_channel.py @@ -22,7 +22,7 @@ async def receiver(c: aiosched.MemoryChannel): async def main(channel: aiosched.MemoryChannel, n: int): print("Starting sender and receiver") - async with aiosched.with_context() as ctx: + async with aiosched.create_pool() as ctx: await ctx.spawn(sender, channel, n) await ctx.spawn(receiver, channel) print("All done!") diff --git a/tests/nested_context_catch_inner.py b/tests/nested_context_catch_inner.py index 6d33a96..8a03809 100644 --- a/tests/nested_context_catch_inner.py +++ b/tests/nested_context_catch_inner.py @@ -3,21 +3,20 @@ from raw_catch import child_raises from raw_wait import child as successful - async def main( children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]] ): before = aiosched.clock() - async with aiosched.with_context() as ctx: - print("[main] Spawning children in first context") + async with aiosched.create_pool() as ctx: + print(f"[main] Spawning children in first context ({hex(id(ctx))})") for name, delay in children_outer: await ctx.spawn(successful, name, delay) print("[main] Children spawned") # An exception in an outer context cancels everything # inside it, but an exception in an inner context does # not affect outer ones - async with aiosched.with_context() as ctx2: - print("[main] Spawning children in second context") + async with aiosched.create_pool() as ctx2: + print(f"[main] Spawning children in second context ({hex(id(ctx2))})") for name, delay in children_inner: await ctx2.spawn(child_raises, name, delay) print("[main] Children spawned") diff --git a/tests/nested_context_catch_outer.py b/tests/nested_context_catch_outer.py index 7e9a3e2..1b38042 100644 --- a/tests/nested_context_catch_outer.py +++ b/tests/nested_context_catch_outer.py @@ -2,19 +2,17 @@ import aiosched from raw_catch import child_raises - -# TODO: This crashes 1 second later than it should be async def main( children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]] ): try: - async with aiosched.with_context() as ctx: + async with aiosched.create_pool() as ctx: before = aiosched.clock() print("[main] Spawning children in first context") for name, delay in children_outer: await ctx.spawn(child_raises, name, delay) print("[main] Children spawned") - async with aiosched.with_context() as ctx2: + async with aiosched.create_pool() as ctx2: print("[main] Spawning children in second context") for name, delay in children_inner: await ctx2.spawn(child_raises, name, delay) diff --git a/tests/nested_context_wait.py b/tests/nested_context_wait.py index 3c2b1ac..457795c 100644 --- a/tests/nested_context_wait.py +++ b/tests/nested_context_wait.py @@ -6,13 +6,13 @@ from raw_wait import child async def main( children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]] ): - async with aiosched.with_context() as ctx: + async with aiosched.create_pool() as ctx: before = aiosched.clock() print("[main] Spawning children in first context") for name, delay in children_outer: await ctx.spawn(child, name, delay) print("[main] Children spawned") - async with aiosched.with_context() as ctx2: + async with aiosched.create_pool() as ctx2: print("[main] Spawning children in second context") for name, delay in children_inner: await ctx2.spawn(child, name, delay) diff --git a/tests/network_channel.py b/tests/network_channel.py index 8ea5bb8..8ab7f67 100644 --- a/tests/network_channel.py +++ b/tests/network_channel.py @@ -27,7 +27,7 @@ async def consumer(c: aiosched.NetworkChannel): async def main(channel: aiosched.NetworkChannel, n: int): t = aiosched.clock() print("[main] Starting children") - async with aiosched.with_context() as ctx: + async with aiosched.create_pool() as ctx: await ctx.spawn(consumer, channel) await ctx.spawn(producer, channel, n) print(f"[main] All done in {aiosched.clock() - t:.2f} seconds") diff --git a/tests/queue.py b/tests/queue.py index 23d1339..182f59b 100644 --- a/tests/queue.py +++ b/tests/queue.py @@ -30,7 +30,7 @@ async def consumer(q: aiosched.Queue): async def main(q: aiosched.Queue, n: int): print("Starting consumer and producer") - async with aiosched.with_context() as ctx: + async with aiosched.create_pool() as ctx: await ctx.spawn(producer, q, n) await ctx.spawn(consumer, q) print("Bye!") diff --git a/tests/raw_catch.py b/tests/raw_catch.py index f4ee7a0..4994ee7 100644 --- a/tests/raw_catch.py +++ b/tests/raw_catch.py @@ -1,7 +1,6 @@ import aiosched - async def child_raises(name: str, n: int): before = aiosched.clock() print(f"[child {name}] Sleeping for {n} seconds") diff --git a/tests/socket_ssl.py b/tests/socket_ssl.py index e772f4b..e8238b6 100644 --- a/tests/socket_ssl.py +++ b/tests/socket_ssl.py @@ -31,7 +31,7 @@ async def test(host: str, port: int, bufsize: int = 4096): print(f"Attempting a connection to {host}:{port}") await socket.connect((host, port)) print("Connected") - async with aiosched.with_context(timeout=5, silent=True) as ctx: + async with aiosched.skip_after(5) as scope: async with socket: # Closes the socket automatically print("Entered socket context manager, sending request data") @@ -51,7 +51,7 @@ async def test(host: str, port: int, bufsize: int = 4096): break if buffer: data = buffer.decode().split("\r\n") - print(f"HTTP Response below {'(might be incomplete)' if ctx.timed_out else ''}:") + print(f"HTTP Response below {'(might be incomplete)' if scope.timed_out else ''}:") _print(f"Response: {data[0]}") _print("Headers:") content = False diff --git a/tests/timeout.py b/tests/timeout.py new file mode 100644 index 0000000..e69de29