"""
Module inspired by subprocess which allows for asynchronous multiprocessing
"""

import os
import struct
import sys
import structio
import msgpack
import platform
import subprocess
from itertools import count
from structio.util.finder import ObjectReference
from structio.io import FileStream
from multiprocessing import cpu_count
from structio import Semaphore, Queue
from typing import Callable, Any, Coroutine
from structio.core.syscalls import checkpoint
from structio.exceptions import ResourceBroken, StructIOException
from subprocess import CalledProcessError, CompletedProcess, DEVNULL, PIPE

if platform.system() == "Windows":
    # Windows doesn't really support non-blocking file
    # descriptors (except sockets), so we just use threads
    from structio.io.files import AsyncFile as FileStream


class Process:
    """
    Class similar to subprocess.Popen, but async. The constructor
    is analogous to its synchronous counterpart: all arguments are
    stored and forwarded to subprocess.Popen when start() is called
    """

    def __init__(self, *args, **kwargs):
        if "universal_newlines" in kwargs:
            # Not entirely clear why this breaks things, but other
            # async subprocess implementations reject it as well
            raise RuntimeError("universal_newlines is not supported")
        stdin = kwargs.get("stdin")
        if stdin is not None and stdin not in (PIPE, DEVNULL):
            # Curio mentions stuff breaking if the child process
            # is passed a stdin fd that is set to non-blocking mode.
            # Popen accepts both raw file descriptors and file-like
            # objects exposing fileno(), so we handle both here
            if hasattr(os, "set_blocking"):
                fd = stdin if isinstance(stdin, int) else stdin.fileno()
                os.set_blocking(fd, True)
        # The Popen object is only created in start(); until then we
        # just remember the arguments to forward to its constructor
        self._process: subprocess.Popen | None = None
        self._args = args
        self._kwargs = kwargs
        self.stdin = None
        self.stdout = None
        self.stderr = None
        self.returncode = None
        self.pid = -1
        # Entries are (handler_id, before_wait, coroutine_function, args)
        self.shutdown_handlers: list[
            tuple[int, bool, Callable[..., Coroutine[Any, Any, Any]], tuple[Any, ...]]
        ] = []
        self._handler_id = count()
        # Track which shutdown handler phases already ran, so that each
        # phase executes at most once no matter how often wait() is called
        self._ran_before_wait = False
        self._ran_after_wait = False
        self._taskid = None
        self._started = structio.Event()

    async def terminate(self):
        """
        Terminates the subprocess asynchronously
        """

        return await structio.thread.run_in_worker(
            self._process.terminate, cancellable=True
        )

    async def _run_shutdown_handlers(self, before_wait: bool = False):
        """
        Runs the shutdown handlers registered for the given phase
        (before or after wait()) one at a time, in registration order.
        Each phase is executed at most once per process
        """

        if before_wait:
            if self._ran_before_wait:
                return
            self._ran_before_wait = True
        else:
            if self._ran_after_wait:
                return
            self._ran_after_wait = True
        # Iterate over a snapshot so that a handler may register or
        # unregister other handlers without disturbing the iteration
        handlers = [h for h in self.shutdown_handlers if h[1] is before_wait]
        for _, _, coro, args in handlers:
            await coro(*args)

    def add_shutdown_handler(
        self,
        func: Callable[..., Coroutine[Any, Any, Any]],
        *args,
        before_wait: bool = False,
    ) -> int:
        """
        Registers a coroutine to be executed either right after, or right
        before wait() is called. Shutdown handlers are executed one at a time
        in the order in which they are registered. All positional arguments are
        passed to the given coroutine. Returns a unique identifier that can be
        passed to remove_shutdown_handler() to unregister the handler if
        necessary. Note that shutdown handlers are called only once (i.e. the
        first time wait() returns)

        :param func: The coroutine function to add as a shutdown handler
        :param before_wait: If False, the default, the handler is executed after
            wait() returns. If True, it is executed right before calling it
        """

        handler_id = next(self._handler_id)
        # Normalize to a real bool: handlers are later selected by identity
        self.shutdown_handlers.append((handler_id, bool(before_wait), func, args))
        return handler_id

    def remove_shutdown_handler(self, handler_id: int) -> bool:
        """
        Unregisters a shutdown handler using the identifier returned by
        add_shutdown_handler(). Returns True if a handler was removed and
        False if no handler with the given identifier is registered
        """

        for index, (hid, *_) in enumerate(self.shutdown_handlers):
            if hid == handler_id:
                del self.shutdown_handlers[index]
                return True
        return False

    def start(self, *, attached: bool = True):
        """
        Begin execution of the process. If attached is True, the
        default, the process is registered with the current event
        loop so that structio waits for it at shutdown. Raises
        StructIOException if the process was already started
        """

        if self._process is not None:
            # Starting again would silently orphan the first child
            raise StructIOException("process has already been started")
        self._process = subprocess.Popen(*self._args, **self._kwargs)
        self.pid = self._process.pid
        if self._process.stdin:
            self.stdin = FileStream(self._process.stdin)
        if self._process.stdout:
            self.stdout = FileStream(self._process.stdout)
        if self._process.stderr:
            self.stderr = FileStream(self._process.stderr)
        if attached:
            self._taskid = structio.current_loop().add_shutdown_task(self.wait)
        self._started.set()

    async def wait_started(self):
        """
        Wait until the process has started. If the process
        has already been started or exited, this is a no-op
        """

        await self._started.wait()

    def detach(self):
        """
        Detaches the process from the event loop. This is necessary
        if the process needs to outlive the program that spawned it,
        but keep in mind that you'll have to handle process termination
        yourself. If the process is not attached to an event loop,
        StructIOException is raised
        """

        if self._taskid is None:
            raise StructIOException("process is not attached to an event loop")
        structio.current_loop().remove_shutdown_task(self._taskid)
        self._taskid = None

    def attach(self):
        """
        Attach the process to the current event loop. This makes it
        so that structio will wait for the process to exit, even if
        the main task terminates (unless it crashes). If the process
        is already attached, StructIOException is raised
        """

        if self._taskid is not None:
            raise StructIOException("process is already attached to an event loop")
        self._taskid = structio.current_loop().add_shutdown_task(self.wait)

    async def is_running(self):
        """
        Returns whether the process is currently running
        """

        if self._process is None:
            return False
        return self._process.poll() is None

    async def wait(self):
        """
        Async equivalent of subprocess.Popen.wait(). Runs the
        before-wait shutdown handlers (only if the process is still
        running), waits for the process in a worker thread, records
        the exit status in self.returncode and then runs the
        after-wait shutdown handlers. Returns the exit status
        """

        if self._process is None:
            raise RuntimeError("process is not running yet")
        status = self._process.poll()
        if status is None:
            await self._run_shutdown_handlers(before_wait=True)
            status = await structio.thread.run_in_worker(
                self._process.wait, cancellable=True
            )
        # Set the return code first so that after-wait handlers can see it
        self.returncode = status
        await self._run_shutdown_handlers()
        return status

    async def communicate(self, input=b"") -> tuple[bytes, bytes]:
        """
        Sends input (if any) to the process's stdin and closes it, then
        reads stdout and stderr until EOF. Streams that were not
        redirected to a pipe are skipped (input is ignored if stdin is
        not a pipe, and the corresponding output is b"").

        Writing and both reads run concurrently: doing them one after
        the other can deadlock if the child fills one pipe while we are
        blocked on another

        :return: A (stdout_data, stderr_data) tuple
        """

        output = {"out": b"", "err": b""}

        async def _feed_stdin():
            if input:
                await self.stdin.write(input)
            await self.stdin.close()

        async def _drain(key, stream):
            output[key] = await stream.readall()

        async with structio.create_pool() as pool:
            if self.stdin:
                pool.spawn(_feed_stdin)
            if self.stdout:
                pool.spawn(_drain, "out", self.stdout)
            if self.stderr:
                pool.spawn(_drain, "err", self.stderr)
        return output["out"], output["err"]

    async def __aenter__(self):
        self.start()
        return self

    async def __aexit__(self, *args):
        if self.stdin:
            await self.stdin.close()
        if self.stdout:
            await self.stdout.close()
        if self.stderr:
            await self.stderr.close()
        await self.wait()

    def __getattr__(self, item):
        # Only invoked when normal attribute lookup fails: delegate to the
        # internal Popen object. _process is read through __dict__ so that
        # this cannot recurse forever if __init__ never got to set it
        process = self.__dict__.get("_process")
        if process is None:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {item!r}"
            )
        return getattr(process, item)


