Further work on streams and multiprocessing

This commit is contained in:
Mattia Giambirtone 2023-06-12 11:42:07 +02:00 committed by nocturn9x
parent 2da89cf138
commit f9e56cffc4
Signed by: nocturn9x
GPG Key ID: 8270F9F467971E59
3 changed files with 18 additions and 50 deletions

View File

@ -1,3 +1,5 @@
import io
import os
from abc import abstractmethod, ABC
from structio.core.task import Task
from structio.exceptions import StructIOException
@ -65,26 +67,7 @@ class StreamWriter(AsyncResource, ABC):
"""
@abstractmethod
async def write_all(self, data: bytes):
"""
Write the given data onto the stream,
possibly blocking
"""
return NotImplemented
@abstractmethod
async def wait_for_write(self):
"""
Wait until the underlying resource is
ready to be written on. Implementations
of this method should try their best not
to return until the underlying resource is
known to be ready, but it's not guaranteed
that a call to write_all() will not block
after calling this method
"""
async def write(self, data):
return NotImplemented
@ -97,34 +80,7 @@ class StreamReader(AsyncResource, ABC):
"""
@abstractmethod
async def read_some(self, max_size: int | None = None) -> bytes:
"""
Read up to max_size bytes from the underlying
resource and return it. When max_size is None,
implementors should pick a reasonable default.
Returns b"" iff the stream has reached end-of-file
"""
async def __aiter__(self):
return self
async def __anext__(self):
if not (data := await self.read_some()):
raise StopAsyncIteration()
return data
@abstractmethod
async def wait_for_read(self):
"""
Wait until the underlying resource is
ready to be read from. Implementations
of this method should try their best not
to return until the underlying resource is
known to be ready, but it's not guaranteed
that a call to read_some() will not block
after calling this method
"""
async def _read(self, size: int = -1):
return NotImplemented
@ -133,6 +89,13 @@ class Stream(StreamReader, StreamWriter, ABC):
A generic, asynchronous, readable/writable binary stream
"""
def __init__(self, f):
if isinstance(f, io.TextIOBase):
raise TypeError("only binary files can be streamed")
self.fileobj = f
self.buf = bytearray()
os.set_blocking(self.fileobj.fileno(), False)
@abstractmethod
async def flush(self):
"""

View File

@ -1,9 +1,12 @@
import io
import os
import sys
import structio
from functools import partial
from structio.abc import AsyncResource
from structio.core.syscalls import check_cancelled
from structio.abc import AsyncResource, Stream
from structio.core.syscalls import check_cancelled, wait_writable, wait_readable, checkpoint
from structio.io import WantRead, WantWrite
from structio.exceptions import ResourceClosed
# Stolen from Trio
_FILE_SYNC_ATTRS = {

View File

@ -32,6 +32,8 @@ class Popen:
os.set_blocking(stdin.fileno(), True)
# Delegate to Popen's constructor
self._process = subprocess.Popen(*args, **kwargs)
if self._process.stdin:
self.stdin = None
def __getattr__(self, item):
# Delegate to internal process object