450 lines
14 KiB
Python
450 lines
14 KiB
Python
"""
|
|
Module inspired by subprocess which allows for asynchronous
|
|
multiprocessing
|
|
"""
|
|
|
|
import os
|
|
import struct
|
|
import sys
|
|
|
|
import structio
|
|
import msgpack
|
|
import platform
|
|
import subprocess
|
|
from itertools import count
|
|
|
|
from structio.util.finder import ObjectReference
|
|
from structio.io import FileStream
|
|
from multiprocessing import cpu_count
|
|
from structio import Semaphore, Queue
|
|
from typing import Callable, Any, Coroutine
|
|
from structio.core.syscalls import checkpoint
|
|
from structio.exceptions import ResourceBroken, StructIOException
|
|
from subprocess import CalledProcessError, CompletedProcess, DEVNULL, PIPE
|
|
|
|
if platform.system() == "Windows":
    # Windows doesn't really support non-blocking file
    # descriptors (except sockets), so we just use threads.
    # This rebinds the module-level FileStream name (imported
    # above from structio.io) to AsyncFile on that platform
    from structio.io.files import AsyncFile as FileStream
|
|
|
|
|
|
class Process:
    """
    Class similar to subprocess.Popen, but async. The constructor
    is analogous to its synchronous counterpart: all positional and
    keyword arguments are stored and handed to subprocess.Popen when
    start() is called. Attributes that are not defined on this class
    are looked up on the underlying Popen object once it exists
    """

    def __init__(self, *args, **kwargs):
        if "universal_newlines" in kwargs:
            # Not sure why? But everyone else is doing it so :shrug:
            raise RuntimeError("universal_newlines is not supported")
        stdin = kwargs.get("stdin")
        if stdin is not None and stdin not in (PIPE, DEVNULL):
            # Curio mentions stuff breaking if the child process
            # is passed a stdin fd that is set to non-blocking mode
            if hasattr(os, "set_blocking"):
                # Popen accepts both raw file descriptors and file-like
                # objects, so only call fileno() on the latter
                fd = stdin if isinstance(stdin, int) else stdin.fileno()
                os.set_blocking(fd, True)
        # The Popen object itself is only created by start()
        self._process: subprocess.Popen | None = None
        self._args = args
        self._kwargs = kwargs
        self.stdin = None
        self.stdout = None
        self.stderr = None
        self.returncode = None
        self.pid = -1
        # Entries are (handler id, before_wait flag, coroutine function, positional args)
        self.shutdown_handlers: list[
            tuple[int, bool, Callable[..., Coroutine[Any, Any, Any]], tuple[Any, ...]]
        ] = []
        self._handler_id = count()
        self._taskid = None
        self._started = structio.Event()
        # Shutdown handlers are documented to run only once, so we
        # remember which group has already been executed
        self._before_wait_ran = False
        self._after_wait_ran = False

    async def terminate(self):
        """
        Terminates the subprocess asynchronously
        """

        return await structio.thread.run_in_worker(
            self._process.terminate, cancellable=True
        )

    async def _run_shutdown_handlers(self, before_wait: bool = False):
        """
        Awaits, in registration order, every shutdown handler whose
        before_wait flag matches the given one
        """

        # Work on a snapshot so a handler can register or unregister
        # handlers without disturbing the iteration
        selected = [h for h in self.shutdown_handlers if h[1] is before_wait]
        for _, _, coro, args in selected:
            await coro(*args)

    def add_shutdown_handler(
        self,
        func: Callable[..., Coroutine[Any, Any, Any]],
        *args,
        before_wait: bool = False,
    ) -> int:
        """
        Registers a coroutine to be executed either right after, or right before wait() is called.
        Shutdown handlers are executed one at a time in the order in which they are registered. All
        positional arguments are passed to the given coroutine. Returns a unique identifier that can
        be used to unregister the handler if necessary (see remove_shutdown_handler). Note that
        shutdown handlers are called only once (i.e. the first time wait() returns)

        :param func: The coroutine function to add as a shutdown handler
        :param before_wait: If False, the default, the handler is executed after wait()
            returns. If True, it is executed right before calling it
        :return: The identifier of the newly registered handler
        """

        handler_id = next(self._handler_id)
        self.shutdown_handlers.append((handler_id, before_wait, func, args))
        return handler_id

    def remove_shutdown_handler(self, handler_id: int) -> bool:
        """
        Unregisters the shutdown handler with the given identifier

        :param handler_id: The value returned by add_shutdown_handler
        :return: True if a handler was removed, False if no handler
            with that identifier is registered
        """

        for index, handler in enumerate(self.shutdown_handlers):
            if handler[0] == handler_id:
                del self.shutdown_handlers[index]
                return True
        return False

    def start(self, *, attached: bool = True):
        """
        Begin execution of the process

        :param attached: If True, the default, wait() is registered as a
            shutdown task on the current event loop (see attach())
        """

        self._process = subprocess.Popen(*self._args, **self._kwargs)
        self.pid = self._process.pid
        # A freshly spawned process has not run any shutdown handler yet
        self._before_wait_ran = False
        self._after_wait_ran = False
        if self._process.stdin:
            self.stdin = FileStream(self._process.stdin)
        if self._process.stdout:
            self.stdout = FileStream(self._process.stdout)
        if self._process.stderr:
            self.stderr = FileStream(self._process.stderr)
        if attached:
            self._taskid = structio.current_loop().add_shutdown_task(self.wait)
        self._started.set()

    async def wait_started(self):
        """
        Wait until the process has started. If the process
        has already been started or exited, this is a no-op
        """

        await self._started.wait()

    def detach(self):
        """
        Detaches the process from the event loop. This is necessary
        if the process needs to outlive the program that spawned it,
        but keep in mind that you'll have to handle process termination
        yourself. If the process is not attached to an event loop,
        StructIOException is raised
        """

        if self._taskid is None:
            raise StructIOException("process is not attached to an event loop")
        structio.current_loop().remove_shutdown_task(self._taskid)
        self._taskid = None

    def attach(self):
        """
        Attach the process to the current event loop. This makes it so that structio
        will wait for the process to exit, even if the main task terminates (unless it crashes).
        If the process is already attached, StructIOException is raised
        """

        if self._taskid is not None:
            raise StructIOException("process is already attached to an event loop")
        self._taskid = structio.current_loop().add_shutdown_task(self.wait)

    async def is_running(self):
        """
        Returns whether the process is currently running
        """

        if self._process is None:
            return False
        return self._process.poll() is None

    async def wait(self):
        """
        Async equivalent of subprocess.Popen.wait(). Shutdown handlers
        registered with before_wait=True run before blocking (only if the
        process is still alive), the remaining ones run once the exit
        status is known. Each group runs at most once per started process

        :return: The exit status of the process
        :raises RuntimeError: If start() has not been called yet
        """

        if self._process is None:
            raise RuntimeError("process is not running yet")

        status = self._process.poll()
        if status is None:
            if not self._before_wait_ran:
                # Flag first, so a concurrent or retried wait() won't run them again
                self._before_wait_ran = True
                await self._run_shutdown_handlers(before_wait=True)
            status = await structio.thread.run_in_worker(
                self._process.wait, cancellable=True
            )
        # Publish the exit status before the handlers run so they can read it
        self.returncode = status
        if not self._after_wait_ran:
            self._after_wait_ran = True
            await self._run_shutdown_handlers()
        return status

    async def communicate(self, input=b"") -> tuple[bytes, bytes]:
        """
        Sends the given data (if any) to the process' standard input, closes
        it, then reads standard output and standard error until EOF

        :param input: The bytes to write to stdin. Ignored if the
            process has no stdin pipe
        :return: A tuple (stdout, stderr); a stream that is not
            piped yields an empty bytestring
        """

        if self.stdin:
            if input:
                await self.stdin.write(input)
            # Like Popen.communicate(), always close stdin: a child that
            # reads it would otherwise never see EOF and we'd hang below
            await self.stdin.close()
        out = b""
        err = b""
        # NOTE(review): the streams are drained one after the other, so a child
        # that fills the stderr pipe before closing stdout could stall — confirm
        if self.stdout:
            out = await self.stdout.readall()
        if self.stderr:
            err = await self.stderr.readall()
        return out, err

    async def __aenter__(self):
        self.start()
        return self

    async def __aexit__(self, *args):
        try:
            for stream in (self.stdin, self.stdout, self.stderr):
                if stream:
                    await stream.close()
        finally:
            # Always reap the child, even if closing a stream failed
            await self.wait()

    def __getattr__(self, item):
        # Delegate to internal process object. This hook only runs when
        # normal lookup fails, so we must go through __dict__: touching
        # self._process on an instance where it is missing would recurse
        process = self.__dict__.get("_process")
        if process is None:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {item!r}"
            )
        return getattr(process, item)
