structio/structio/parallel.py

450 lines
14 KiB
Python

"""
Module inspired by subprocess which allows for asynchronous
multiprocessing
"""
import os
import struct
import sys
import structio
import msgpack
import platform
import subprocess
from itertools import count
from structio.util.finder import ObjectReference
from structio.io import FileStream
from multiprocessing import cpu_count
from structio import Semaphore, Queue
from typing import Callable, Any, Coroutine
from structio.core.syscalls import checkpoint
from structio.exceptions import ResourceBroken, StructIOException
from subprocess import CalledProcessError, CompletedProcess, DEVNULL, PIPE
if platform.system() == "Windows":
    # Windows doesn't really support non-blocking file
    # descriptors (except sockets), so we just use threads:
    # on that platform AsyncFile replaces the FileStream
    # imported above, under the same name
    from structio.io.files import AsyncFile as FileStream
class Process:
    """
    Class similar to subprocess.Popen, but async. The constructor
    is analogous to its synchronous counterpart: positional and keyword
    arguments are stored and forwarded to subprocess.Popen when start()
    is called. Attributes that are not defined here are looked up on the
    underlying Popen object once the process has been started

    :raises RuntimeError: If universal_newlines is passed
    """

    def __init__(self, *args, **kwargs):
        # Assigned before anything else so that __getattr__ can
        # always find it, even if the constructor bails out early
        self._process: subprocess.Popen | None = None
        if "universal_newlines" in kwargs:
            # Not sure why? But everyone else is doing it so :shrug:
            raise RuntimeError("universal_newlines is not supported")
        stdin = kwargs.get("stdin")
        if stdin is not None and stdin not in (PIPE, DEVNULL):
            # Curio mentions stuff breaking if the child process
            # is passed a stdin fd that is set to non-blocking mode
            if hasattr(os, "set_blocking"):
                # Like Popen, accept both raw file descriptors
                # and objects exposing a fileno() method
                fd = stdin if isinstance(stdin, int) else stdin.fileno()
                os.set_blocking(fd, True)
        # The Popen object itself is only created by start()
        self._args = args
        self._kwargs = kwargs
        self.stdin = None
        self.stdout = None
        self.stderr = None
        self.returncode = None
        self.pid = -1
        # Entries are (handler id, before_wait flag, coroutine function, positional args)
        self.shutdown_handlers: list[
            tuple[int, bool, Callable[..., Coroutine[Any, Any, Any]], tuple[Any, ...]]
        ] = []
        self._handler_id = count()
        # Identifier of the loop shutdown task, None while detached
        self._taskid = None
        self._started = structio.Event()

    async def terminate(self):
        """
        Terminates the subprocess asynchronously
        """

        return await structio.thread.run_in_worker(
            self._process.terminate, cancellable=True
        )

    async def _run_shutdown_handlers(self, before_wait: bool = False):
        """
        Runs, in registration order, every shutdown handler whose
        before_wait flag matches the given one
        """

        for _, runs_before, handler, handler_args in self.shutdown_handlers:
            if runs_before is before_wait:
                await handler(*handler_args)

    def add_shutdown_handler(
        self,
        func: Callable[..., Coroutine[Any, Any, Any]],
        *args,
        before_wait: bool = False,
    ) -> int:
        """
        Registers a coroutine to be executed either right after, or right before wait() is called.
        Shutdown handlers are executed one at a time in the order in which they are registered. All
        positional arguments are passed to the given coroutine. Returns a unique identifier that can
        be used to unregister the handler if necessary. Note that shutdown handlers are called only
        once (i.e. the first time wait() returns)

        :param func: The coroutine function to add as a shutdown handler
        :param before_wait: If False, the default, the handler is executed after wait()
            returns. If True, it is executed right before calling it
        """

        handler_id = next(self._handler_id)
        self.shutdown_handlers.append((handler_id, before_wait, func, args))
        return handler_id

    def start(self, *, attached: bool = True):
        """
        Begin execution of the process

        :param attached: If True, the default, wait() is registered as a
            shutdown task of the current event loop (see attach())
        """

        self._process = subprocess.Popen(*self._args, **self._kwargs)
        self.pid = self._process.pid
        # Wrap whichever standard streams Popen created pipes for
        if self._process.stdin:
            self.stdin = FileStream(self._process.stdin)
        if self._process.stdout:
            self.stdout = FileStream(self._process.stdout)
        if self._process.stderr:
            self.stderr = FileStream(self._process.stderr)
        if attached:
            self._taskid = structio.current_loop().add_shutdown_task(self.wait)
        self._started.set()

    async def wait_started(self):
        """
        Wait until the process has started. If the process
        has already been started or exited, this is a no-op
        """

        await self._started.wait()

    def detach(self):
        """
        Detaches the process from the event loop. This is necessary
        if the process needs to outlive the program that spawned it,
        but keep in mind that you'll have to handle process termination
        yourself. If the process is not attached to an event loop,
        StructIOException is raised
        """

        if self._taskid is None:
            raise StructIOException("process is not attached to an event loop")
        structio.current_loop().remove_shutdown_task(self._taskid)
        self._taskid = None

    def attach(self):
        """
        Attach the process to the current event loop. This makes it so that structio
        will wait for the process to exit, even if the main task terminates (unless it crashes).
        If the process is already attached, StructIOException is raised
        """

        if self._taskid is not None:
            raise StructIOException("process is already attached to an event loop")
        self._taskid = structio.current_loop().add_shutdown_task(self.wait)

    async def is_running(self):
        """
        Returns whether the process is currently running
        """

        if self._process is None:
            return False
        return self._process.poll() is None

    async def wait(self):
        """
        Async equivalent of subprocess.Popen.wait(). Stores the
        exit status in self.returncode and returns it

        :raises RuntimeError: If the process has not been started
        """

        if self._process is None:
            raise RuntimeError("process is not running yet")
        status = self._process.poll()
        if status is None:
            # Handlers only run on the call that actually
            # observes the process while it is still alive
            await self._run_shutdown_handlers(before_wait=True)
            status = await structio.thread.run_in_worker(
                self._process.wait, cancellable=True
            )
            await self._run_shutdown_handlers()
        self.returncode = status
        return status

    async def communicate(self, input=b"") -> tuple[bytes, bytes]:
        """
        Async equivalent of subprocess.Popen.communicate(): writes
        the given input (if any) to the process' standard input, closes
        it, then reads standard output and standard error until EOF

        :param input: The data to write to the process' standard input
        :return: A tuple (stdout, stderr). A stream that was not captured
            yields an empty bytes object
        """

        if input:
            await self.stdin.write(input)
        if self.stdin:
            # Always close stdin, as Popen.communicate() does: a child
            # reading from a pipe would otherwise never see EOF and
            # the reads below would block forever
            await self.stdin.close()
        out = b""
        err = b""
        # NOTE(review): the streams are drained one after the other, so
        # a child filling its stderr pipe before closing stdout may stall
        if self.stdout:
            out = await self.stdout.readall()
        if self.stderr:
            err = await self.stderr.readall()
        return out, err

    async def __aenter__(self):
        self.start()
        return self

    async def __aexit__(self, *args):
        # Close our ends of the pipes, then wait for the process to exit
        if self.stdin:
            await self.stdin.close()
        if self.stdout:
            await self.stdout.close()
        if self.stderr:
            await self.stderr.close()
        await self.wait()

    def __getattr__(self, item):
        # Delegate to internal process object. This is only invoked when
        # regular lookup fails, so _process is read straight from the
        # instance dict: going through self._process on an instance that
        # lacks it (e.g. one created via __new__) would re-enter this
        # method and recurse forever
        process = self.__dict__.get("_process")
        if process is None:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {item!r}"
            )
        return getattr(process, item)
