Preliminary work on process pools. Remove smart events

Mattia Giambirtone 2024-03-10 19:35:00 +01:00
parent 3a2edfba1f
commit 62b2f19527
8 changed files with 154 additions and 118 deletions

View File

@ -23,9 +23,6 @@ from structio.sync import (
Semaphore,
Lock,
RLock,
emit,
on_event,
register_event,
)
from structio.abc import Channel, Stream, ChannelReader, ChannelWriter
from structio.io import socket
@ -43,6 +40,8 @@ from structio.core.run import current_loop, current_task
from structio import thread, parallel
from structio.path import Path
from structio.signals import set_signal_handler, get_signal_handler
from structio.parallel import Process
from structio.pools import AsyncProcessPool
from structio import signals as _signals
from structio import util
@ -175,4 +174,7 @@ __all__ = [
"ResourceBusy",
"ResourceBroken",
"WouldBlock",
"Process",
"AsyncProcessPool",
"AsyncSocket",
]

View File

@ -3,6 +3,7 @@ import os
from abc import abstractmethod, ABC
from types import FrameType
import structio
from structio.core.task import Task
from structio.exceptions import StructIOException
from typing import Callable, Any, Coroutine
@ -136,6 +137,22 @@ class ChannelReader(AsyncResource, ABC):
return NotImplemented
def __aiter__(self):
"""
Implements asynchronous iteration
"""
return self
async def __anext__(self):
"""
Implements asynchronous iteration
"""
try:
return await self.receive()
except structio.ResourceClosed:
raise StopAsyncIteration()
@abstractmethod
def pending(self):
"""

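For context, here is a minimal sketch of what the new iteration protocol enables for channel readers. The channel construction and buffer size are assumptions drawn from the pool code later in this commit:

import structio


async def consumer(reader):
    # The new __aiter__/__anext__ make any ChannelReader asynchronously
    # iterable: the loop ends once the channel is closed, because
    # ResourceClosed is translated into StopAsyncIteration
    async for item in reader:
        print("received", item)


async def main():
    channel = structio.MemoryChannel(3)  # buffer size is arbitrary here
    async with structio.create_pool() as pool:
        pool.spawn(consumer, channel.reader)
        for i in range(3):
            await channel.writer.send(i)
        await channel.close()


structio.run(main)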
View File

@ -1,5 +1,7 @@
"""Module inspired by subprocess which allows for asynchronous
multiprocessing"""
"""
Module inspired by subprocess which allows for asynchronous
multiprocessing
"""
import os
import structio
@ -7,6 +9,8 @@ import platform
import subprocess
from subprocess import CalledProcessError, CompletedProcess, DEVNULL, PIPE
from structio.io import FileStream
from structio.core.syscalls import checkpoint
if platform.system() == "Windows":
# Windows doesn't really support non-blocking file
@ -14,9 +18,9 @@ if platform.system() == "Windows":
from structio.io.files import AsyncFile as FileStream
class Popen:
class Process:
"""
Wrapper around subprocess.Popen, but async
Class similar to subprocess.Popen, but async
"""
def __init__(self, *args, **kwargs):
@ -30,23 +34,57 @@ class Popen:
if hasattr(os, "set_blocking"):
os.set_blocking(stdin.fileno(), True)
# Delegate to Popen's constructor
self._process: subprocess.Popen = subprocess.Popen(*args, **kwargs)
self._process: subprocess.Popen | None = None
self._args = args
self._kwargs = kwargs
# Mirror subprocess.Popen's public "args" attribute, which run()
# below uses to build CalledProcessError and CompletedProcess
self.args = args
self.stdin = None
self.stdout = None
self.stderr = None
self.returncode = None
self.pid = -1
async def start(self):
"""
Begin execution of the process. Spawning is
deferred to this method, rather than happening
in the constructor, so as not to break the
structured concurrency model
"""
self._process = subprocess.Popen(*self._args, **self._kwargs)
self.pid = self._process.pid
if self._process.stdin:
self.stdin = FileStream(self._process.stdin)
if self._process.stdout:
self.stdout = FileStream(self._process.stdout)
if self._process.stderr:
self.stderr = FileStream(self._process.stderr)
await checkpoint()
async def is_running(self):
"""
Returns whether the process is currently running
"""
if self._process is None:
return False
# poll() returns None while the process is still alive
return self._process.poll() is None
async def wait(self):
"""
Async equivalent of subprocess.Popen.wait()
"""
if self._process is None:
raise RuntimeError("the process has not been started yet")
status = self._process.poll()
if status is None:
status = await structio.thread.run_in_worker(
self._process.wait, cancellable=True
)
self.returncode = status
return status
async def communicate(self, input=b"") -> tuple[bytes, bytes]:
@ -67,6 +105,7 @@ class Popen:
return out, err
async def __aenter__(self):
await self.start()
return self
async def __aexit__(self, *args):
@ -92,7 +131,7 @@ async def run(
if input:
stdin = subprocess.PIPE
async with Popen(
async with Process(
args, stdin=stdin, stdout=stdout, stderr=stderr, shell=shell
) as process:
try:
@ -101,10 +140,9 @@ async def run(
process.kill()
raise
status = process.poll()
if check and status:
raise CalledProcessError(status, process.args, output=stdout, stderr=stderr)
return CompletedProcess(process.args, status, stdout, stderr)
if check and process.returncode:
raise CalledProcessError(process.returncode, process.args, output=stdout, stderr=stderr)
return CompletedProcess(process.args, process.returncode, stdout, stderr)
async def check_output(args, *, stdin=None, stderr=None, shell=False, input=None):

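For reference, a minimal usage sketch of the reworked class under the deferred-start lifecycle described above. The command line is illustrative:

import subprocess
import structio
from structio.parallel import Process


async def main():
    # The constructor no longer spawns anything: the child process only
    # comes into existence once start() is awaited (or the object is
    # entered as an async context manager, whose __aenter__ calls start())
    process = Process(
        ["python", "-c", "print(input())"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    await process.start()
    out, _ = await process.communicate(b"hello")
    print(out.decode().strip())  # prints "hello"
    print("exit status:", await process.wait())


structio.run(main)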
View File

@ -1,19 +1,18 @@
"""
A module implementing worker pools
A module implementing worker process pools
"""
from multiprocessing import cpu_count
from typing import Callable, Any
from structio.parallel import Popen
from structio import Queue
import structio
from structio.parallel import Process
from structio import Semaphore, MemoryChannel
class AsyncProcessPool:
"""
A worker pool for asynchronous processes.
:param max_workers: The maximum number of concurrent
:param max_workers: The maximum number of parallel
worker processes that can be running at any given
time
"""
@ -23,12 +22,63 @@ class AsyncProcessPool:
Public object constructor
"""
self.processes: Queue = Queue(max_workers)
self._closing = False
self.processes: MemoryChannel = MemoryChannel(0)
self._sem = Semaphore(max_workers)
# The process manager needs to keep running in the background for
# the pool's whole lifetime, so we spawn it as a system task
structio.current_loop().spawn_system_task(self._manage_processes)
async def _manage_process(self, process: Process):
await process.start()
await process.wait()
await self._sem.release()
async def _manage_processes(self):
async with structio.create_pool() as pool:
async for process in self.processes.reader:
pool.spawn(self._manage_process, process)
@property
def max_workers(self):
return self.processes.maxsize
return self._sem.max_size
async def submit(self, *args, **kwargs) -> Process:
"""
Submits work to the process pool. All positional
and keyword arguments are passed along to the constructor
of `structio.parallel.Process`. If max_workers processes
are already running, the call blocks until a slot frees
up. The newly created process object is returned
"""
await self._sem.acquire()
process = Process(*args, **kwargs)
try:
await self.processes.writer.send(process)
except structio.ResourceClosed:
await self._sem.release()
raise
return process
async def close(self):
"""
Close the process pool so that no more
work can be submitted. Currently running
processes will run to completion
"""
self._closing = True
await self.processes.close()
async def wait(self):
"""
Wait until the pool has no more work
to do. Must be called after close()
"""
if not self._closing:
raise RuntimeError("call close() first")
while self.processes.reader.pending():
await structio.sleep(0)
async def submit(self, fn: Callable[[Any, Any], Any], *args, block: bool = True):
if block:
await self.processes.put((fn, args))

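And a condensed sketch of the submit/close/wait lifecycle the docstrings above describe. The command is illustrative, and the demo further down in this commit exercises the same flow at greater length:

import shlex
import structio


async def main():
    pool = structio.AsyncProcessPool(2)  # at most 2 workers in parallel
    for i in range(4):
        # submit() blocks whenever both worker slots are taken and
        # resumes as soon as one of the managed processes exits
        await pool.submit(shlex.split(f"python -c 'print({i})'"))
    # No more submissions: close the pool, then wait for the backlog
    await pool.close()
    await pool.wait()


structio.run(main)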
View File

@ -224,6 +224,7 @@ class PriorityQueue(Queue):
def _put_item(self, item):
heappush(self.container, item)
class MemoryReceiveChannel(ChannelReader):
"""
An in-memory one-way channel to read
@ -498,65 +499,4 @@ class RLock(Lock):
if self._acquire_count == 0:
await super().release()
else:
await checkpoint()
_events: dict[str, list[Callable[[Any, Any], Coroutine[Any, Any, Any]]]] = defaultdict(
list
)
async def emit(evt: str, *args, **kwargs):
"""
Fire the event and call all of its handlers with
the event name as the first argument and all other
positional and keyword arguments passed to this
function after that. Returns once all events have
completed execution
"""
async with structio.create_pool() as pool:
for func in _events[evt]:
pool.spawn(partial(func, evt, *args, **kwargs))
def register_event(evt: str, func: Callable[[Any, Any], Coroutine[Any, Any, Any]]):
"""
Register the given async function for the given event name.
Note that if the given async function is already registered
for the chosen event, it will be called once for each time
this function is called once the associated event is fired
"""
_events[evt].append(func)
def unregister_event(evt: str, func: Callable[[Any, Any], Coroutine[Any, Any, Any]]):
"""
Unregisters the given async function from the given event.
Nothing happens if the given event or async functions are
not registered yet
"""
try:
_events[evt].remove(func)
except IndexError:
pass
def on_event(evt: str):
"""
Convenience decorator to
register async functions
to events
"""
def decorator(f):
@wraps
def wrapper(*args, **kwargs):
f(*args, **kwargs)
register_event(evt, f)
return wrapper
return decorator

View File

@ -23,7 +23,7 @@ async def serve(bind_address: tuple):
try:
conn, address_tuple = await sock.accept()
logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected")
await ctx.spawn(handler, conn, address_tuple)
ctx.spawn(handler, conn, address_tuple)
except Exception as err:
# Because exceptions just *work*
logging.info(

View File

@ -3,11 +3,15 @@ import subprocess
import shlex
# In the interest of compatibility, structio.parallel
# tries to mirror the subprocess module. You can even
# pass the constants such as DEVNULL, PIPE, etc. to it
# and it'll work
# tries to be compatible with the subprocess module. You
# can even pass the constants such as DEVNULL, PIPE, etc.
# to it and it'll work
# TODO: Implement a higher-level multiprocessing module that does not expose the
# internal Process object directly so that there's no chance of leaving dangling
# processes running in the background (unless explicitly desired, of course)
async def main(data: str):
cmd = shlex.split("python -c 'print(input())'")
to_send = data.encode(errors="ignore")
@ -20,14 +24,24 @@ async def main(data: str):
out = out.decode().rstrip("\r").rstrip("\r\n").rstrip("\n")
assert out == data
# Other, other option :D
process = structio.parallel.Popen(
process = structio.parallel.Process(
cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE
)
# Note that the process is spawned as soon as the object is
# created!
# Note that the process is spawned only after we run start()!
await process.start()
out, _ = await process.communicate(to_send)
out = out.decode().rstrip("\r").rstrip("\r\n").rstrip("\n")
assert out == data
# We can also use process pools to spawn multiple processes
# at the same time while limiting resource usage
pool = structio.AsyncProcessPool(4) # Max. 4 processes at a time
for i in range(0, (4 * 5) + 1): # We iterate 21 instead of 20 times: this is important to the test!
# This will block whenever all 4 worker slots are busy
await pool.submit(shlex.split(f"""python -c 'print({i}); __import__("time").sleep(1)'"""))
# Since we're exiting the program right after the last iteration, we need
# to wait for the pool to complete.
await pool.close()
await pool.wait()
structio.run(main, "owo")

View File

@ -1,25 +0,0 @@
import structio
from functools import partial
@structio.on_event("on_message")
async def test(evt, *args, **kwargs):
print(f"[test] New event {evt!r} with arguments: {args}, {kwargs}")
# Simulate some work
await structio.sleep(1)
async def main():
print("[main] Firing two events synchronously")
t = structio.clock()
await structio.emit("on_message", 1, 2, 3, **{"foo": "bar"})
await structio.emit("on_message", 1, 2, 4, **{"foo": "baz"})
print(f"[main] Done in {structio.clock() - t:.2f} seconds. Firing two events in parallel")
t = structio.clock()
async with structio.create_pool() as pool:
pool.spawn(partial(structio.emit, "on_message", 1, 2, 3, **{"foo": "bar"}))
pool.spawn(partial(structio.emit, "on_message", 1, 2, 4, **{"foo": "baz"}))
print(f"[main] Done in {structio.clock() - t:.2f} seconds")
structio.run(main)