|
|
|
|
|
|
async def run(
    args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, check=False
):
    """
    Async version of subprocess.run(): spawns the process, feeds it the
    given input (if any), collects its output and waits for it to exit

    :return: A subprocess.CompletedProcess describing the outcome
    :raises CalledProcessError: If check is true and the exit status is nonzero
    """

    # Supplying input implies the child's stdin must be a pipe
    child_stdin = subprocess.PIPE if input else stdin
    async with Process(
        args, stdin=child_stdin, stdout=stdout, stderr=stderr, shell=shell
    ) as process:
        try:
            captured_out, captured_err = await process.communicate(input)
        except BaseException:
            # Whatever went wrong, don't leave the child
            # running before propagating the error
            process.kill()
            raise

    # The exit status is available here because leaving the
    # context manager waits for the process
    if check and process.returncode:
        raise CalledProcessError(
            process.returncode, process.args, output=captured_out, stderr=captured_err
        )
    return CompletedProcess(
        process.args, process.returncode, captured_out, captured_err
    )
|
|
|
|
|
|
async def check_output(args, *, stdin=None, stderr=None, shell=False, input=None):
    """
    Async version of subprocess.check_output: runs the command via run()
    with stdout piped and check=True, returning the captured standard output
    """

    options = {
        "stdout": PIPE,
        "stdin": stdin,
        "stderr": stderr,
        "shell": shell,
        "check": True,
        "input": input,
    }
    completed = await run(args, **options)
    return completed.stdout