async def run(
    args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, check=False
):
    """
    Async version of subprocess.run(): spawns the process, feeds it the
    given input, waits for it to exit and returns a CompletedProcess

    :raises CalledProcessError: If check is true and the process exits
        with a non-zero return code
    """

    if input:
        # Input can only be delivered through a pipe
        stdin = subprocess.PIPE
    child = Process(args, stdin=stdin, stdout=stdout, stderr=stderr, shell=shell)
    async with child:
        try:
            captured_out, captured_err = await child.communicate(input)
        except BaseException:
            # Whatever went wrong (cancellation included), don't leave
            # the child running: leaving the block waits for it to exit
            child.kill()
            raise
    # The return code is only available once the block above has exited
    code = child.returncode
    if check and code:
        raise CalledProcessError(
            code, child.args, output=captured_out, stderr=captured_err
        )
    return CompletedProcess(child.args, code, captured_out, captured_err)
async def check_output(args, *, stdin=None, stderr=None, shell=False, input=None):
    """
    Async version of subprocess.check_output: runs the command with
    its standard output captured and checking enabled, and returns
    the captured output
    """

    options = {
        "stdout": PIPE,
        "stdin": stdin,
        "stderr": stderr,
        "shell": shell,
        "check": True,
        "input": input,
    }
    completed = await run(args, **options)
    return completed.stdout
