From 62b2f19527bb4e7e55148a09a200ff6ae86673b0 Mon Sep 17 00:00:00 2001
From: Mattia Giambirtone
Date: Sun, 10 Mar 2024 19:35:00 +0100
Subject: [PATCH] Preliminary work on process pools. Remove smart events

---
 structio/__init__.py  |  8 +++--
 structio/abc.py       | 17 ++++++++++
 structio/parallel.py  | 58 ++++++++++++++++++++++++++++------
 structio/pools.py     | 72 ++++++++++++++++++++++++++++++++++++-------
 structio/sync.py      | 64 ++------------------------------------
 tests/echo_server.py  |  2 +-
 tests/processes.py    | 26 ++++++++++++----
 tests/smart_events.py | 25 ---------------
 8 files changed, 154 insertions(+), 118 deletions(-)
 delete mode 100644 tests/smart_events.py

diff --git a/structio/__init__.py b/structio/__init__.py
index c133982..879ff93 100644
--- a/structio/__init__.py
+++ b/structio/__init__.py
@@ -23,9 +23,6 @@ from structio.sync import (
     Semaphore,
     Lock,
     RLock,
-    emit,
-    on_event,
-    register_event,
 )
 from structio.abc import Channel, Stream, ChannelReader, ChannelWriter
 from structio.io import socket
@@ -43,6 +40,8 @@ from structio.core.run import current_loop, current_task
 from structio import thread, parallel
 from structio.path import Path
 from structio.signals import set_signal_handler, get_signal_handler
+from structio.parallel import Process
+from structio.pools import AsyncProcessPool
 from structio import signals as _signals
 from structio import util
 
@@ -175,4 +174,7 @@ __all__ = [
     "ResourceBusy",
     "ResourceBroken",
     "WouldBlock",
+    "Process",
+    "AsyncProcessPool",
+    "AsyncSocket",
 ]
diff --git a/structio/abc.py b/structio/abc.py
index 9235786..0dc8e0d 100644
--- a/structio/abc.py
+++ b/structio/abc.py
@@ -3,6 +3,7 @@ import os
 from abc import abstractmethod, ABC
 from types import FrameType
 
+import structio
 from structio.core.task import Task
 from structio.exceptions import StructIOException
 from typing import Callable, Any, Coroutine
@@ -136,6 +137,22 @@ class ChannelReader(AsyncResource, ABC):
 
         return NotImplemented
 
+    def __aiter__(self):
+        """
+        Implements asynchronous iteration
+        """
+        return self
+
+    async def __anext__(self):
+        """
+        Implements asynchronous iteration
+        """
+
+        try:
+            return await self.receive()
+        except structio.ResourceClosed:
+            raise StopAsyncIteration()
+
     @abstractmethod
     def pending(self):
         """
diff --git a/structio/parallel.py b/structio/parallel.py
index 30525fb..54490f4 100644
--- a/structio/parallel.py
+++ b/structio/parallel.py
@@ -1,5 +1,7 @@
-"""Module inspired by subprocess which allows for asynchronous
-multiprocessing"""
+"""
+Module inspired by subprocess which allows for asynchronous
+multiprocessing
+"""
 
 import os
 import structio
@@ -7,6 +9,8 @@ import platform
 import subprocess
 from subprocess import CalledProcessError, CompletedProcess, DEVNULL, PIPE
 from structio.io import FileStream
+from structio.core.syscalls import checkpoint
+
 
 if platform.system() == "Windows":
     # Windows doesn't really support non-blocking file
@@ -14,9 +18,9 @@
     from structio.io.files import AsyncFile as FileStream
 
 
-class Popen:
+class Process:
     """
-    Wrapper around subprocess.Popen, but async
+    Class similar to subprocess.Popen, but async
     """
 
     def __init__(self, *args, **kwargs):
@@ -30,23 +34,57 @@
         if hasattr(os, "set_blocking"):
             os.set_blocking(stdin.fileno(), True)
         # Delegate to Popen's constructor
-        self._process: subprocess.Popen = subprocess.Popen(*args, **kwargs)
+        self._process: subprocess.Popen | None = None
+        self._args = args
+        self._kwargs = kwargs
         self.stdin = None
         self.stdout = None
         self.stderr = None
+        self.returncode = None
+        self.pid = -1
+
+    async def start(self):
+        """
+        Begin execution of the process. The reason this
+        method exists is to prevent the breaking of the
+        structured concurrency model
+        """
+
+        self._process = subprocess.Popen(*self._args, **self._kwargs)
+        self.pid = self._process.pid
         if self._process.stdin:
             self.stdin = FileStream(self._process.stdin)
         if self._process.stdout:
             self.stdout = FileStream(self._process.stdout)
         if self._process.stderr:
             self.stderr = FileStream(self._process.stderr)
+        await checkpoint()
+
+    async def is_running(self):
+        """
+        Returns whether the process is currently running
+        """
+
+        if self._process is None:
+            return False
+        elif self._process.poll() is None:
+            return True
+        return False
 
     async def wait(self):
+        """
+        Async equivalent of subprocess.Popen.wait()
+        """
+
+        if self._process is None:
+            raise RuntimeError("process has not been started yet")
+
         status = self._process.poll()
         if status is None:
             status = await structio.thread.run_in_worker(
                 self._process.wait, cancellable=True
             )
+        self.returncode = status
         return status
 
     async def communicate(self, input=b"") -> tuple[bytes, bytes]:
@@ -67,6 +105,7 @@
         return out, err
 
     async def __aenter__(self):
+        await self.start()
         return self
 
     async def __aexit__(self, *args):
@@ -92,7 +131,7 @@
     if input:
         stdin = subprocess.PIPE
 
-    async with Popen(
+    async with Process(
         args, stdin=stdin, stdout=stdout, stderr=stderr, shell=shell
     ) as process:
         try:
@@ -101,10 +140,9 @@
             process.kill()
             raise
 
-    status = process.poll()
-    if check and status:
-        raise CalledProcessError(status, process.args, output=stdout, stderr=stderr)
-    return CompletedProcess(process.args, status, stdout, stderr)
+    if check and process.returncode:
+        raise CalledProcessError(process.returncode, process.args, output=stdout, stderr=stderr)
+    return CompletedProcess(process.args, process.returncode, stdout, stderr)
 
 
 async def check_output(args, *, stdin=None, stderr=None, shell=False, input=None):
diff --git a/structio/pools.py b/structio/pools.py
index 99992d5..7045bd1 100644
--- a/structio/pools.py
+++ b/structio/pools.py
@@ -1,19 +1,18 @@
 """
-A module implementing worker pools
+A module implementing worker process pools
 """
 
 from multiprocessing import cpu_count
-from typing import Callable, Any
-
-from structio.parallel import Popen
-from structio import Queue
+import structio
+from structio.parallel import Process
+from structio import Semaphore, MemoryChannel
 
 
 class AsyncProcessPool:
     """
     A worker pool for asynchronous processes.
 
-    :param max_workers: The maximum number of concurrent
+    :param max_workers: The maximum number of parallel
     worker processes that can be running at any given time
     """
 
@@ -23,12 +22,63 @@ def __init__(self, max_workers: int = cpu_count()):
         """
         Public object constructor
         """
 
-        self.processes: Queue = Queue(max_workers)
+        self._closing = False
+        self.processes: MemoryChannel = MemoryChannel(0)
+        self._sem = Semaphore(max_workers)
+        # We need this to be running in the background, so we
+        # create it as a system task
+        structio.current_loop().spawn_system_task(self._manage_processes)
+
+    async def _manage_process(self, process: Process):
+        await process.start()
+        await process.wait()
+        await self._sem.release()
+
+    async def _manage_processes(self):
+        async with structio.create_pool() as pool:
+            async for process in self.processes.reader:
+                pool.spawn(self._manage_process, process)
 
     @property
     def max_workers(self):
-        return self.processes.maxsize
+        return self._sem.max_size
+
+    async def submit(self, *args, **kwargs) -> Process:
+        """
+        Submits work to the process pool. All positional
+        and keyword arguments are passed along to the constructor
+        of `structio.parallel.Process`. If there are max_workers
+        processes already running, the call blocks until a spot
+        is freed. The process object is returned
+        """
+
+        await self._sem.acquire()
+        process = Process(*args, **kwargs)
+        try:
+            await self.processes.writer.send(process)
+        except structio.ResourceClosed:
+            await self._sem.release()
+            raise
+        return process
+
+    async def close(self):
+        """
+        Close the process pool so that no more
+        processes can be submitted. Currently
+        running processes will run to completion
+        """
+
+        self._closing = True
+        await self.processes.close()
+
+    async def wait(self):
+        """
+        Wait until the pool has no more work
+        to do. Must be called after close()
+        """
+
+        if not self._closing:
+            raise RuntimeError("call close() first")
+        while self.processes.reader.pending():
+            await structio.sleep(0)
-
-    async def submit(self, fn: Callable[[Any, Any], Any], *args, block: bool = True):
-        if block:
-            await self.processes.put((fn, args))
\ No newline at end of file
diff --git a/structio/sync.py b/structio/sync.py
index 5e51129..19871f2 100644
--- a/structio/sync.py
+++ b/structio/sync.py
@@ -224,6 +224,7 @@ class PriorityQueue(Queue):
     def _put_item(self, item):
         heappush(self.container, item)
 
+
 class MemoryReceiveChannel(ChannelReader):
     """
     An in-memory one-way channel to read
@@ -498,65 +499,4 @@ class RLock(Lock):
         if self._acquire_count == 0:
             await super().release()
         else:
-            await checkpoint()
-
-
-_events: dict[str, list[Callable[[Any, Any], Coroutine[Any, Any, Any]]]] = defaultdict(
-    list
-)
-
-
-async def emit(evt: str, *args, **kwargs):
-    """
-    Fire the event and call all of its handlers with
-    the event name as the first argument and all other
-    positional and keyword arguments passed to this
-    function after that. Returns once all events have
-    completed execution
-    """
-
-    async with structio.create_pool() as pool:
-        for func in _events[evt]:
-            pool.spawn(partial(func, evt, *args, **kwargs))
-
-
-def register_event(evt: str, func: Callable[[Any, Any], Coroutine[Any, Any, Any]]):
-    """
-    Register the given async function for the given event name.
-    Note that if the given async function is already registered
-    for the chosen event, it will be called once for each time
-    this function is called once the associated event is fired
-    """
-
-    _events[evt].append(func)
-
-
-def unregister_event(evt: str, func: Callable[[Any, Any], Coroutine[Any, Any, Any]]):
-    """
-    Unregisters the given async function from the given event.
-    Nothing happens if the given event or async functions are
-    not registered yet
-    """
-
-    try:
-        _events[evt].remove(func)
-    except IndexError:
-        pass
-
-
-def on_event(evt: str):
-    """
-    Convenience decorator to
-    register async functions
-    to events
-    """
-
-    def decorator(f):
-        @wraps
-        def wrapper(*args, **kwargs):
-            f(*args, **kwargs)
-
-        register_event(evt, f)
-        return wrapper
-
-    return decorator
+            await checkpoint()
\ No newline at end of file
diff --git a/tests/echo_server.py b/tests/echo_server.py
index 3581692..696cec6 100644
--- a/tests/echo_server.py
+++ b/tests/echo_server.py
@@ -23,7 +23,7 @@ async def serve(bind_address: tuple):
         try:
             conn, address_tuple = await sock.accept()
             logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected")
-            await ctx.spawn(handler, conn, address_tuple)
+            ctx.spawn(handler, conn, address_tuple)
         except Exception as err:
             # Because exceptions just *work*
             logging.info(
diff --git a/tests/processes.py b/tests/processes.py
index 39d6acd..8b98460 100644
--- a/tests/processes.py
+++ b/tests/processes.py
@@ -3,11 +3,15 @@
 import subprocess
 import shlex
 
 # In the interest of compatibility, structio.parallel
-# tries to mirror the subprocess module. You can even
-# pass the constants such as DEVNULL, PIPE, etc. to it
-# and it'll work
+# tries to be compatible with the subprocess module. You
+# can even pass the constants such as DEVNULL, PIPE, etc.
+# to it and it'll work
+
+# TODO: Implement a higher-level multiprocessing module that does not expose the
+# internal Process object directly so that there's no chance of leaving dangling
+# processes running in the background (unless explicitly desired, of course)
+
 
 async def main(data: str):
     cmd = shlex.split("python -c 'print(input())'")
     to_send = data.encode(errors="ignore")
@@ -20,14 +24,24 @@
     out = out.decode().rstrip("\r").rstrip("\r\n").rstrip("\n")
     assert out == data
     # Other, other option :D
-    process = structio.parallel.Popen(
+    process = structio.parallel.Process(
         cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE
     )
-    # Note that the process is spawned as soon as the object is
-    # created!
+    # Note that the process is spawned only after we run start()!
+    await process.start()
     out, _ = await process.communicate(to_send)
     out = out.decode().rstrip("\r").rstrip("\r\n").rstrip("\n")
     assert out == data
+    # We can also use process pools to spawn multiple processes
+    # at the same time while limiting resource usage
+    pool = structio.AsyncProcessPool(4)  # Max. 4 processes at a time
+    for i in range(0, (4 * 5) + 1):  # We iterate 21 instead of 20 times: this is important to the test!
+        # This will block every 4 iterations, once all workers are busy
+        await pool.submit(shlex.split(f"""python -c 'print({i}); __import__("time").sleep(1)'"""))
+    # Since we're exiting the program right after the last iteration, we need
+    # to wait for the pool to complete.
+    await pool.close()
+    await pool.wait()
 
 
 structio.run(main, "owo")
diff --git a/tests/smart_events.py b/tests/smart_events.py
deleted file mode 100644
index 051d083..0000000
--- a/tests/smart_events.py
+++ /dev/null
@@ -1,25 +0,0 @@
-import structio
-from functools import partial
-
-
-@structio.on_event("on_message")
-async def test(evt, *args, **kwargs):
-    print(f"[test] New event {evt!r} with arguments: {args}, {kwargs}")
-    # Simulate some work
-    await structio.sleep(1)
-
-
-async def main():
-    print("[main] Firing two events synchronously")
-    t = structio.clock()
-    await structio.emit("on_message", 1, 2, 3, **{"foo": "bar"})
-    await structio.emit("on_message", 1, 2, 4, **{"foo": "baz"})
-    print(f"[main] Done in {structio.clock() - t:.2f} seconds. Firing two events in parallel")
-    t = structio.clock()
-    async with structio.create_pool() as pool:
-        pool.spawn(partial(structio.emit, "on_message", 1, 2, 3, **{"foo": "bar"}))
-        pool.spawn(partial(structio.emit, "on_message", 1, 2, 4, **{"foo": "baz"}))
-    print(f"[main] Done in {structio.clock() - t:.2f} seconds")
-
-
-structio.run(main)
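
A minimal usage sketch of the API this patch introduces (not part of the diff itself): it assumes the exports added to structio/__init__.py (Process, AsyncProcessPool), the start()/wait()/submit()/close() methods shown in the hunks above, and a python executable on PATH.

import shlex

import structio


async def main():
    # A standalone process must be started explicitly (or via "async with"),
    # since constructing the object no longer spawns anything
    process = structio.Process(shlex.split("python -c 'print(42)'"))
    await process.start()
    await process.wait()

    # The pool runs at most two subprocesses at a time; submit() blocks
    # once both slots are taken, and wait() must follow close()
    pool = structio.AsyncProcessPool(2)
    for i in range(6):
        await pool.submit(shlex.split(f"python -c 'print({i})'"))
    await pool.close()
    await pool.wait()


structio.run(main)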