|
|
|
|
|
|
class ProcessLimiter:
    """
    A class meant to constrain resource usage by limiting
    the number of parallel processes that can be spawned.
    Note however that processes are *not* reused!

    :param max_workers: The maximum number of parallel
        worker processes that can be running at any given
        time
    """

    def __init__(self, max_workers: int = cpu_count()):
        self._closing = False
        # Processes that have been submitted but not started yet
        self.processes: Queue = Queue(max_workers)
        # One slot is held from submit() until the process has exited
        self._sem = Semaphore(max_workers)
        structio.current_loop().spawn_system_task(self._manage_processes)
        structio.current_loop().add_shutdown_task(self._ensure_completed)

    async def _wait_for_process(self, process: Process):
        """
        Waits for the given process to exit, then frees its slot
        """

        try:
            await process.wait()
        finally:
            # Free the slot even if waiting failed, or it would be lost for good
            await self._sem.release()

    @property
    def max_workers(self):
        """
        The max_size of the underlying semaphore
        """

        return self._sem.max_size

    @property
    def current_workers(self):
        """
        The size of the underlying semaphore
        """

        return self._sem.size

    async def _manage_process(self, process: Process):
        """
        Starts the given process, waits for it to exit and frees its slot
        """

        try:
            process.start()
            await process.wait()
        finally:
            # The slot was taken by submit(): give it back no matter how we
            # got here (e.g. the executable could not be spawned), since
            # nobody else will ever release it
            await self._sem.release()

    async def _manage_processes(self):
        """
        Pulls submitted processes off the queue forever, running each one
        in its own task
        """

        async with structio.create_pool() as pool:
            while True:
                pool.spawn(self._manage_process, await self.processes.get())

    async def _ensure_completed(self):
        """
        Makes sure that all submitted processes are run to completion, even
        if _manage_processes is cancelled (e.g. because the event loop is
        shutting down)
        """

        async with structio.create_pool() as pool:
            while self.processes:
                # If this raises WouldBlock, it's a bug
                pool.spawn(self._manage_process, self.processes.get_noblock())

    async def submit(self, *args, **kwargs) -> Process:
        """
        Submits work to the process pool. All positional
        and keyword arguments are passed along to the constructor
        of `structio.parallel.Process`. If there are max_workers
        processes already running, the call blocks until a spot
        is freed. The process object is returned
        """

        # Build the process before taking a slot: if the constructor
        # rejects the arguments, no slot is held and none can leak
        process = Process(*args, **kwargs)
        await self._sem.acquire()
        # By using non-blocking variants of the queue methods
        # we can ensure processes are always submitted correctly:
        # the semaphore already ensures there's enough space on the
        # queue, so if this raises WouldBlock it's 100% a bug
        self.processes.put_noblock(process)
        return process