class ProcessLimiter:
    """
    Caps the number of processes that may run in parallel, as a
    way to constrain resource usage. Every submission spawns a
    brand new process: processes are *not* reused!

    :param max_workers: The maximum number of parallel
        worker processes that can be running at any given
        time
    """

    def __init__(self, max_workers: int = cpu_count()):
        self._closing = False
        # Submitted processes that are waiting to be started
        self.processes: Queue = Queue(max_workers)
        # One slot per worker: taken by submit(), given
        # back once the process has exited
        self._sem = Semaphore(max_workers)
        loop = structio.current_loop()
        loop.spawn_system_task(self._manage_processes)
        loop.add_shutdown_task(self._ensure_completed)

    async def _wait_for_process(self, process: "Process"):
        # Frees the worker slot once the process is done
        await process.wait()
        await self._sem.release()

    @property
    def max_workers(self):
        return self._sem.max_size

    @property
    def current_workers(self):
        return self._sem.size

    async def _manage_process(self, process: "Process"):
        # Starts the process, then frees its slot when it exits.
        # NOTE(review): if start() raises, the slot is never released
        process.start()
        await self._wait_for_process(process)

    async def _manage_processes(self):
        # Runs forever, handing every queued process to its own task
        async with structio.create_pool() as pool:
            while True:
                queued = await self.processes.get()
                pool.spawn(self._manage_process, queued)

    async def _ensure_completed(self):
        """
        Drains the queue so that every submitted process still gets
        run to completion when _manage_processes has been cancelled
        (e.g. because the event loop is shutting down)
        """

        async with structio.create_pool() as pool:
            while self.processes:
                # If this raises WouldBlock, it's a bug
                leftover = self.processes.get_noblock()
                pool.spawn(self._manage_process, leftover)

    async def submit(self, *args, **kwargs) -> "Process":
        """
        Submits work to the process pool. All positional
        and keyword arguments are passed along to the constructor
        of `structio.parallel.Process`. If there are max_workers
        processes already running, the call blocks until a spot
        is freed. The process object is returned
        """

        await self._sem.acquire()
        submitted = Process(*args, **kwargs)
        # The semaphore already guarantees there is room on the
        # queue, so the non-blocking put can't legitimately fail:
        # if this raises WouldBlock it's 100% a bug
        self.processes.put_noblock(submitted)
        return submitted
