Initial experimental work for auto-joining process pools

Mattia Giambirtone 2024-03-11 12:17:41 +01:00
parent 3f48c74346
commit d7070c8bbf
9 changed files with 151 additions and 53 deletions

View File

@ -461,6 +461,13 @@ class BaseIOManager(ABC):
resource, if any (None otherwise)
"""
@abstractmethod
def clear(self):
"""
Resets the manager by clearing all I/O
requests currently scheduled
"""
class SignalManager(ABC):
"""
@ -567,7 +574,7 @@ class BaseKernel(ABC):
def spawn(self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args):
"""
Readies a task for execution. All positional arguments are passed
to the given coroutine (for keyword arguments, use `functools.partial`)
"""
return NotImplemented
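As a minimal usage sketch of the keyword-argument pattern the docstring recommends (the greet coroutine is made up for illustration; create_pool, spawn and run appear elsewhere in this commit):

import functools
import structio

async def greet(name, *, punctuation="!"):
    print(f"Hello, {name}{punctuation}")

async def main():
    async with structio.create_pool() as pool:
        # Positional arguments can be passed to spawn() directly
        pool.spawn(greet, "world")
        # Keyword arguments go through functools.partial
        pool.spawn(functools.partial(greet, "structio", punctuation="?"))

structio.run(main)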
@ -579,7 +586,9 @@ class BaseKernel(ABC):
"""
Spawns a system task. System tasks run in a special internal
task pool and begin execution in a scope shielded from cancellations
and with Ctrl+C protection enabled. Please note that if a system
task raises an exception, all tasks are cancelled and the exception
is propagated into the loop's entry point
"""
return NotImplemented
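A hedged sketch of the failure mode described above (the watchdog coroutine is hypothetical): a crashing system task tears the whole loop down.

import structio

async def watchdog():
    raise RuntimeError("system task failure")

async def main():
    structio.current_loop().spawn_system_task(watchdog)
    # All tasks get cancelled and RuntimeError propagates
    # out of structio.run()
    await structio.sleep(10)  # cancelled early by the crash

structio.run(main)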
@ -742,17 +751,41 @@ class BaseKernel(ABC):
def close(self, force: bool = False):
"""
Terminates and shuts down the event loop.
This method is meant to be extended (*not*
overridden!) by other implementations to do
their own cleanup
:param force: When force equals False,
the default, and the event loop is
not done, this function raises a
StructIOException. If True, implementors
should cancel all tasks and shut down the
event loop
"""
if not self.done() and not force:
raise StructIOException("the event loop is running")
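To illustrate the extend-don't-override contract, a minimal sketch of a hypothetical subclass (the FIFOKernel changes later in this commit follow the same pattern; the import path for FIFOKernel is omitted here):

class MyKernel(FIFOKernel):
    def close(self, force: bool = False):
        # Let the base class run its state checks first...
        super().close(force)
        # ...then do implementation-specific cleanup
        self.teardown()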
@abstractmethod
def add_shutdown_task(self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args) -> Any:
"""
Registers a task to be run right before the event loop shuts
down. The task is spawned as a system task only when the event
loop shuts down cleanly. Note that shutdown tasks are started all
at once in no particular order, so if you need them to run in a
deterministic order, the burden of synchronizing them is on you (fortunately,
structio's synchronization primitives make that rather easy). Returns a
unique identifier that can be used to unregister the shutdown task
"""
return NotImplemented
@abstractmethod
def remove_shutdown_task(self, ident: Any) -> bool:
"""
Unregisters a previously registered shutdown task.
Returns whether a task was actually removed
"""
return NotImplemented
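A short usage sketch of this pair of APIs (the flush_logs coroutine is hypothetical):

import structio

async def flush_logs():
    print("flushing logs before shutdown")

async def main():
    ident = structio.current_loop().add_shutdown_task(flush_logs)
    # remove_shutdown_task returns True if a task was actually removed
    assert structio.current_loop().remove_shutdown_task(ident)

structio.run(main)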

View File

@ -22,6 +22,7 @@ from structio.exceptions import (
)
from collections import deque
from typing import Callable, Coroutine, Any
from itertools import count
from functools import partial
import signal
import sniffio
@ -46,6 +47,8 @@ class FIFOKernel(BaseKernel):
)
# Tasks that are ready to run
self.run_queue: deque[Task] = deque()
self.shutdown_tasks: list[tuple[int, Callable[[Any, Any], Coroutine[Any, Any, Any]], list[Any]]] = []
self._shutdown_task_ident = count(0)
# Data to send back to tasks
self.data: dict[Task, Any] = {}
# Have we handled SIGINT?
@ -55,7 +58,7 @@ class FIFOKernel(BaseKernel):
self.pool = TaskPool()
self.pool.scope.shielded = True
self.current_scope = self.pool.scope
self._shutting_down = False
def get_closest_deadline(self):
if self.run_queue:
@ -118,7 +121,7 @@ class FIFOKernel(BaseKernel):
self.tools.remove(tool)
def done(self):
if not self._shutting_down and self.entry_point.done():
return True
if any([self.run_queue, self.paused, self.io_manager.pending()]):
return False
@ -132,14 +135,18 @@ class FIFOKernel(BaseKernel):
*args,
ki_protected: bool = False,
pool: TaskPool = None,
system_task: bool = False,
):
if isinstance(func, partial):
name = func.func.__name__ or repr(func.func)
else:
name = func.__name__ or repr(func)
if pool is None:
if system_task:
pool = self.pool
else:
pool = self.current_pool
task = Task(name, func(*args), pool, is_system_task=system_task)
# We inject our magic secret variable into the coroutine's stack frame, so
# we can look it up later
task.coroutine.cr_frame.f_locals.setdefault(
@ -152,7 +159,7 @@ class FIFOKernel(BaseKernel):
def spawn_system_task(
self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args
):
return self.spawn(func, *args, ki_protected=True, system_task=True)
def signal_notify(self, sig: int, frame: FrameType):
match sig:
@ -187,7 +194,10 @@ class FIFOKernel(BaseKernel):
self.current_task.paused_when = 0
self.current_pool = self.current_task.pool
self.current_scope = self.current_pool.scope
data = self.handle_errors(runner, self.current_task)
if data is None:
return
method, args, kwargs = data
self.current_task.state = TaskState.PAUSED
self.current_task.paused_when = self.clock.current_time()
if not callable(getattr(self, method, None)):
@ -272,6 +282,16 @@ class FIFOKernel(BaseKernel):
)
self.reschedule(task)
def _tick(self):
if self._sigint_handled and not self.restrict_ki_to_checkpoints:
self.throw(self.entry_point, KeyboardInterrupt())
if self.run_queue:
self.step()
self.wakeup()
self.check_scopes()
if self.io_manager.pending():
self.io_manager.wait_io(self.clock.current_time())
def run(self):
"""
This is the actual "loop" part
@ -279,14 +299,19 @@ class FIFOKernel(BaseKernel):
"""
while not self.done():
self._tick()
self.pool.scope.cancel()
self.io_manager.clear()
self._shutting_down = True
self.pool = TaskPool()
self.pool.scope.shielded = True
self.current_pool = self.pool
self.current_scope = self.pool.scope
for _, func, args in self.shutdown_tasks:
self.spawn_system_task(func, *args)
self.pool.scope.owner = None
while not self.done():
self._tick()
self.close()
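Putting the new shutdown phase together, a sketch of the intended observable behavior (assuming a clean exit; the goodbye coroutine is hypothetical): once the entry point finishes, the loop spawns the registered shutdown tasks in a fresh shielded pool and keeps ticking until they complete.

import structio

async def goodbye():
    print("shutdown task running")

async def main():
    structio.current_loop().add_shutdown_task(goodbye)
    print("entry point done")

structio.run(main)
# Expected output, assuming a clean shutdown:
#   entry point done
#   shutdown task running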
def reschedule_running(self):
@ -296,7 +321,7 @@ class FIFOKernel(BaseKernel):
self.reschedule(self.current_task)
def handle_errors(self, func: Callable, task: Task):
"""
Convenience method for handling various exceptions
from tasks
@ -304,11 +329,8 @@ class FIFOKernel(BaseKernel):
old_name, sniffio.thread_local.name = sniffio.thread_local.name, "structured-io"
try:
return func()
except StopIteration as ret:
# At the end of the day, coroutines are generator functions with
# some tricky behaviors, and this is one of them. When a coroutine
# hits a return statement (either explicit or implicit), it raises
@ -330,14 +352,12 @@ class FIFOKernel(BaseKernel):
# to re-raise cancellations at every checkpoint until the task lets the
# exception propagate into us, because we *really* want the task to be
# cancelled
task.state = TaskState.CANCELLED
task.pending_cancellation = False
self.on_cancel(task)
self.event("after_cancel", task)
except (Exception, KeyboardInterrupt) as err:
# Any other exception is caught here
task.exc = err
err.scope = task.pool.scope
task.state = TaskState.CRASHED
@ -368,13 +388,18 @@ class FIFOKernel(BaseKernel):
# Walk up the scope tree and reschedule all necessary
# tasks
scope = task.pool.scope
while scope and scope.done() and scope is not self.pool.scope:
if scope.done():
self.reschedule(scope.owner)
scope = scope.outer
self.event("on_task_exit", task)
self.release(task)
def close(self, force: bool = False):
super().close(force)
# Cancel *ALL* tasks
self.pool.scope.cancel()
def on_error(self, task: Task):
"""
The given task raised an exception
@ -382,6 +407,9 @@ class FIFOKernel(BaseKernel):
assert task.state == TaskState.CRASHED
self.event("on_exception_raised", task, task.exc)
if task.is_system_task:
self.close(force=True)
raise task.exc from StructIOException("system task crashed")
scope = task.pool.scope
if task is not scope.owner:
self.reschedule(scope.owner)
@ -434,7 +462,7 @@ class FIFOKernel(BaseKernel):
# current task because this method is
# called synchronously by TaskScope.cancel(),
# so there is nowhere to throw an exception
# to via throw()
if self.current_task in scope.tasks and self.current_task is not scope.owner:
self.current_task.pending_cancellation = True
for child in filter(lambda c: not c.shielded, scope.children):
@ -447,6 +475,7 @@ class FIFOKernel(BaseKernel):
scope is not self.current_task.pool.scope
and scope.owner is not self.current_task
and scope.owner is not self.entry_point
and scope.owner is not None
):
# Handles the case where the current task calls
# cancel() for a scope which it doesn't own, which
@ -475,3 +504,15 @@ class FIFOKernel(BaseKernel):
def teardown(self):
for manager in self.signal_managers:
manager.uninstall()
def add_shutdown_task(self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args) -> int:
ident = next(self._shutdown_task_ident)
self.shutdown_tasks.append((ident, func, args))
return ident
def remove_shutdown_task(self, ident: int) -> bool:
for i, (task_id, func, args) in enumerate(self.shutdown_tasks):
if ident == task_id:
self.shutdown_tasks.pop(i)
return True
return False

View File

@ -1,3 +1,4 @@
import structio
from structio.abc import BaseIOManager, BaseKernel
from structio.core.task import Task, TaskState
from structio.util.ki import CTRLC_PROTECTION_ENABLED
@ -107,3 +108,7 @@ class SimpleIOManager(BaseIOManager):
for resource, owner in self.writers.copy().items():
if owner == task:
self.writers.pop(resource)
def clear(self):
self.readers = {}
self.writers = {}

View File

@ -46,6 +46,8 @@ class Task:
next_deadline: Any = -1
# Is cancellation pending?
pending_cancellation: bool = False
# Is this a system task?
is_system_task: bool = False
def done(self):
"""

View File

@ -20,7 +20,8 @@ if platform.system() == "Windows":
class Process:
"""
Class similar to subprocess.Popen, but async. The constructor
is analogous to its synchronous counterpart
"""
def __init__(self, *args, **kwargs):
@ -42,12 +43,20 @@ class Process:
self.stderr = None
self.returncode = None
self.pid = -1
self._taskid = None
async def terminate(self):
"""
Terminates the subprocess asynchronously
"""
return await structio.thread.run_in_worker(
self._process.terminate, cancellable=True
)
def start(self):
"""
Begin execution of the process
"""
self._process = subprocess.Popen(*self._args, **self._kwargs)
@ -58,7 +67,7 @@ class Process:
self.stdout = FileStream(self._process.stdout)
if self._process.stderr:
self.stderr = FileStream(self._process.stderr)
# self._taskid = structio.current_loop().add_shutdown_task(self.wait)
async def is_running(self):
"""
@ -85,6 +94,8 @@ class Process:
self._process.wait, cancellable=True
)
self.returncode = status
if self._taskid is not None:
structio.current_loop().remove_shutdown_task(self._taskid)
return status
async def communicate(self, input=b"") -> tuple[bytes, bytes]:
@ -105,7 +116,7 @@ class Process:
return out, err
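A sketch of the reworked lifecycle with start() now synchronous (the structio.Process spelling of the import is an assumption; the example file later in this commit uses the same call pattern):

import shlex
import subprocess
import structio

async def main():
    # structio.Process as the public spelling is an assumption
    process = structio.Process(
        shlex.split("python -c 'print(42)'"), stdout=subprocess.PIPE
    )
    process.start()  # now synchronous: no await
    out, _ = await process.communicate()
    print(out.decode().strip())  # prints 42

structio.run(main)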
async def __aenter__(self):
self.start()
return self
async def __aexit__(self, *args):

View File

@ -28,15 +28,16 @@ class AsyncProcessPool:
# We need this to be running in the background, so we
# create it as a system task
structio.current_loop().spawn_system_task(self._manage_processes)
structio.current_loop().add_shutdown_task(self.stop)
async def _manage_process(self, process: Process):
await process.wait()
await self._sem.release()
async def _manage_processes(self):
async with structio.create_pool() as pool:
async for process in self.processes.reader:
process.start()
pool.spawn(self._manage_process, process)
@property
@ -55,12 +56,16 @@ class AsyncProcessPool:
await self._sem.acquire()
process = Process(*args, **kwargs)
try:
await self.processes.send(process)
except structio.ResourceClosed:
await self._sem.release()
raise
return process
async def stop(self):
await self.close()
await self.wait()
async def close(self):
"""
Close the process pool so that no more
@ -74,11 +79,9 @@ class AsyncProcessPool:
async def wait(self):
"""
Wait until the pool has no more work
to do
"""
while self.processes.reader.pending():
await structio.sleep(0)
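A usage sketch of the pool after this change: close() and wait() can still be called manually, but the pool now registers stop() as a shutdown task at construction time, so a plain fall-through exit is meant to work too (the example file below notes this is still somewhat broken).

import shlex
import structio

async def main():
    pool = structio.AsyncProcessPool(2)  # at most 2 processes at a time
    for i in range(6):
        # submit() waits on the internal semaphore once
        # 2 processes are already running
        await pool.submit(shlex.split(f"python -c 'print({i})'"))
    # No explicit close()/wait() here: the pool registered stop()
    # via add_shutdown_task when it was constructed

structio.run(main)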

View File

@ -3,6 +3,8 @@ import platform
import signal
from collections import defaultdict
from types import FrameType
import structio
from structio.io.socket import AsyncSocket
from typing import Callable, Any, Coroutine
from structio.thread import AsyncThreadQueue
@ -58,6 +60,8 @@ def set_signal_handler(
async def signal_watcher(sock: AsyncSocket):
while True:
if current_loop().entry_point.done():
break
# Even though we use set_wakeup_fd (which makes sure
# our I/O manager is signal-aware and exits cleanly
# when signals arrive), it turns out that actually using the
@ -77,7 +81,10 @@ async def signal_watcher(sock: AsyncSocket):
# using an unbounded queue, but realistically I doubt that one would face
# memory problems because their code is receiving thousands of signals and
# the event loop is not handling them fast enough (right?)
try:
await sock.receive(1)
except structio.Cancelled:
break
async for (sig, frame) in _sig_data:
if _sig_handlers[sig]:
try:

View File

@ -1,6 +1,7 @@
import httpcore
import structio
async def main():
# Note: this test only works because we have our own version of httpcore that
# implements a structio-compatible backend. It's just an example anyway

View File

@ -8,10 +8,6 @@ import shlex
# to it and it'll work
async def main(data: str):
cmd = shlex.split("python -c 'print(input())'")
to_send = data.encode(errors="ignore")
@ -28,20 +24,19 @@ async def main(data: str):
cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE
)
# Note that the process is spawned only after we run start()!
process.start()
out, _ = await process.communicate(to_send)
out = out.decode().rstrip("\r").rstrip("\r\n").rstrip("\n")
assert out == data
# We can also use process pools to spawn multiple processes
# at the same time while limiting resource usage
pool = structio.AsyncProcessPool(4) # Max. 4 processes at a time
for i in range(0, 4 * 5 + 1): # We iterate 21 instead of 20 times: this is important to the test!
# This will stop every 4 iterations
await pool.submit(shlex.split(f"""python -c 'print({i}); __import__("time").sleep(1)'"""))
# The AsyncProcessPool class automatically waits for all of its workers
# to exit before terminating the event loop
# TODO: Kinda broken at the moment
structio.run(main, "owo")