|
|
|
|
|
|
class PythonProcess:
    """
    Run a separate python process asynchronously. The child is an
    interpreter running structio.util.child_process, which connects back
    to a local socket; messages are msgpack payloads prefixed by their
    length
    """

    # Every message is preceded by its length, packed with this format
    _HEADER = struct.Struct("Q")

    def __init__(self, target):
        self.target = ObjectReference(target)
        # Listening socket the child process connects back to
        self._sock = structio.socket.socket()
        # Connection to the child, set once it has connected
        self._remote: structio.AsyncSocket | None = None
        self._started = structio.Event()
        self.process: Process | None = None
        # Makes close() idempotent: it is both a public method
        # and a shutdown handler of the underlying process
        self._closed = False

    async def send_message(self, data: dict):
        """
        Sends a message to the remote process and waits for it to
        be acknowledged. The payload is serialized with msgpack and
        preceded by its length

        :param data: The message to send
        :raises StructIOException: If the remote process does not
            reply with an ACK
        """

        data = msgpack.dumps(data)
        await self._remote.send_all(self._HEADER.pack(len(data)))
        await self._remote.send_all(data)
        await self._ensure_ack()

    async def _send_ref(self):
        """
        Sends the EXEC message carrying the reference to the target. If the
        target is not part of a package, its file is sent along as well
        """

        msg = {"msg": "EXEC", "ref": str(self.target), "file": None}
        if not self.target.in_package:
            msg["file"] = self.target.get_file()
        await self.send_message(msg)

    async def _do_setup(self):
        """
        Spawns the child interpreter, waits for it to connect
        back to us and performs the initial handshake
        """

        with structio.TaskScope(shielded=True):
            # We use a shielded task scope to ensure
            # the setup always runs to completion
            await self._sock.bind(("127.0.0.1", 0))
            await self._sock.listen(1)
            addr, port = self._sock.getsockname()
            self.process = Process(
                [sys.executable, "-m", "structio.util.child_process", addr, str(port)]
            )
            # If we didn't close the socket before calling wait(), we'd deadlock waiting for the
            # process to exit while the process waits for us to send them a message
            self.process.add_shutdown_handler(self.close, before_wait=True)
            self.process.start()
            await self.process.wait_started()
            self._started.set()
            sock, _addr = await self._sock.accept()
            self._remote = sock
            await self.send_sos()
            await self._send_ref()

    async def _ensure_ack(self):
        """
        Receives the next message and checks that it is an ACK

        :raises StructIOException: If no message could be received
            or its type is not ACK
        """

        try:
            payload = await self.receive_message()
        except StructIOException as e:
            raise StructIOException("unable to get ACK from remote process") from e
        if payload["msg"] != "ACK":
            raise StructIOException(
                f"invalid message type {payload['msg']!r} received from process (expecting "
                f"'ACK'): {payload}"
            )

    async def send_sos(self):
        """
        Sends a Start of Session command to the process, readying it for
        execution of arbitrary jobs
        """

        await self.send_message({"msg": "HELO"})

    async def send_eos(self):
        """
        Sends an End of Session command to the process, signaling to shut
        it down
        """

        await self.send_message({"msg": "CYA"})

    async def close(self, graceful: bool = True):
        """
        Terminate the remote process. If graceful equals
        True, the default, a graceful shutdown is attempted.
        Calling this method more than once, or before the
        process has been created, is a no-op
        """

        if self._closed or self.process is None:
            await checkpoint()
            return
        self._closed = True

        try:
            # There's nobody to say goodbye to if the child never
            # connected or has already exited
            if graceful and self._remote is not None and await self.is_running():
                await self.send_eos()
        finally:
            # Release both sockets no matter what: skipping this when the
            # child is already gone (or the EOS exchange fails) leaks them
            try:
                if self._remote is not None:
                    await self._remote.close()
            finally:
                await self._sock.close()

    async def _ensure_started(self):
        """
        Raises StructIOException unless the process is running
        """

        if not await self.is_running():
            raise StructIOException("process is not running")

    async def receive_message(self):
        """
        Wait for a message from the subprocess and
        return it

        :raises ResourceBroken: If the connection is closed before
            a full length header is received
        :raises StructIOException: If the process is not running or
            the message's "ok" field is falsy
        """

        await self._ensure_started()
        # A single receive() may return fewer bytes than requested, so
        # keep reading until the whole length header has arrived
        header = b""
        while len(header) < self._HEADER.size:
            chunk = await self._remote.receive(self._HEADER.size - len(header))
            if not chunk:
                raise ResourceBroken("remote socket was closed abruptly")
            header += chunk
        size, *_ = self._HEADER.unpack(header)
        message = msgpack.unpackb(await self._remote.receive_exactly(size))
        if not message["ok"]:
            raise StructIOException(
                f"got error response from remote process: {message}"
            )
        return message

    def start(self):
        """
        Schedules the setup of the remote process as a system
        task on the current event loop
        """

        structio.current_loop().spawn_system_task(self._do_setup)

    async def is_running(self):
        """
        Returns whether the remote process is currently running
        """

        if not self.process:
            await checkpoint()
            return False
        return await self.process.is_running()

    async def wait(self):
        """
        Waits for the remote process to exit and returns its exit status

        :raises StructIOException: If the process is not running
        """

        await self._ensure_started()
        return await self.process.wait()

    async def wait_started(self):
        """
        Wait until the remote process has been started
        """

        # Can't use self.process.wait_started because it
        # is likely to be None until _do_setup runs to
        # completion
        return await self._started.wait()
|