class PythonProcess:
    """
    Run a separate python process asynchronously. The child is spawned
    via `python -m structio.util.child_process` and is controlled through
    length-prefixed msgpack messages exchanged over a local TCP socket

    :param target: The object the remote process is asked to execute
    """

    # Every message is preceded by its length, packed with this format
    _HEADER = struct.Struct("Q")

    def __init__(self, target):
        self.target = ObjectReference(target)
        # Listening socket the child process connects back to
        self._sock = structio.socket.socket()
        # Connection to the child process, set once it has connected
        self._remote: structio.AsyncSocket | None = None
        self._started = structio.Event()
        self.process: Process | None = None

    async def send_message(self, data: dict):
        """
        Sends a message to the remote process and waits for it to
        be acknowledged. The message is serialized with msgpack and
        prefixed with its length

        :param data: The message to send
        :raises StructIOException: If the process does not reply
            with a valid ACK
        """

        payload = msgpack.dumps(data)
        await self._remote.send_all(self._HEADER.pack(len(payload)))
        await self._remote.send_all(payload)
        await self._ensure_ack()

    async def _send_ref(self):
        # Tells the remote process what to execute. The file is only
        # included when the target does not live inside a package
        msg = {"msg": "EXEC", "ref": str(self.target), "file": None}
        if not self.target.in_package:
            msg["file"] = self.target.get_file()
        await self.send_message(msg)

    async def _do_setup(self):
        # Spawns the child process, waits for it to connect back
        # to us and performs the initial handshake
        with structio.TaskScope(shielded=True):
            # We use a shielded task scope to ensure
            # the setup always runs to completion
            await self._sock.bind(("127.0.0.1", 0))
            await self._sock.listen(1)
            addr, port = self._sock.getsockname()
            self.process = Process(
                [sys.executable, "-m", "structio.util.child_process", addr, str(port)]
            )
            # If we didn't close the socket before calling wait(), we'd deadlock waiting for the
            # process to exit while the process waits for us to send them a message
            self.process.add_shutdown_handler(self.close, before_wait=True)
            self.process.start()
            await self.process.wait_started()
            self._started.set()
            sock, _addr = await self._sock.accept()
            self._remote = sock
            await self.send_sos()
            await self._send_ref()

    async def _ensure_ack(self):
        # Waits for the next message and checks that it is an ACK
        try:
            payload = await self.receive_message()
        except StructIOException as e:
            raise StructIOException("unable to get ACK from remote process") from e
        if payload["msg"] != "ACK":
            raise StructIOException(
                f"invalid message type {payload['msg']!r} received from process (expecting "
                f"'ACK'): {payload}"
            )

    async def send_sos(self):
        """
        Sends a Start of Session command to the process, readying it for
        execution of arbitrary jobs
        """

        await self.send_message({"msg": "HELO"})

    async def send_eos(self):
        """
        Sends an End of Session command to the process, signaling to shut
        it down
        """

        await self.send_message({"msg": "CYA"})

    async def close(self, graceful: bool = True):
        """
        Terminate the remote process. If graceful equals
        True, the default, a graceful shutdown is attempted.
        If the process is not running, this is a no-op
        """

        if not await self.is_running():
            return
        try:
            if graceful:
                await self.send_eos()
        finally:
            # Release the sockets even if the graceful shutdown
            # failed (e.g. the process died before acknowledging)
            await self._remote.close()
            await self._sock.close()

    async def _ensure_started(self):
        if not await self.is_running():
            raise StructIOException("process is not running")

    async def receive_message(self):
        """
        Wait for a message from the subprocess and
        return it

        :raises ResourceBroken: If the connection is closed before
            a message header is received
        :raises StructIOException: If the process is not running or
            the received message is an error response
        """

        await self._ensure_started()
        header_size = self._HEADER.size
        header = await self._remote.receive(header_size)
        if not header:
            raise ResourceBroken("remote socket was closed abruptly")
        if len(header) < header_size:
            # receive() may hand back fewer bytes than requested:
            # fetch the rest of the header before decoding it
            header += await self._remote.receive_exactly(header_size - len(header))
        size, *_ = self._HEADER.unpack(header)
        message = msgpack.unpackb(await self._remote.receive_exactly(size))
        if not message["ok"]:
            raise StructIOException(
                f"got error response from remote process: {message}"
            )
        return message

    def start(self):
        """
        Schedules the setup of the remote process on the current
        event loop. Use wait_started() to wait for the process to
        be spawned
        """

        structio.current_loop().spawn_system_task(self._do_setup)

    async def is_running(self):
        """
        Returns whether the remote process is currently running
        """

        if not self.process:
            # Nothing was spawned yet
            await checkpoint()
            return False
        return await self.process.is_running()

    async def wait(self):
        """
        Waits for the remote process to exit and returns
        its exit status

        :raises StructIOException: If the process is not running
        """

        await self._ensure_started()
        return await self.process.wait()

    async def wait_started(self):
        # Can't use self.process.wait_started because it
        # is likely to be None until _do_setup runs to
        # completion
        return await self._started.wait()