Preparation for I/O layer
This commit is contained in:
parent
4d709f0e86
commit
bb52655582
|
@ -1,5 +1,5 @@
|
|||
from collections import defaultdict
|
||||
from structio.abc import BaseIOManager, AsyncResource, BaseKernel
|
||||
from structio.abc import BaseIOManager, BaseKernel
|
||||
from structio.core.context import Task
|
||||
from structio.core.run import current_loop, current_task
|
||||
import select
|
||||
|
@ -16,13 +16,14 @@ class SimpleIOManager(BaseIOManager):
|
|||
Public object constructor
|
||||
"""
|
||||
|
||||
self.readers: dict[AsyncResource, Task] = {}
|
||||
self.writers: dict[AsyncResource, Task] = {}
|
||||
# Maps resources to tasks
|
||||
self.readers = {}
|
||||
self.writers = {}
|
||||
# This allows us to have a bidirectional mapping:
|
||||
# we know both which tasks are using which resources
|
||||
# and which resources are used by which tasks,
|
||||
# without having to go through too many hoops and jumps.
|
||||
self.tasks: dict[Task, list[AsyncResource]] = defaultdict(list)
|
||||
self.tasks: dict[Task, list] = defaultdict(list)
|
||||
|
||||
def pending(self):
|
||||
# We don't return bool(self.resources) because there is
|
||||
|
@ -30,7 +31,7 @@ class SimpleIOManager(BaseIOManager):
|
|||
# write, even if there's dangling resources around!
|
||||
return bool(self.readers or self.writers)
|
||||
|
||||
def _collect_readers(self) -> list[AsyncResource]:
|
||||
def _collect_readers(self) -> list:
|
||||
"""
|
||||
Collects all resources that need to be read from,
|
||||
so we can select() on them later
|
||||
|
@ -41,7 +42,7 @@ class SimpleIOManager(BaseIOManager):
|
|||
result.append(resource)
|
||||
return result
|
||||
|
||||
def _collect_writers(self) -> list[AsyncResource]:
|
||||
def _collect_writers(self) -> list:
|
||||
"""
|
||||
Collects all resources that need to be written to,
|
||||
so we can select() on them later
|
||||
|
@ -61,15 +62,15 @@ class SimpleIOManager(BaseIOManager):
|
|||
for write_ready in writable:
|
||||
kernel.reschedule(self.writers[write_ready])
|
||||
|
||||
def request_read(self, rsc: AsyncResource):
|
||||
def request_read(self, rsc):
|
||||
task = current_task()
|
||||
self.readers[rsc] = task
|
||||
|
||||
def request_write(self, rsc: AsyncResource):
|
||||
def request_write(self, rsc):
|
||||
task = current_task()
|
||||
self.writers[rsc] = task
|
||||
|
||||
def release(self, resource: AsyncResource):
|
||||
def release(self, resource):
|
||||
self.readers.pop(resource, None)
|
||||
self.writers.pop(resource, None)
|
||||
|
||||
|
|
|
@ -30,4 +30,11 @@ class ResourceClosed(StructIOException):
|
|||
"""
|
||||
Raised when an asynchronous resource is
|
||||
closed and no longer usable
|
||||
"""
|
||||
|
||||
|
||||
class ResourceBusy(StructIOException):
|
||||
"""
|
||||
Raised when an attempt is made to use an
|
||||
asynchronous resource that is currently busy
|
||||
"""
|
|
@ -84,8 +84,6 @@ class AsyncResourceWrapper(AsyncResource):
|
|||
if name in _FILE_ASYNC_METHODS:
|
||||
meth = getattr(self.handle, name)
|
||||
|
||||
# No async_wraps (honestly who cares)
|
||||
#@async_wraps(self.__class__, self._wrapped.__class__, name)
|
||||
async def wrapper(*args, **kwargs):
|
||||
func = partial(meth, *args, **kwargs)
|
||||
return await structio.thread.run_in_worker(func)
|
||||
|
|
|
@ -1,21 +1,13 @@
|
|||
# Module inspired by subprocess which allows for asynchronous
|
||||
# multiprocessing
|
||||
from dataclasses import dataclass, field
|
||||
from structio.abc import StreamWriter, StreamReader
|
||||
|
||||
|
||||
@dataclass
|
||||
class Process:
|
||||
"""
|
||||
An asynchronous process
|
||||
"""
|
||||
|
||||
args: str | list
|
||||
pid: int
|
||||
stdin: StreamWriter | None = field(default=None)
|
||||
stdout: StreamReader | None = field(default=None)
|
||||
stderr: StreamReader | None = field(default=None)
|
||||
returncode: int | None = field(default=None)
|
||||
# TODO
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -177,7 +177,7 @@ async def _wait_for_thread(events, results: AsyncThreadQueue, evt: AsyncThreadEv
|
|||
pool.scope.shielded = not cancellable
|
||||
# Spawn a coroutine to process incoming requests from
|
||||
# the new async thread
|
||||
waiter = pool.spawn(_async_waiter, events, results)
|
||||
pool.spawn(_async_waiter, events, results)
|
||||
# Wait for the thread to terminate
|
||||
await evt.wait()
|
||||
# Worker thread has exited: we no longer need to process any
|
||||
|
@ -187,7 +187,9 @@ async def _wait_for_thread(events, results: AsyncThreadQueue, evt: AsyncThreadEv
|
|||
|
||||
@enable_ki_protection
|
||||
async def _async_runner(f, cancellable: bool = False, *args):
|
||||
# Thread termination event
|
||||
evt = AsyncThreadEvent()
|
||||
|
||||
queue = AsyncThreadQueue(1)
|
||||
# Request queue
|
||||
rq = AsyncThreadQueue(0)
|
||||
|
@ -198,7 +200,6 @@ async def _async_runner(f, cancellable: bool = False, *args):
|
|||
name="structio-worker-thread", daemon=cancellable)
|
||||
th.start()
|
||||
success, data = await queue.get()
|
||||
await rsq.put(None)
|
||||
if success:
|
||||
return data
|
||||
raise data
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
from structio.exceptions import ResourceBusy
|
||||
|
||||
|
||||
# Yes, I stole trio's idea of the ConflictDetector class. Shut up
|
||||
class ThereCanBeOnlyOne:
|
||||
"""
|
||||
A simple context manager that raises an error when
|
||||
an attempt is made to acquire it from more than one
|
||||
task at a time. Can be used to protect sections of
|
||||
code handling some async resource that would need locking
|
||||
if they were allowed to be called from more than one task
|
||||
at a time, but that should never happen (for example, if you
|
||||
try to do call await send() on a socket from two different
|
||||
tasks at the same time)
|
||||
"""
|
||||
|
||||
def __init__(self, msg: str):
|
||||
self._acquired = False
|
||||
self.msg = msg
|
||||
|
||||
def __enter__(self):
|
||||
if self._acquired:
|
||||
raise ResourceBusy(self.msg)
|
||||
self._acquired = True
|
||||
|
||||
def __exit__(self, *args):
|
||||
self._acquired = False
|
Loading…
Reference in New Issue