"""
Module inspired by subprocess which allows for asynchronous
multiprocessing
"""
import os
import structio
import platform
import subprocess
from itertools import count
from structio.io import FileStream
from multiprocessing import cpu_count
from structio import Semaphore, Queue
from typing import Callable, Any, Coroutine
from subprocess import CalledProcessError, CompletedProcess, DEVNULL, PIPE

if platform.system() == "Windows":
# Windows doesn't really support non-blocking file
# descriptors (except sockets), so we just use threads
    from structio.io.files import AsyncFile as FileStream


class Process:
"""
Class similar to subprocess.Popen, but async. The constructor
is analogous to its synchronous counterpart
"""
def __init__(self, *args, **kwargs):
if "universal_newlines" in kwargs:
# Not sure why? But everyone else is doing it so :shrug:
raise RuntimeError("universal_newlines is not supported")
if stdin := kwargs.get("stdin"):
if stdin not in {PIPE, DEVNULL}:
# Curio mentions stuff breaking if the child process
# is passed a stdin fd that is set to non-blocking mode
if hasattr(os, "set_blocking"):
os.set_blocking(stdin.fileno(), True)
        # The arguments are saved here and passed to Popen's
        # constructor when start() is called
        self._process: subprocess.Popen | None = None
self._args = args
self._kwargs = kwargs
self.stdin = None
self.stdout = None
self.stderr = None
self.returncode = None
self.pid = -1
        self.shutdown_handlers: list[
            tuple[int, Callable[..., Coroutine[Any, Any, Any]], tuple[Any, ...]]
        ] = []
self._handler_id = count()
self._taskid = None
async def terminate(self):
"""
Terminates the subprocess asynchronously
"""
return await structio.thread.run_in_worker(
self._process.terminate, cancellable=True
)
async def _run_shutdown_handlers(self):
for _, coro, args in self.shutdown_handlers:
await coro(*args)
    def add_shutdown_handler(self, func: Callable[..., Coroutine[Any, Any, Any]], *args) -> int:
"""
Registers a coroutine to be executed after the process terminates. Shutdown handlers
are executed in the order in which they are registered. All positional arguments are
passed to the given coroutine. Returns a unique identifier that can be used to remove
the handler later
"""
handler_id = next(self._handler_id)
self.shutdown_handlers.append((handler_id, func, args))
return handler_id
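
    # Example (illustrative sketch, not part of the original module):
    # logging a process' exit status via a shutdown handler. "on_exit"
    # is a hypothetical coroutine; all extra arguments passed to
    # add_shutdown_handler are forwarded to it when the process exits.
    #
    #   async def on_exit(proc):
    #       print(f"{proc.pid} exited with status {proc.returncode}")
    #
    #   process.add_shutdown_handler(on_exit, process)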
def start(self):
"""
Begin execution of the process
"""
self._process = subprocess.Popen(*self._args, **self._kwargs)
self.pid = self._process.pid
if self._process.stdin:
self.stdin = FileStream(self._process.stdin)
if self._process.stdout:
self.stdout = FileStream(self._process.stdout)
if self._process.stderr:
self.stderr = FileStream(self._process.stderr)
self._taskid = structio.current_loop().add_shutdown_task(self.wait)
def detach(self):
"""
Detaches the process from the event loop. This is necessary
if the process needs to outlive the program that spawned it,
but keep in mind that you'll have to handle process termination
yourself. If the process is not attached to an event loop,
StructIOException is raised
"""
if self._taskid is None:
raise structio.exceptions.StructIOException("process is not attached to an event loop")
structio.current_loop().remove_shutdown_task(self._taskid)
self._taskid = None
def attach(self):
"""
Attach the process to the current event loop. This makes it so that structio
will wait for the process to exit, even if the main task terminates (unless it crashes).
If the process is already attached, StructIOException is raised
"""
if self._taskid is not None:
raise structio.exceptions.StructIOException("process is already attached to an event loop")
self._taskid = structio.current_loop().add_shutdown_task(self.wait)
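
    # Example (sketch): a process that should outlive the program can be
    # started and then detached; "some-daemon" is a placeholder command.
    #
    #   process = Process(["some-daemon"])
    #   process.start()
    #   process.detach()  # the loop won't wait for it on shutdown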
    async def is_running(self):
        """
        Returns whether the process is currently running
        """
        if self._process is None:
            return False
        # Popen.poll() returns None while the process is still alive
        return self._process.poll() is None
async def wait(self):
"""
Async equivalent of subprocess.Popen.wait()
"""
        if self._process is None:
            raise RuntimeError("process has not been started yet")
status = self._process.poll()
if status is None:
status = await structio.thread.run_in_worker(
self._process.wait, cancellable=True
)
self.returncode = status
await self._run_shutdown_handlers()
return status
    async def communicate(self, input=b"") -> tuple[bytes, bytes]:
        """
        Async equivalent of subprocess.Popen.communicate()
        """
        async with structio.create_pool() as pool:
stdout = pool.spawn(self.stdout.readall) if self.stdout else None
stderr = pool.spawn(self.stderr.readall) if self.stderr else None
if input:
await self.stdin.write(input)
await self.stdin.close()
# Awaiting a task object waits for its completion and
# returns its return value!
out = b""
err = b""
if stdout:
out = await stdout
if stderr:
err = await stderr
return out, err
async def __aenter__(self):
self.start()
return self
async def __aexit__(self, *args):
if self.stdin:
await self.stdin.close()
if self.stdout:
await self.stdout.close()
if self.stderr:
await self.stderr.close()
await self.wait()
def __getattr__(self, item):
# Delegate to internal process object
return getattr(self._process, item)
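
# Example usage of Process (a sketch; assumes a POSIX "cat" binary and an
# async entry point that runs the coroutine in a structio event loop):
#
#   async def main():
#       async with Process(["cat"], stdin=PIPE, stdout=PIPE) as process:
#           out, _ = await process.communicate(b"hello\n")
#           print(out)  # b"hello\n"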


async def run(
args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, check=False
):
"""
Async version of subprocess.run()
"""
if input:
stdin = subprocess.PIPE
async with Process(
args, stdin=stdin, stdout=stdout, stderr=stderr, shell=shell
) as process:
try:
stdout, stderr = await process.communicate(input)
    except BaseException:
process.kill()
raise
if check and process.returncode:
raise CalledProcessError(process.returncode, process.args, output=stdout, stderr=stderr)
return CompletedProcess(process.args, process.returncode, stdout, stderr)
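
# Example usage of run() (a sketch; assumes a POSIX "ls" binary):
#
#   async def main():
#       result = await run(["ls", "-l"], stdout=PIPE)
#       print(result.stdout.decode())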


async def check_output(args, *, stdin=None, stderr=None, shell=False, input=None):
"""
Async version of subprocess.check_output
"""
out = await run(
args,
stdout=PIPE,
stdin=stdin,
stderr=stderr,
shell=shell,
check=True,
input=input,
)
return out.stdout
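
# Example usage of check_output() (a sketch; assumes "uname" exists and
# that CalledProcessError should propagate to the caller):
#
#   async def main():
#       out = await check_output(["uname", "-a"])
#       print(out.decode())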


class ProcessLimiter:
"""
A class meant to constrain resource usage by limiting
the number of parallel processes that can be spawned.
Note however that processes are *not* reused!
:param max_workers: The maximum number of parallel
worker processes that can be running at any given
time
"""
def __init__(self, max_workers: int = cpu_count()):
self._closing = False
self.processes: Queue = Queue(max_workers)
self._sem = Semaphore(max_workers)
structio.current_loop().spawn_system_task(self._manage_processes)
structio.current_loop().add_shutdown_task(self._ensure_completed)
async def _wait_for_process(self, process: Process):
await process.wait()
await self._sem.release()
@property
def max_workers(self):
return self._sem.max_size
@property
def current_workers(self):
return self._sem.size
async def _manage_process(self, process: Process):
process.start()
await process.wait()
await self._sem.release()
async def _manage_processes(self):
async with structio.create_pool() as pool:
while True:
pool.spawn(self._manage_process, await self.processes.get())
async def _ensure_completed(self):
"""
Makes sure that all submitted processes are run to completion, even
if _manage_processes is cancelled (e.g. because the event loop is
shutting down)
"""
async with structio.create_pool() as pool:
while self.processes:
# If this raises WouldBlock, it's a bug
pool.spawn(self._manage_process, self.processes.get_noblock())
async def submit(self, *args, **kwargs) -> Process:
"""
Submits work to the process pool. All positional
and keyword arguments are passed along to the constructor
of `structio.parallel.Process`. If there are max_workers
processes already running, the call blocks until a spot
is freed. The process object is returned
"""
await self._sem.acquire()
process = Process(*args, **kwargs)
# By using non-blocking variants of the queue methods
# we can ensure processes are always submitted correctly:
# the semaphore already ensures there's enough space on the
# queue, so if this raises WouldBlock it's 100% a bug
self.processes.put_noblock(process)
return process
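
# Example usage of ProcessLimiter (a sketch; the constructor must run
# inside an event loop, since it spawns a system task; "sleep" is a
# POSIX binary used here as a stand-in workload):
#
#   async def main():
#       limiter = ProcessLimiter(max_workers=4)
#       for _ in range(16):
#           # Blocks once 4 processes are running, resuming as they exit
#           await limiter.submit(["sleep", "1"])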