Various socket bugfixes. Initial communication layer for remote async python processes
This commit is contained in:
parent
e6f3f9b8eb
commit
e4f50fe95a
|
@ -1 +1,2 @@
|
|||
sniffio==1.3.0
|
||||
sniffio==1.3.0
|
||||
msgpack==1.0.8
|
|
@ -0,0 +1,69 @@
|
|||
"""
|
||||
Helper module to spawn asynchronous Python processes via
|
||||
structio
|
||||
"""
|
||||
|
||||
import sys
|
||||
import struct
|
||||
import structio
|
||||
from structio.exceptions import ResourceBroken
|
||||
import msgpack
|
||||
|
||||
|
||||
async def send_message(sock: structio.AsyncSocket, payload: dict):
|
||||
"""
|
||||
Sends a payload to the remote event loop
|
||||
"""
|
||||
|
||||
data = msgpack.dumps(payload)
|
||||
await sock.send_all(struct.pack("Q", len(data)))
|
||||
await sock.send_all(data)
|
||||
|
||||
|
||||
async def receive_message(sock: structio.AsyncSocket) -> dict:
|
||||
"""
|
||||
Wait for a message from the control process
|
||||
"""
|
||||
|
||||
data = await sock.receive(8)
|
||||
if not data:
|
||||
raise ResourceBroken("socket was closed unexpectedly by remote loop")
|
||||
size, *_ = struct.unpack("Q", data)
|
||||
return msgpack.unpackb(await sock.receive_exactly(size))
|
||||
|
||||
|
||||
async def dispatch(sock: structio.AsyncSocket, message: dict):
|
||||
"""
|
||||
Dispatches commands received from the remote event loop
|
||||
|
||||
:param sock: The socket used to communicate with the remote
|
||||
event loop
|
||||
:param message: The message received from the remote event
|
||||
loop
|
||||
"""
|
||||
|
||||
match message["msg"]:
|
||||
case "HELO":
|
||||
# SOS: Start of session
|
||||
print("Received SOS from remote event loop")
|
||||
await send_message(sock, {"ok": True, "msg": "ACK"})
|
||||
case "CYA":
|
||||
# EOS: End of session (aka the process can and should exit)
|
||||
print("Received EOS from remote event, shutting down")
|
||||
await send_message(sock, {"ok": True, "msg": "ACK"})
|
||||
sys.exit(0)
|
||||
case _:
|
||||
print(f"Unknown message type {message['msg']!r}: {message}")
|
||||
await send_message(sock, {"ok": False, "msg": "ACK", "error": f"unknown message type {message['msg']!r}"})
|
||||
|
||||
|
||||
async def main(addr: tuple[str, int]):
|
||||
socket = structio.socket.socket()
|
||||
# Connect to the remote event loop
|
||||
await socket.connect(addr)
|
||||
async with socket:
|
||||
while True:
|
||||
await dispatch(socket, await receive_message(socket))
|
||||
|
||||
|
||||
structio.run(main, (sys.argv[1], int(sys.argv[2])))
|
|
@ -47,7 +47,7 @@ class FIFOKernel(BaseKernel):
|
|||
)
|
||||
# Tasks that are ready to run
|
||||
self.run_queue: deque[Task] = deque()
|
||||
self.shutdown_tasks: list[tuple[int, Callable[[Any, Any], Coroutine[Any, Any, Any]], list[Any]]] = []
|
||||
self.shutdown_tasks: deque[tuple[int, Callable[[Any, Any], Coroutine[Any, Any, Any]], list[Any]]] = deque()
|
||||
self._shutdown_task_ident = count(0)
|
||||
# Data to send back to tasks
|
||||
self.data: dict[Task, Any] = {}
|
||||
|
@ -221,7 +221,6 @@ class FIFOKernel(BaseKernel):
|
|||
def throw(self, task: Task, err: BaseException):
|
||||
if task.done():
|
||||
return
|
||||
self.release(task)
|
||||
self.handle_errors(partial(task.coroutine.throw, err), task)
|
||||
|
||||
def reschedule(self, task: Task):
|
||||
|
@ -229,12 +228,12 @@ class FIFOKernel(BaseKernel):
|
|||
return
|
||||
self.run_queue.append(task)
|
||||
|
||||
def check_cancelled(self):
|
||||
def check_cancelled(self, schedule: bool = True):
|
||||
if self._sigint_handled:
|
||||
self.throw(self.entry_point, KeyboardInterrupt())
|
||||
elif self.current_task.pending_cancellation:
|
||||
self.cancel_task(self.current_task)
|
||||
else:
|
||||
elif schedule:
|
||||
# We reschedule the caller immediately!
|
||||
self.run_queue.appendleft(self.current_task)
|
||||
|
||||
|
@ -257,7 +256,7 @@ class FIFOKernel(BaseKernel):
|
|||
# If sleep is called with 0 as argument,
|
||||
# then it's just a checkpoint!
|
||||
self.schedule_point()
|
||||
self.check_cancelled()
|
||||
self.check_cancelled(schedule=False)
|
||||
|
||||
def check_scopes(self):
|
||||
expired = set()
|
||||
|
@ -307,10 +306,21 @@ class FIFOKernel(BaseKernel):
|
|||
while not self.done():
|
||||
self._tick()
|
||||
if self.entry_point.state == TaskState.FINISHED:
|
||||
for _, func, args in self.shutdown_tasks:
|
||||
self.spawn_system_task(func, *args)
|
||||
while not self.done():
|
||||
self._tick()
|
||||
while True:
|
||||
# Spawn all the shutdown tasks that are currently registered
|
||||
while self.shutdown_tasks:
|
||||
# Shutdown tasks can be added even while the
|
||||
# event loop is shutting down!
|
||||
_, func, args = self.shutdown_tasks.popleft()
|
||||
self.spawn_system_task(func, *args)
|
||||
# Run all the shutdown tasks to completion
|
||||
while not self.done():
|
||||
self._tick()
|
||||
# Were more shutdown tasks scheduled? If not, we're done.
|
||||
# Otherwise, we'll run them at the next iteration
|
||||
if not self.shutdown_tasks:
|
||||
break
|
||||
|
||||
self.io_manager.close()
|
||||
self.close()
|
||||
|
||||
|
@ -408,11 +418,11 @@ class FIFOKernel(BaseKernel):
|
|||
self.close(force=True)
|
||||
raise task.exc from StructIOException(f"system task {task} crashed")
|
||||
scope = task.pool.scope
|
||||
if task is not scope.owner:
|
||||
self.reschedule(scope.owner)
|
||||
self.cancel_scope(scope)
|
||||
self.throw(scope.owner, task.exc)
|
||||
self.release(task)
|
||||
self.cancel_scope(scope)
|
||||
if task is not scope.owner:
|
||||
self.throw(scope.owner, task.exc)
|
||||
self.reschedule(scope.owner)
|
||||
|
||||
def on_cancel(self, task: Task):
|
||||
"""
|
||||
|
@ -509,8 +519,8 @@ class FIFOKernel(BaseKernel):
|
|||
return ident
|
||||
|
||||
def remove_shutdown_task(self, ident: int) -> bool:
|
||||
for i, (task_id, func, args) in enumerate(self.shutdown_tasks):
|
||||
for task_id, func, args in self.shutdown_tasks:
|
||||
if ident == task_id:
|
||||
self.shutdown_tasks.pop(i)
|
||||
self.shutdown_tasks.remove((task_id, func, args))
|
||||
return True
|
||||
return False
|
||||
|
|
|
@ -1,15 +1,12 @@
|
|||
import itertools
|
||||
import time
|
||||
|
||||
import warnings
|
||||
|
||||
import structio
|
||||
from structio.abc import BaseIOManager, BaseKernel
|
||||
from structio.core.task import Task, TaskState
|
||||
from structio.util.ki import CTRLC_PROTECTION_ENABLED
|
||||
from structio.core.run import current_loop, current_task
|
||||
from structio.io import FdWrapper
|
||||
import select
|
||||
import signal
|
||||
|
||||
|
||||
class SimpleIOManager(BaseIOManager):
|
||||
|
@ -80,16 +77,17 @@ class SimpleIOManager(BaseIOManager):
|
|||
elif deadline > 0:
|
||||
deadline -= current_time
|
||||
deadline = max(0, deadline)
|
||||
current = time.time()
|
||||
current = kernel.clock.current_time()
|
||||
readers = self._collect_readers()
|
||||
writers = self._collect_writers()
|
||||
kernel.event("before_io", deadline)
|
||||
readable, writable, exceptional = select.select(
|
||||
self._collect_readers(),
|
||||
writers,
|
||||
readers,
|
||||
writers,
|
||||
writers + readers,
|
||||
deadline,
|
||||
)
|
||||
kernel.event("after_io", time.time() - current)
|
||||
kernel.event("after_io", kernel.clock.current_time() - current)
|
||||
# On Windows, a successful connection is marked
|
||||
# as an exceptional event rather than a write
|
||||
# one
|
||||
|
|
|
@ -65,9 +65,8 @@ async def wrap_socket_with_ssl(
|
|||
# blocks the entire event loop
|
||||
raw_ssl = context.wrap_socket(sock, *args, do_handshake_on_connect=False, **kwargs)
|
||||
wrapped = AsyncSocket(raw_ssl, do_handshake_on_connect=do_handshake_on_connect)
|
||||
if raw_ssl._connected:
|
||||
wrapped.connected = True
|
||||
if wrapped.do_handshake_on_connect and wrapped.connected:
|
||||
wrapped.socket: _ssl.SSLSocket
|
||||
if wrapped.do_handshake_on_connect and wrapped.socket._connected:
|
||||
await wrapped.do_handshake()
|
||||
return wrapped
|
||||
|
||||
|
@ -82,7 +81,10 @@ def socketpair(
|
|||
if family is None and platform.system() == "Windows":
|
||||
family = _socket.AF_INET
|
||||
a, b = _socket.socketpair(family, type, proto)
|
||||
return AsyncSocket(a), AsyncSocket(b)
|
||||
result = AsyncSocket(a), AsyncSocket(b)
|
||||
result[0].connected = True
|
||||
result[1].connected = True
|
||||
return result
|
||||
|
||||
|
||||
@wraps(_socket.getaddrinfo)
|
||||
|
@ -264,14 +266,14 @@ async def connect_tcp_socket(
|
|||
# FIXME: Could this block forever? I mean, maybe it's not
|
||||
# such a huge deal since you can always wrap the call in
|
||||
# a timeout or something, but it may be something worth
|
||||
#
|
||||
# investigating
|
||||
await sock.close()
|
||||
except BaseException as e:
|
||||
# Again, we shouldn't be ignoring
|
||||
# errors willy-nilly like that, but
|
||||
# hey beta software am I right?
|
||||
warnings.warn(
|
||||
f"Failed to close {sock!r} in call to connect_socket -> {type(e).__name__}: {e}"
|
||||
f"Failed to close {sock!r} in call to connect_tcp_socket -> {type(e).__name__}: {e}"
|
||||
)
|
||||
continue
|
||||
if not successful:
|
||||
|
@ -329,7 +331,6 @@ class AsyncSocket(AsyncResource):
|
|||
self.do_handshake_on_connect = do_handshake_on_connect
|
||||
self.socket = sock
|
||||
self.socket.setblocking(False)
|
||||
self.connected: bool = False
|
||||
self.write_lock = structio.util.misc.ThereCanBeOnlyOne(
|
||||
"another task is writing on this socket"
|
||||
)
|
||||
|
@ -383,12 +384,13 @@ class AsyncSocket(AsyncResource):
|
|||
if self._fd == -1:
|
||||
raise ResourceClosed("I/O operation on closed socket")
|
||||
with self.write_lock, self.read_lock:
|
||||
while not self.connected:
|
||||
connected = False
|
||||
while not connected:
|
||||
try:
|
||||
self.socket.connect(address)
|
||||
if self.do_handshake_on_connect:
|
||||
await self.do_handshake()
|
||||
self.connected = True
|
||||
connected = True
|
||||
await checkpoint()
|
||||
except WantRead:
|
||||
await wait_readable(self._fd)
|
||||
|
@ -400,7 +402,7 @@ class AsyncSocket(AsyncResource):
|
|||
Wrapper socket method
|
||||
"""
|
||||
|
||||
if self.connected:
|
||||
if self._fd.fileno() != -1:
|
||||
# We set our own fd to -1
|
||||
# before calling any async
|
||||
# primitive so that any further
|
||||
|
|
|
@ -2,16 +2,22 @@
|
|||
Module inspired by subprocess which allows for asynchronous
|
||||
multiprocessing
|
||||
"""
|
||||
|
||||
import os
|
||||
import struct
|
||||
import sys
|
||||
|
||||
import structio
|
||||
import msgpack
|
||||
import platform
|
||||
import subprocess
|
||||
from itertools import count
|
||||
|
||||
from structio.io import FileStream
|
||||
from multiprocessing import cpu_count
|
||||
from structio import Semaphore, Queue
|
||||
from typing import Callable, Any, Coroutine
|
||||
from structio.core.syscalls import checkpoint
|
||||
from structio.exceptions import ResourceBroken, StructIOException
|
||||
from subprocess import CalledProcessError, CompletedProcess, DEVNULL, PIPE
|
||||
|
||||
if platform.system() == "Windows":
|
||||
|
@ -45,9 +51,10 @@ class Process:
|
|||
self.stderr = None
|
||||
self.returncode = None
|
||||
self.pid = -1
|
||||
self.shutdown_handlers: list[tuple[int, Callable[[Any, Any], Coroutine[Any, Any, Any]], args]] = []
|
||||
self.shutdown_handlers: list[tuple[int, bool, Callable[[Any, Any], Coroutine[Any, Any, Any]], args]] = []
|
||||
self._handler_id = count()
|
||||
self._taskid = None
|
||||
self._started = structio.Event()
|
||||
|
||||
async def terminate(self):
|
||||
"""
|
||||
|
@ -58,24 +65,29 @@ class Process:
|
|||
self._process.terminate, cancellable=True
|
||||
)
|
||||
|
||||
async def _run_shutdown_handlers(self):
|
||||
for _, coro, args in self.shutdown_handlers:
|
||||
async def _run_shutdown_handlers(self, before_wait: bool = False):
|
||||
for _, _, coro, args in filter(lambda h: h[1] is before_wait, self.shutdown_handlers):
|
||||
await coro(*args)
|
||||
|
||||
def add_shutdown_handler(self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args) -> int:
|
||||
def add_shutdown_handler(self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args,
|
||||
before_wait: bool = False) -> int:
|
||||
"""
|
||||
Registers a coroutine to be executed after the process terminates. Shutdown handlers
|
||||
are executed in the order in which they are registered. All positional arguments are
|
||||
passed to the given coroutine. Returns a unique identifier that can be used to remove
|
||||
the handler later
|
||||
Registers a coroutine to be executed either right after, or right before wait() is called.
|
||||
Shutdown handlers are executed one at a time in the order in which they are registered. All
|
||||
positional arguments are passed to the given coroutine. Returns a unique identifier that can
|
||||
be used to unregister the handler if necessary. Note that shutdown handlers are called only
|
||||
once (i.e. the first time wait() returns)
|
||||
|
||||
:param func: The coroutine function to add as a shutdown handler
|
||||
:param before_wait: If False, the default, the handler is executed after wait()
|
||||
returns. If True, it is executed right before calling it
|
||||
"""
|
||||
|
||||
handler_id = next(self._handler_id)
|
||||
self.shutdown_handlers.append((handler_id, func, args))
|
||||
self.shutdown_handlers.append((handler_id, before_wait, func, args))
|
||||
return handler_id
|
||||
|
||||
def start(self):
|
||||
def start(self, *, attached: bool = True):
|
||||
"""
|
||||
Begin execution of the process
|
||||
"""
|
||||
|
@ -88,7 +100,17 @@ class Process:
|
|||
self.stdout = FileStream(self._process.stdout)
|
||||
if self._process.stderr:
|
||||
self.stderr = FileStream(self._process.stderr)
|
||||
self._taskid = structio.current_loop().add_shutdown_task(self.wait)
|
||||
if attached:
|
||||
self._taskid = structio.current_loop().add_shutdown_task(self.wait)
|
||||
self._started.set()
|
||||
|
||||
async def wait_started(self):
|
||||
"""
|
||||
Wait until the process has started. If the process
|
||||
has already been started or exited, this is a no-op
|
||||
"""
|
||||
|
||||
await self._started.wait()
|
||||
|
||||
def detach(self):
|
||||
"""
|
||||
|
@ -100,7 +122,7 @@ class Process:
|
|||
"""
|
||||
|
||||
if self._taskid is None:
|
||||
raise structio.exceptions.StructIOException("process is not attached to an event loop")
|
||||
raise StructIOException("process is not attached to an event loop")
|
||||
structio.current_loop().remove_shutdown_task(self._taskid)
|
||||
self._taskid = None
|
||||
|
||||
|
@ -112,7 +134,7 @@ class Process:
|
|||
"""
|
||||
|
||||
if self._taskid is not None:
|
||||
raise structio.exceptions.StructIOException("process is already attached to an event loop")
|
||||
raise StructIOException("process is already attached to an event loop")
|
||||
self._taskid = structio.current_loop().add_shutdown_task(self.wait)
|
||||
|
||||
async def is_running(self):
|
||||
|
@ -122,9 +144,7 @@ class Process:
|
|||
|
||||
if self._process is None:
|
||||
return False
|
||||
elif self._process.poll() is None:
|
||||
return False
|
||||
return True
|
||||
return self._process.poll() is None
|
||||
|
||||
async def wait(self):
|
||||
"""
|
||||
|
@ -136,11 +156,12 @@ class Process:
|
|||
|
||||
status = self._process.poll()
|
||||
if status is None:
|
||||
await self._run_shutdown_handlers(before_wait=True)
|
||||
status = await structio.thread.run_in_worker(
|
||||
self._process.wait, cancellable=True
|
||||
)
|
||||
await self._run_shutdown_handlers()
|
||||
self.returncode = status
|
||||
await self._run_shutdown_handlers()
|
||||
return status
|
||||
|
||||
async def communicate(self, input=b"") -> tuple[bytes, bytes]:
|
||||
|
@ -287,3 +308,115 @@ class ProcessLimiter:
|
|||
# queue, so if this raises WouldBlock it's 100% a bug
|
||||
self.processes.put_noblock(process)
|
||||
return process
|
||||
|
||||
|
||||
class PythonProcess:
|
||||
"""
|
||||
Run a separate python process asynchronously
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._sock = structio.socket.socket()
|
||||
self._remote: structio.AsyncSocket | None = None
|
||||
self._started = structio.Event()
|
||||
self.process: Process | None = None
|
||||
|
||||
async def send_message(self, data: dict):
|
||||
"""
|
||||
Sends a
|
||||
"""
|
||||
data = msgpack.dumps(data)
|
||||
await self._remote.send_all(struct.pack("Q", len(data)))
|
||||
await self._remote.send_all(data)
|
||||
await self._ensure_ack()
|
||||
|
||||
async def _do_setup(self):
|
||||
if not self._remote:
|
||||
await self._sock.bind(("127.0.0.1", 0))
|
||||
await self._sock.listen(1)
|
||||
addr, port = self._sock.getsockname()
|
||||
self.process = Process([sys.executable, "-m", "structio._child_process", addr, str(port)])
|
||||
# If we didn't close the socket before calling wait(), we'd deadlock!
|
||||
self.process.add_shutdown_handler(self.close, before_wait=True)
|
||||
self.process.start()
|
||||
await self.process.wait_started()
|
||||
self._started.set()
|
||||
sock, _addr = await self._sock.accept()
|
||||
self._remote = sock
|
||||
await self.send_sos()
|
||||
|
||||
async def _ensure_ack(self):
|
||||
payload = await self.receive_message()
|
||||
if payload["msg"] != "ACK":
|
||||
raise StructIOException(f"invalid message type {payload['msg']!r} received from process (expecting 'ACK')"
|
||||
f": {payload}")
|
||||
|
||||
async def send_sos(self):
|
||||
"""
|
||||
Sends a Start of Session command to the process, readying it for
|
||||
execution of arbitrary jobs
|
||||
"""
|
||||
|
||||
await self.send_message({"msg": "HELO"})
|
||||
|
||||
async def send_eos(self):
|
||||
"""
|
||||
Sends an End of Session command to the process, signaling to shut
|
||||
it down
|
||||
"""
|
||||
|
||||
await self.send_message({"msg": "CYA"})
|
||||
|
||||
async def close(self, graceful: bool = True):
|
||||
"""
|
||||
Terminate the remote process. If graceful equals
|
||||
True, the default, a graceful shutdown is attempted
|
||||
"""
|
||||
|
||||
if graceful:
|
||||
await self.send_eos()
|
||||
await self._remote.close()
|
||||
await self._sock.close()
|
||||
|
||||
async def _ensure_started(self):
|
||||
if not await self.is_running():
|
||||
raise StructIOException("process is not running")
|
||||
|
||||
async def receive_message(self):
|
||||
"""
|
||||
Wait for a message from the subprocess and
|
||||
return it
|
||||
"""
|
||||
|
||||
await self._ensure_started()
|
||||
data = await self._remote.receive(8)
|
||||
if not data:
|
||||
raise ResourceBroken("something went wrong when communicating with the remote process")
|
||||
size, *_ = struct.unpack("Q", data)
|
||||
return msgpack.unpackb(await self._remote.receive_exactly(size))
|
||||
|
||||
def start(self):
|
||||
# We both spawn this as a system task and schedule it as a shutdown
|
||||
# handler so that the remote process always has a chance to connect
|
||||
# to us if the entry point exits with no error but without waiting for
|
||||
# the process to terminate
|
||||
structio.current_loop().spawn_system_task(self._do_setup)
|
||||
structio.current_loop().add_shutdown_task(self._do_setup)
|
||||
|
||||
async def is_running(self):
|
||||
if not self.process:
|
||||
await checkpoint()
|
||||
return False
|
||||
return await self.process.is_running()
|
||||
|
||||
async def wait(self):
|
||||
await self._ensure_started()
|
||||
return await self.process.wait()
|
||||
|
||||
async def wait_started(self):
|
||||
# Can't use self.process.wait_started because it
|
||||
# is likely to be None until _do_setup runs to
|
||||
# completion
|
||||
return await self._started.wait()
|
||||
|
||||
|
||||
|
|
|
@ -495,4 +495,4 @@ class RLock(Lock):
|
|||
if self._acquire_count == 0:
|
||||
await super().release()
|
||||
else:
|
||||
await checkpoint()
|
||||
await checkpoint()
|
||||
|
|
|
@ -58,6 +58,19 @@ async def main_limiter():
|
|||
print(f"Submitted {i + 1} processes")
|
||||
|
||||
|
||||
async def main_python():
|
||||
print("[main] Starting python process test")
|
||||
# Spawns a new Python process
|
||||
p = structio.parallel.PythonProcess()
|
||||
p.start()
|
||||
# TODO: Allow for calling of arbitrary python objects in the spawned process, except bound methods
|
||||
# (which are tricky due to the shared state they carry), lambdas (they cannot looked up by name in
|
||||
# the newly created process and would need to be serialized, which is not the intended design) and
|
||||
# idk maybe a few others?
|
||||
print("[main] Pyhon process test complete")
|
||||
|
||||
|
||||
structio.run(main_simple, "owo")
|
||||
structio.run(main_limiter)
|
||||
structio.run(main_python)
|
||||
|
||||
|
|
Loading…
Reference in New Issue