Minor fixes for Windows support and httpx compatibility

This commit is contained in:
Mattia Giambirtone 2023-09-04 19:08:09 +02:00
parent 51a5cd072a
commit 6b098b7c46
Signed by: nocturn9x
GPG Key ID: 8270F9F467971E59
3 changed files with 72 additions and 14 deletions

View File

@ -1,5 +1,6 @@
# Module inspired by subprocess which allows for asynchronous """Module inspired by subprocess which allows for asynchronous
# multiprocessing multiprocessing"""
import os import os
import structio import structio
import platform import platform
@ -8,7 +9,7 @@ from subprocess import CalledProcessError, CompletedProcess, DEVNULL, PIPE
from structio.io import FileStream from structio.io import FileStream
if platform.system() == "Windows": if platform.system() == "Windows":
# Windows doesn't really support non-blocking file # Windows doesn't really support non-blocking file
# descriptors, so we just use threads # descriptors (except sockets), so we just use threads
from structio.io.files import AsyncFile as FileStream from structio.io.files import AsyncFile as FileStream

View File

@ -52,7 +52,7 @@ class Event:
""" """
if self.is_set(): if self.is_set():
raise RuntimeError("the event has already been set") raise RuntimeError("this event has already been set: create a new Event object instead")
self._set = True self._set = True
for waiter in self._tasks: for waiter in self._tasks:
current_loop().reschedule(waiter) current_loop().reschedule(waiter)
@ -109,8 +109,8 @@ class Queue:
async def put(self, item: Any): async def put(self, item: Any):
""" """
Pushes an element onto the queue. If the Pushes an element onto the queue. If the
queue is full, waits until there's queue is full, waits until a slot is
enough space for the queue available
""" """
if self.maxsize and len(self.container) == self.maxsize: if self.maxsize and len(self.container) == self.maxsize:
@ -125,8 +125,7 @@ class Queue:
async def get(self) -> Any: async def get(self) -> Any:
""" """
Pops an element off the queue. Blocks until Pops an element off the queue. Blocks until
an element is put onto it again if the queue an element is put onto it if the queue is empty
is empty
""" """
if not self.container: if not self.container:
@ -191,6 +190,59 @@ class MemorySendChannel(ChannelWriter):
data data
""" """
def __init__(self, buffer: Queue):
self._buffer = buffer
self._closed = False
@enable_ki_protection
async def send(self, value):
if self._closed:
raise ResourceClosed("cannot operate on a closed channel")
await self._buffer.put(value)
@enable_ki_protection
async def close(self):
self._closed = True
await checkpoint()
def writers(self):
return len(self._buffer.putters)
class MemoryReceiveChannel(ChannelReader):
"""
An in-memory one-way channel to read
data
"""
def __init__(self, buffer):
self._buffer = buffer
self._closed = False
@enable_ki_protection
async def receive(self):
if self._closed:
raise ResourceClosed("cannot operate on a closed channel")
return await self._buffer.get()
@enable_ki_protection
async def close(self):
self._closed = True
await checkpoint()
def pending(self):
return bool(self._buffer)
def readers(self):
return len(self._buffer.getters)
class NetworkSendChannel(ChannelWriter):
"""
A socket-based one-way channel to send
data
"""
def __init__(self, buffer): def __init__(self, buffer):
self._buffer = buffer self._buffer = buffer
self._closed = False self._closed = False
@ -238,6 +290,7 @@ class MemoryReceiveChannel(ChannelReader):
return len(self._buffer.getters) return len(self._buffer.getters)
class MemoryChannel(Channel, MemorySendChannel, MemoryReceiveChannel): class MemoryChannel(Channel, MemorySendChannel, MemoryReceiveChannel):
""" """
An in-memory, two-way channel between An in-memory, two-way channel between
@ -245,10 +298,9 @@ class MemoryChannel(Channel, MemorySendChannel, MemoryReceiveChannel):
""" """
def __init__(self, buffer_size): def __init__(self, buffer_size):
self._buffer = Queue(buffer_size) self._send_buffer, self._receive_buffer = Queue(buffer_size)
super().__init__(self._buffer) self.reader = MemoryReceiveChannel(self._receive_buffer)
self.reader = MemoryReceiveChannel(self._buffer) self.writer = MemorySendChannel(self._send_buffer)
self.writer = MemorySendChannel(self._buffer)
@enable_ki_protection @enable_ki_protection
async def close(self): async def close(self):

View File

@ -1,9 +1,12 @@
import platform
import structio import structio
import subprocess import subprocess
import shlex import shlex
# In the interest of compatibility, structio.parallel
# tries to mirror the subprocess module. You can even
# pass the constants such as DEVNULL, PIPE, etc. to it
# and it'll work
async def main(data: str): async def main(data: str):
cmd = shlex.split("python -c 'print(input())'") cmd = shlex.split("python -c 'print(input())'")
@ -20,6 +23,8 @@ async def main(data: str):
process = structio.parallel.Popen( process = structio.parallel.Popen(
cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE
) )
# Note that the process is spawned as soon as the object is
# created!
out, _ = await process.communicate(to_send) out, _ = await process.communicate(to_send)
out = out.decode().rstrip("\r").rstrip("\r\n").rstrip("\n") out = out.decode().rstrip("\r").rstrip("\r\n").rstrip("\n")
assert out == data assert out == data