async def run(
    args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, check=False
):
    """
    Async version of subprocess.run()
    """

    if input:
        stdin = subprocess.PIPE
    async with Process(
        args, stdin=stdin, stdout=stdout, stderr=stderr, shell=shell
    ) as process:
        try:
            stdout, stderr = await process.communicate(input)
        except BaseException:
            # Deliberately broad: kill the child on any failure,
            # cancellation included, before propagating the error
            process.kill()
            raise
    if check and process.returncode:
        raise CalledProcessError(
            process.returncode, process.args, output=stdout, stderr=stderr
        )
    return CompletedProcess(process.args, process.returncode, stdout, stderr)


async def check_output(args, *, stdin=None, stderr=None, shell=False, input=None):
    """
    Async version of subprocess.check_output
    """

    out = await run(
        args,
        stdout=PIPE,
        stdin=stdin,
        stderr=stderr,
        shell=shell,
        check=True,
        input=input,
    )
    return out.stdout


class ProcessLimiter:
    """
    A class meant to constrain resource usage by limiting
    the number of parallel processes that can be spawned.
    Note however that processes are *not* reused!

    :param max_workers: The maximum number of parallel
        worker processes that can be running at any given
        time
    """

    def __init__(self, max_workers: int = cpu_count()):
        self._closing = False
        self.processes: Queue = Queue(max_workers)
        self._sem = Semaphore(max_workers)
        structio.current_loop().spawn_system_task(self._manage_processes)
        structio.current_loop().add_shutdown_task(self._ensure_completed)

    async def _wait_for_process(self, process: Process):
        try:
            await process.wait()
        finally:
            await self._sem.release()

    @property
    def max_workers(self):
        return self._sem.max_size

    @property
    def current_workers(self):
        return self._sem.size

    async def _manage_process(self, process: Process):
        # Always give the slot back, even if the process fails to
        # spawn or waiting on it fails: otherwise the slot would be
        # lost for good and submit() could end up blocking forever
        try:
            process.start()
            await process.wait()
        finally:
            await self._sem.release()

    async def _manage_processes(self):
        async with structio.create_pool() as pool:
            while True:
                pool.spawn(self._manage_process, await self.processes.get())

    async def _ensure_completed(self):
        """
        Makes sure that all submitted processes are run to completion,
        even if _manage_processes is cancelled (e.g. because the event
        loop is shutting down)
        """

        async with structio.create_pool() as pool:
            while self.processes:
                # If this raises WouldBlock, it's a bug
                pool.spawn(self._manage_process, self.processes.get_noblock())

    async def submit(self, *args, **kwargs) -> Process:
        """
        Submits work to the process pool. All positional
        and keyword arguments are passed along to the constructor
        of `structio.parallel.Process`. If there are max_workers
        processes already running, the call blocks until a spot
        is freed. The process object is returned
        """

        # Build the process before taking a slot: if the constructor
        # raises, no slot is acquired and therefore none can leak
        process = Process(*args, **kwargs)
        await self._sem.acquire()
        # By using non-blocking variants of the queue methods
        # we can ensure processes are always submitted correctly:
        # the semaphore already ensures there's enough space on the
        # queue, so if this raises WouldBlock it's 100% a bug
        self.processes.put_noblock(process)
        return process


