From f9e56cffc4b0f0ca2ccab8b7f54e7b5aa4b62b14 Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Mon, 12 Jun 2023 11:42:07 +0200 Subject: [PATCH] Further work on streams and multiprocessing --- structio/abc.py | 59 +++++++++----------------------------------- structio/io/files.py | 7 ++++-- structio/parallel.py | 2 ++ 3 files changed, 18 insertions(+), 50 deletions(-) diff --git a/structio/abc.py b/structio/abc.py index 4cd24f0..97fd04e 100644 --- a/structio/abc.py +++ b/structio/abc.py @@ -1,3 +1,5 @@ +import io +import os from abc import abstractmethod, ABC from structio.core.task import Task from structio.exceptions import StructIOException @@ -65,26 +67,7 @@ class StreamWriter(AsyncResource, ABC): """ @abstractmethod - async def write_all(self, data: bytes): - """ - Write the given data onto the stream, - possibly blocking - """ - - return NotImplemented - - @abstractmethod - async def wait_for_write(self): - """ - Wait until the underlying resource is - ready to be written on. Implementations - of this method should try their best not - to return until the underlying resource is - known to be ready, but it's not guaranteed - that a call to write_all() will not block - after calling this method - """ - + async def write(self, data): return NotImplemented @@ -97,34 +80,7 @@ class StreamReader(AsyncResource, ABC): """ @abstractmethod - async def read_some(self, max_size: int | None = None) -> bytes: - """ - Read up to max_size bytes from the underlying - resource and return it. When max_size is None, - implementors should pick a reasonable default. - Returns b"" iff the stream has reached end-of-file - """ - - async def __aiter__(self): - return self - - async def __anext__(self): - if not (data := await self.read_some()): - raise StopAsyncIteration() - return data - - @abstractmethod - async def wait_for_read(self): - """ - Wait until the underlying resource is - ready to be read from. Implementations - of this method should try their best not - to return until the underlying resource is - known to be ready, but it's not guaranteed - that a call to read_some() will not block - after calling this method - """ - + async def _read(self, size: int = -1): return NotImplemented @@ -133,6 +89,13 @@ class Stream(StreamReader, StreamWriter, ABC): A generic, asynchronous, readable/writable binary stream """ + def __init__(self, f): + if isinstance(f, io.TextIOBase): + raise TypeError("only binary files can be streamed") + self.fileobj = f + self.buf = bytearray() + os.set_blocking(self.fileobj.fileno(), False) + @abstractmethod async def flush(self): """ diff --git a/structio/io/files.py b/structio/io/files.py index 85e00fc..9680bf4 100644 --- a/structio/io/files.py +++ b/structio/io/files.py @@ -1,9 +1,12 @@ import io +import os import sys import structio from functools import partial -from structio.abc import AsyncResource -from structio.core.syscalls import check_cancelled +from structio.abc import AsyncResource, Stream +from structio.core.syscalls import check_cancelled, wait_writable, wait_readable, checkpoint +from structio.io import WantRead, WantWrite +from structio.exceptions import ResourceClosed # Stolen from Trio _FILE_SYNC_ATTRS = { diff --git a/structio/parallel.py b/structio/parallel.py index bda8db9..eed266e 100644 --- a/structio/parallel.py +++ b/structio/parallel.py @@ -32,6 +32,8 @@ class Popen: os.set_blocking(stdin.fileno(), True) # Delegate to Popen's constructor self._process = subprocess.Popen(*args, **kwargs) + if self._process.stdin: + self.stdin = None def __getattr__(self, item): # Delegate to internal process object