class PythonProcess:
    """
    Run a separate python process asynchronously
    """

    # Length prefix of every message on the wire: an unsigned 64-bit
    # integer in native byte order (must match the child side)
    _HEADER = struct.Struct("Q")

    def __init__(self, target):
        self.target = ObjectReference(target)
        self._sock = structio.socket.socket()
        self._remote: structio.AsyncSocket | None = None
        self._started = structio.Event()
        self._closed = False
        self.process: Process | None = None

    async def send_message(self, data: dict):
        """
        Serializes the given dictionary with msgpack, sends it to the
        remote process prefixed by its length, and then waits for the
        remote process to acknowledge it
        """

        data = msgpack.dumps(data)
        await self._remote.send_all(self._HEADER.pack(len(data)))
        await self._remote.send_all(data)
        await self._ensure_ack()

    async def _send_ref(self):
        msg = {"msg": "EXEC", "ref": str(self.target), "file": None}
        if not self.target.in_package:
            msg["file"] = self.target.get_file()
        await self.send_message(msg)

    async def _do_setup(self):
        with structio.TaskScope(shielded=True):
            # We use a shielded task scope to ensure
            # the setup always runs to completion
            await self._sock.bind(("127.0.0.1", 0))
            await self._sock.listen(1)
            addr, port = self._sock.getsockname()
            self.process = Process(
                [sys.executable, "-m", "structio.util.child_process", addr, str(port)]
            )
            # If we didn't close the socket before calling wait(), we'd deadlock waiting for the
            # process to exit while the process waits for us to send them a message
            self.process.add_shutdown_handler(self.close, before_wait=True)
            self.process.start()
            await self.process.wait_started()
            self._started.set()
            sock, _addr = await self._sock.accept()
            self._remote = sock
            await self.send_sos()
            await self._send_ref()

    async def _ensure_ack(self):
        try:
            payload = await self.receive_message()
        except StructIOException as e:
            raise StructIOException("unable to get ACK from remote process") from e
        if payload["msg"] != "ACK":
            raise StructIOException(
                f"invalid message type {payload['msg']!r} received from process (expecting "
                f"'ACK'): {payload}"
            )

    async def send_sos(self):
        """
        Sends a Start of Session command to the process,
        readying it for execution of arbitrary jobs
        """

        await self.send_message({"msg": "HELO"})

    async def send_eos(self):
        """
        Sends an End of Session command to the process,
        signaling to shut it down
        """

        await self.send_message({"msg": "CYA"})

    async def close(self, graceful: bool = True):
        """
        Terminate the remote process and release our sockets. If graceful
        equals True, the default, and the process is still running, an
        End of Session command is sent first. Calling this more than
        once is a no-op
        """

        if self._closed:
            return
        self._closed = True
        try:
            if graceful and self._remote is not None and await self.is_running():
                await self.send_eos()
        finally:
            # Release the sockets even if the process already exited
            # (or the shutdown message failed), so they never leak
            if self._remote is not None:
                await self._remote.close()
            await self._sock.close()

    async def _ensure_started(self):
        if not await self.is_running():
            raise StructIOException("process is not running")

    async def receive_message(self):
        """
        Wait for a message from the subprocess and return it
        """

        await self._ensure_started()
        # A single receive() may return fewer bytes than requested,
        # so keep reading until the whole length prefix has arrived
        header = b""
        while len(header) < self._HEADER.size:
            chunk = await self._remote.receive(self._HEADER.size - len(header))
            if not chunk:
                raise ResourceBroken("remote socket was closed abruptly")
            header += chunk
        (size,) = self._HEADER.unpack(header)
        message = msgpack.unpackb(await self._remote.receive_exactly(size))
        if not message["ok"]:
            raise StructIOException(
                f"got error response from remote process: {message}"
            )
        return message

    def start(self):
        structio.current_loop().spawn_system_task(self._do_setup)

    async def is_running(self):
        if not self.process:
            await checkpoint()
            return False
        return await self.process.is_running()

    async def wait(self):
        """
        Wait for the remote process to exit and return its exit status.
        Raises StructIOException if the process was never created
        """

        if self.process is None:
            raise StructIOException("process is not running")
        # Process.wait() already handles processes that have exited
        return await self.process.wait()

    async def wait_started(self):
        # Can't use self.process.wait_started because it
        # is likely to be None until _do_setup runs to
        # completion
        return await self._started.wait()