From bfa69a6db2e000ca78236ba252cfb314a38f4e39 Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Mon, 22 May 2023 09:22:37 +0200 Subject: [PATCH] Added test for pathlib wrapper --- .idea/misc.xml | 2 +- structio/__init__.py | 104 ++++++++++++++--------- structio/abc.py | 17 ++-- structio/core/context.py | 14 +-- structio/core/kernels/fifo.py | 62 ++++++++++---- structio/core/managers/io/simple.py | 9 +- structio/core/managers/signals/sigint.py | 6 +- structio/core/run.py | 47 +++++++--- structio/core/task.py | 2 +- structio/core/time/clock.py | 1 - structio/core/tooling.py | 1 - structio/exceptions.py | 2 +- structio/io/files.py | 31 +++---- structio/io/socket.py | 1 - structio/parallel.py | 3 - structio/path.py | 103 ++++++++++++---------- structio/sync.py | 2 +- structio/thread.py | 63 +++++++++----- structio/util/ki.py | 1 - tests/files.py | 42 +++++---- 20 files changed, 319 insertions(+), 194 deletions(-) diff --git a/.idea/misc.xml b/.idea/misc.xml index df4a621..00cc026 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,4 +1,4 @@ - + \ No newline at end of file diff --git a/structio/__init__.py b/structio/__init__.py index ad4c107..aa64ccd 100644 --- a/structio/__init__.py +++ b/structio/__init__.py @@ -11,18 +11,36 @@ from structio.core import task from structio.core.task import Task, TaskState from structio.sync import Event, Queue, MemoryChannel, Semaphore, Lock, RLock from structio.abc import Channel, Stream, ChannelReader, ChannelWriter -from structio import thread -from structio.io.files import open_file, wrap_file, aprint, stdout, stderr, stdin, ainput +from structio.io.files import ( + open_file, + wrap_file, + aprint, + stdout, + stderr, + stdin, + ainput, +) from structio.core.run import current_loop, current_task +from structio import thread +from structio.path import Path -def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]], +def run( + func: Callable[[Any, Any], Coroutine[Any, Any, Any]], + *args, + restrict_ki_to_checkpoints: bool = False, + tools: list | None = None, +): + result = _run.run( + func, + FIFOKernel, + SimpleIOManager(), + [SigIntManager()], + DefaultClock(), + tools, + restrict_ki_to_checkpoints, *args, - restrict_ki_to_checkpoints: bool = False, - tools: list | None = None, - ): - result = _run.run(func, FIFOKernel, SimpleIOManager(), [SigIntManager()], DefaultClock(), tools, - restrict_ki_to_checkpoints, *args) + ) return result @@ -90,37 +108,39 @@ _cancel.__doc__ = Task.cancel.__doc__ Task.cancel = _cancel -__all__ = ["run", - "sleep", - "create_pool", - "clock", - "Cancelled", - "skip_after", - "with_timeout", - "Event", - "Queue", - "MemoryChannel", - "Channel", - "Stream", - "ChannelReader", - "ChannelWriter", - "Semaphore", - "TimedOut", - "Task", - "TaskState", - "TaskScope", - "TaskPool", - "ResourceClosed", - "Lock", - "RLock", - "thread", - "open_file", - "wrap_file", - "aprint", - "stderr", - "stdin", - "stdout", - "ainput", - "current_loop", - "current_task" - ] +__all__ = [ + "run", + "sleep", + "create_pool", + "clock", + "Cancelled", + "skip_after", + "with_timeout", + "Event", + "Queue", + "MemoryChannel", + "Channel", + "Stream", + "ChannelReader", + "ChannelWriter", + "Semaphore", + "TimedOut", + "Task", + "TaskState", + "TaskScope", + "TaskPool", + "ResourceClosed", + "Lock", + "RLock", + "thread", + "open_file", + "wrap_file", + "aprint", + "stderr", + "stdin", + "stdout", + "ainput", + "current_loop", + "current_task", + "Path", +] diff --git a/structio/abc.py b/structio/abc.py index d67d540..a1cba26 100644 --- a/structio/abc.py +++ b/structio/abc.py @@ -491,10 +491,14 @@ class BaseKernel(ABC): Abstract kernel base class """ - def __init__(self, clock: BaseClock, io_manager: BaseIOManager, - signal_managers: list[SignalManager], - tools: list[BaseDebugger] | None = None, - restrict_ki_to_checkpoints: bool = False): + def __init__( + self, + clock: BaseClock, + io_manager: BaseIOManager, + signal_managers: list[SignalManager], + tools: list[BaseDebugger] | None = None, + restrict_ki_to_checkpoints: bool = False, + ): self.clock = clock self.current_task: Task | None = None self.current_pool: "TaskPool" = None @@ -541,7 +545,9 @@ class BaseKernel(ABC): return NotImplemented @abstractmethod - def spawn_system_task(self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args): + def spawn_system_task( + self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args + ): """ Spawns a system task. System tasks run in a special internal task pool and begin execution in a scope shielded by cancellations @@ -730,4 +736,3 @@ class BaseKernel(ABC): if not self.done() and not force: raise StructIOException("the event loop is running") - diff --git a/structio/core/context.py b/structio/core/context.py index 856f19b..96be873 100644 --- a/structio/core/context.py +++ b/structio/core/context.py @@ -11,7 +11,12 @@ class TaskScope: A task scope """ - def __init__(self, timeout: int | float | None = None, silent: bool = False, shielded: bool = False): + def __init__( + self, + timeout: int | float | None = None, + silent: bool = False, + shielded: bool = False, + ): """ Public object constructor """ @@ -143,7 +148,9 @@ class TaskPool: return self.scope.done() - def spawn(self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args) -> Task: + def spawn( + self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args + ) -> Task: """ Schedule a new concurrent task for execution in the task pool from the given async function. All positional arguments are passed to the underlying coroutine @@ -153,6 +160,3 @@ class TaskPool: """ return current_loop().spawn(func, *args) - - - diff --git a/structio/core/kernels/fifo.py b/structio/core/kernels/fifo.py index 56d8e95..251aa1a 100644 --- a/structio/core/kernels/fifo.py +++ b/structio/core/kernels/fifo.py @@ -1,7 +1,13 @@ import traceback import warnings from types import FrameType -from structio.abc import BaseKernel, BaseClock, BaseDebugger, BaseIOManager, SignalManager +from structio.abc import ( + BaseKernel, + BaseClock, + BaseDebugger, + BaseIOManager, + SignalManager, +) from structio.core.context import TaskPool, TaskScope from structio.core.task import Task, TaskState from structio.util.ki import CTRLC_PROTECTION_ENABLED @@ -19,11 +25,17 @@ class FIFOKernel(BaseKernel): with a FIFO scheduling policy """ - def __init__(self, clock: BaseClock, io_manager: BaseIOManager, - signal_managers: list[SignalManager], - tools: list[BaseDebugger] | None = None, - restrict_ki_to_checkpoints: bool = False): - super().__init__(clock, io_manager, signal_managers, tools, restrict_ki_to_checkpoints) + def __init__( + self, + clock: BaseClock, + io_manager: BaseIOManager, + signal_managers: list[SignalManager], + tools: list[BaseDebugger] | None = None, + restrict_ki_to_checkpoints: bool = False, + ): + super().__init__( + clock, io_manager, signal_managers, tools, restrict_ki_to_checkpoints + ) self.skip: bool = False # Tasks that are ready to run self.run_queue: deque[Task] = deque() @@ -43,7 +55,12 @@ class FIFOKernel(BaseKernel): self._closing = False def get_closest_deadline(self): - return min([self.current_scope.get_actual_timeout(), self.paused.get_closest_deadline()]) + return min( + [ + self.current_scope.get_actual_timeout(), + self.paused.get_closest_deadline(), + ] + ) def get_closest_deadline_owner(self): return self.paused.peek() @@ -59,8 +76,9 @@ class FIFOKernel(BaseKernel): except BaseException as e: # We really can't afford to have our internals explode, # sorry! - warnings.warn(f"Exception during debugging event delivery ({evt_name!r}): {type(e).__name__} -> {e}", - ) + warnings.warn( + f"Exception during debugging event delivery ({evt_name!r}): {type(e).__name__} -> {e}", + ) traceback.print_tb(e.__traceback__) def done(self): @@ -83,7 +101,9 @@ class FIFOKernel(BaseKernel): self.event("on_task_spawn") return task - def spawn_system_task(self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args): + def spawn_system_task( + self, func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args + ): task = Task(func.__name__ or repr(func), func(*args), self.pool) task.coroutine.cr_frame.f_locals.setdefault(CTRLC_PROTECTION_ENABLED, True) task.pool.scope.tasks.append(task) @@ -109,7 +129,9 @@ class FIFOKernel(BaseKernel): if not self.run_queue: return self.current_task = self.run_queue.popleft() - runner = partial(self.current_task.coroutine.send, self.data.pop(self.current_task, None)) + runner = partial( + self.current_task.coroutine.send, self.data.pop(self.current_task, None) + ) if self.current_task.pending_cancellation: _runner = partial(self.current_task.coroutine.throw, Cancelled()) elif self._sigint_handled: @@ -122,8 +144,13 @@ class FIFOKernel(BaseKernel): # libraries, which most likely have different method names and behaviors # compared to us. If you get this exception, and you're 100% sure you're # not mixing async primitives from other libraries, then it's a bug! - self.throw(self.current_task, StructIOException("Uh oh! Something bad just happened: did you try to mix " - "primitives from other async libraries?")) + self.throw( + self.current_task, + StructIOException( + "Uh oh! Something bad just happened: did you try to mix " + "primitives from other async libraries?" + ), + ) # Sneaky method call, thanks to David Beazley for this ;) getattr(self, method)(*args, **kwargs) self.event("after_task_step", self.current_task) @@ -178,10 +205,15 @@ class FIFOKernel(BaseKernel): self.throw(scope.owner, error) def wakeup(self): - while self.paused and self.paused.peek().next_deadline <= self.clock.current_time(): + while ( + self.paused + and self.paused.peek().next_deadline <= self.clock.current_time() + ): task, _ = self.paused.get() task.next_deadline = 0 - self.event("after_sleep", task, task.paused_when - self.clock.current_time()) + self.event( + "after_sleep", task, task.paused_when - self.clock.current_time() + ) self.reschedule(task) def run(self): diff --git a/structio/core/managers/io/simple.py b/structio/core/managers/io/simple.py index 7913c6b..2b789cb 100644 --- a/structio/core/managers/io/simple.py +++ b/structio/core/managers/io/simple.py @@ -55,8 +55,12 @@ class SimpleIOManager(BaseIOManager): def wait_io(self): kernel: BaseKernel = current_loop() - readable, writable, _ = select.select(self._collect_readers(), self._collect_writers(), - [], kernel.get_closest_deadline()) + readable, writable, _ = select.select( + self._collect_readers(), + self._collect_writers(), + [], + kernel.get_closest_deadline(), + ) for read_ready in readable: kernel.reschedule(self.readers[read_ready]) for write_ready in writable: @@ -77,4 +81,3 @@ class SimpleIOManager(BaseIOManager): def release_task(self, task: Task): for resource in self.tasks[task]: self.release(resource) - diff --git a/structio/core/managers/signals/sigint.py b/structio/core/managers/signals/sigint.py index bcab1de..9c148f0 100644 --- a/structio/core/managers/signals/sigint.py +++ b/structio/core/managers/signals/sigint.py @@ -23,8 +23,10 @@ class SigIntManager(SignalManager): def install(self): if signal.getsignal(signal.SIGINT) != signal.default_int_handler: - warnings.warn(f"structio has detected a custom SIGINT handler and won't touch it: keep in mind" - f" this is likely to break KeyboardInterrupt delivery!") + warnings.warn( + f"structio has detected a custom SIGINT handler and won't touch it: keep in mind" + f" this is likely to break KeyboardInterrupt delivery!" + ) return signal.signal(signal.SIGINT, self.handle) self.installed = True diff --git a/structio/core/run.py b/structio/core/run.py index f5a7222..b77b38d 100644 --- a/structio/core/run.py +++ b/structio/core/run.py @@ -1,7 +1,13 @@ import inspect import functools from threading import local -from structio.abc import BaseKernel, BaseDebugger, BaseClock, SignalManager, BaseIOManager +from structio.abc import ( + BaseKernel, + BaseDebugger, + BaseClock, + SignalManager, + BaseIOManager, +) from structio.exceptions import StructIOException from structio.core.task import Task from typing import Callable, Any, Coroutine @@ -42,14 +48,21 @@ def new_event_loop(kernel: BaseKernel): _RUN.kernel = kernel else: if not _RUN.kernel.done(): - raise StructIOException("cannot be called from running async context") from None + raise StructIOException( + "cannot be called from running async context" + ) from None -def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]], - kernel: type, io_manager: BaseIOManager, signal_managers: list[SignalManager], - clock: BaseClock, tools: list[BaseDebugger] | None = None, - restrict_ki_to_checkpoints: bool = False, - *args): +def run( + func: Callable[[Any, Any], Coroutine[Any, Any, Any]], + kernel: type, + io_manager: BaseIOManager, + signal_managers: list[SignalManager], + clock: BaseClock, + tools: list[BaseDebugger] | None = None, + restrict_ki_to_checkpoints: bool = False, + *args, +): """ Starts the event loop from a synchronous entry point. All positional arguments are passed to the given coroutine @@ -58,7 +71,9 @@ def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]], """ if not issubclass(kernel, BaseKernel): - raise TypeError(f"kernel must be a subclass of {BaseKernel.__module__}.{BaseKernel.__qualname__}!") + raise TypeError( + f"kernel must be a subclass of {BaseKernel.__module__}.{BaseKernel.__qualname__}!" + ) check = func if isinstance(func, functools.partial): check = func.func @@ -68,8 +83,16 @@ def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]], "\nWhat you wanna do, instead, is this: structio.run(your_func, arg1, arg2, ...)" ) elif not inspect.iscoroutinefunction(check): - raise StructIOException("structio.run() requires an async function as its first argument!") - new_event_loop(kernel(clock=clock, restrict_ki_to_checkpoints=restrict_ki_to_checkpoints, - io_manager=io_manager, signal_managers=signal_managers, - tools=tools)) + raise StructIOException( + "structio.run() requires an async function as its first argument!" + ) + new_event_loop( + kernel( + clock=clock, + restrict_ki_to_checkpoints=restrict_ki_to_checkpoints, + io_manager=io_manager, + signal_managers=signal_managers, + tools=tools, + ) + ) return current_loop().start(func, *args) diff --git a/structio/core/task.py b/structio/core/task.py index 71960ff..a3939fe 100644 --- a/structio/core/task.py +++ b/structio/core/task.py @@ -52,7 +52,7 @@ class Task: return self.state in [ TaskState.CRASHED, TaskState.FINISHED, - TaskState.CANCELLED + TaskState.CANCELLED, ] def __hash__(self): diff --git a/structio/core/time/clock.py b/structio/core/time/clock.py index 2893357..ddbce20 100644 --- a/structio/core/time/clock.py +++ b/structio/core/time/clock.py @@ -4,7 +4,6 @@ from structio.abc import BaseClock class DefaultClock(BaseClock): - def __init__(self): super().__init__() # We add a large random offset to our timer value diff --git a/structio/core/tooling.py b/structio/core/tooling.py index cbac5cb..7dea674 100644 --- a/structio/core/tooling.py +++ b/structio/core/tooling.py @@ -7,4 +7,3 @@ class SimpleDebugger(BaseDebugger): def on_exit(self): print(f"<< Stopped") - diff --git a/structio/exceptions.py b/structio/exceptions.py index 0244fd4..9212bbc 100644 --- a/structio/exceptions.py +++ b/structio/exceptions.py @@ -37,4 +37,4 @@ class ResourceBusy(StructIOException): """ Raised when an attempt is made to use an asynchronous resource that is currently busy - """ \ No newline at end of file + """ diff --git a/structio/io/files.py b/structio/io/files.py index 714bf63..963e068 100644 --- a/structio/io/files.py +++ b/structio/io/files.py @@ -114,20 +114,25 @@ class AsyncResourceWrapper(AsyncResource): await check_cancelled() -async def open_file(file, - mode="r", - buffering=-1, - encoding=None, - errors=None, - newline=None, - closefd=True, - opener=None) -> AsyncResourceWrapper: +async def open_file( + file, + mode="r", + buffering=-1, + encoding=None, + errors=None, + newline=None, + closefd=True, + opener=None, +) -> AsyncResourceWrapper: """ Like io.open(), but async. Magic """ - return wrap_file(await structio.thread.run_in_worker(io.open, file, mode, buffering, encoding, errors, newline, - closefd, opener)) + return wrap_file( + await structio.thread.run_in_worker( + io.open, file, mode, buffering, encoding, errors, newline, closefd, opener + ) + ) def wrap_file(file) -> AsyncResourceWrapper: @@ -144,7 +149,7 @@ stdout = wrap_file(sys.stdout) stderr = wrap_file(sys.stderr) -async def aprint(*args, sep=' ', end='\n', file=stdout, flush=False): +async def aprint(*args, sep=" ", end="\n", file=stdout, flush=False): """ Like print(), but asynchronous """ @@ -161,7 +166,3 @@ async def ainput(prompt=None, /): await aprint(prompt, end="", flush=True) return (await stdin.readline()).rstrip("\n") - - - - diff --git a/structio/io/socket.py b/structio/io/socket.py index 9684eee..6eae711 100644 --- a/structio/io/socket.py +++ b/structio/io/socket.py @@ -1,4 +1,3 @@ import structio from structio.abc import AsyncResource from structio.core.syscalls import check_cancelled - diff --git a/structio/parallel.py b/structio/parallel.py index 8dfb5fd..7479e8a 100644 --- a/structio/parallel.py +++ b/structio/parallel.py @@ -8,6 +8,3 @@ class Process: """ # TODO - - - diff --git a/structio/path.py b/structio/path.py index 28eb9a8..81c0a3e 100644 --- a/structio/path.py +++ b/structio/path.py @@ -6,47 +6,52 @@ import pathlib from structio.core.syscalls import checkpoint -_SYNC = {"as_posix", - "as_uri", - "is_absolute", - "is_reserved", - "joinpath", - "match", - "relative_to", - "with_name", - "with_suffix"} -_ASYNC = {"chmod", - "exists", - "expanduser", - "glob", - "group", - "is_block_device", - "is_char_device", - "is_dir", - "is_fifo", - "is_file", - "is_mount", - "is_socket", - "is_symlink", - "lchmod", - "lstat", - "mkdir", - "owner", - "read_bytes", - "read_text", - "rename", - "replace", - "resolve", - "rglob", - "rmdir", - "samefile", - "stat", - "symlink_to", - "touch", - "unlink", - "write_text", - "write_bytes" - } +_SYNC = { + "as_posix", + "as_uri", + "is_absolute", + "is_reserved", + "joinpath", + "match", + "relative_to", + "with_name", + "with_suffix", +} + +_ASYNC = { + "chmod", + "exists", + "expanduser", + "glob", + "group", + "is_block_device", + "is_char_device", + "is_dir", + "is_fifo", + "is_file", + "is_mount", + "is_socket", + "is_symlink", + "lchmod", + "lstat", + "mkdir", + "owner", + "read_bytes", + "read_text", + "rename", + "replace", + "resolve", + "rglob", + "rmdir", + "samefile", + "stat", + "symlink_to", + "touch", + "unlink", + "rmdir", + "write_text", + "write_bytes", +} def _wrap(v): @@ -74,7 +79,9 @@ class Path: # This method is special and can't be just forwarded like the others because # it is a class method and I don't feel like doing all the wild metaprogramming # stuff that Trio did (which is cool but gooood luck debugging that), so here ya go. - return _wrap(await structio.thread.run_in_worker(pathlib.Path.cwd, *args, **kwargs)) + return _wrap( + await structio.thread.run_in_worker(pathlib.Path.cwd, *args, **kwargs) + ) @classmethod @wraps(pathlib.Path.home) @@ -83,7 +90,9 @@ class Path: Like pathlib.Path.home(), but async """ - return _wrap(await structio.thread.run_in_worker(pathlib.Path.home, *args, **kwargs)) + return _wrap( + await structio.thread.run_in_worker(pathlib.Path.home, *args, **kwargs) + ) @wraps(pathlib.Path.open) async def open(self, *args, **kwargs): @@ -121,6 +130,12 @@ class Path: def __fspath__(self): return os.fspath(self._wrapped) + def __truediv__(self, other): + return _wrap(self._sync_path.__truediv__(other)) + + def __rtruediv__(self, other): + return _wrap(self._sync_path.__rtruediv__(other)) + def __getattr__(self, attr: str): # We use a similar trick to the one we stole from # Trio for async files, except we also wrap sync @@ -152,5 +167,3 @@ class Path: # Falls down to __getattribute__, which may find a cached # method we generated earlier! raise AttributeError(attr) - - diff --git a/structio/sync.py b/structio/sync.py index ad49e4e..c3aa127 100644 --- a/structio/sync.py +++ b/structio/sync.py @@ -40,7 +40,7 @@ class Event: await checkpoint() return self._tasks.append(current_task()) - await suspend() # We get re-scheduled by set() + await suspend() # We get re-scheduled by set() @enable_ki_protection def set(self): diff --git a/structio/thread.py b/structio/thread.py index 10728f6..6cecdd2 100644 --- a/structio/thread.py +++ b/structio/thread.py @@ -147,7 +147,15 @@ class AsyncThreadQueue(Queue): # Just a bunch of private helpers to run sync/async functions -def _threaded_runner(f, parent_loop: BaseKernel, rq: AsyncThreadQueue, rsq: AsyncThreadQueue, evt: AsyncThreadEvent, *args): + +def _threaded_runner( + f, + parent_loop: BaseKernel, + rq: AsyncThreadQueue, + rsq: AsyncThreadQueue, + evt: AsyncThreadEvent, + *args, +): try: # Setup thread-local storage so future calls # to run_coro() can find this stuff @@ -166,7 +174,9 @@ def _threaded_runner(f, parent_loop: BaseKernel, rq: AsyncThreadQueue, rsq: Asyn @enable_ki_protection -async def _coroutine_request_handler(events: AsyncThreadQueue, results: AsyncThreadQueue): +async def _coroutine_request_handler( + events: AsyncThreadQueue, results: AsyncThreadQueue +): """ Runs coroutines on behalf of a thread spawned by structio and submits the outcome back to the thread @@ -186,8 +196,12 @@ async def _coroutine_request_handler(events: AsyncThreadQueue, results: AsyncThr @enable_ki_protection -async def _wait_for_thread(events: AsyncThreadQueue, results: AsyncThreadQueue, - termination_event: AsyncThreadEvent, cancellable: bool = False): +async def _wait_for_thread( + events: AsyncThreadQueue, + results: AsyncThreadQueue, + termination_event: AsyncThreadEvent, + cancellable: bool = False, +): """ Waits for a thread spawned by structio to complete and returns its result. Exceptions are also propagated @@ -236,22 +250,27 @@ async def _spawn_supervised_thread(f, cancellable: bool = False, *args): # next iteration in the loop, it would (likely, but not always, as it depends on how things get # scheduled) immediately call get() again, get something out of queue that it doesn't expect and # crash horribly. So this separation is necessary to retain my sanity - threading.Thread(target=_threaded_runner, args=(f, current_loop(), rq, rsq, terminate, *args), - # We start cancellable threads in daemonic mode so that - # the main thread doesn't get stuck waiting on them forever - # when their associated async counterpart gets cancelled. This - # is due to the fact that there's really no way to "kill" a thread - # (and for good reason!), so we just pretend nothing happened and go - # about our merry way, hoping the thread dies eventually I guess - name="structio-worker-thread", daemon=cancellable).start() + threading.Thread( + target=_threaded_runner, + args=(f, current_loop(), rq, rsq, terminate, *args), + # We start cancellable threads in daemonic mode so that + # the main thread doesn't get stuck waiting on them forever + # when their associated async counterpart gets cancelled. This + # is due to the fact that there's really no way to "kill" a thread + # (and for good reason!), so we just pretend nothing happened and go + # about our merry way, hoping the thread dies eventually I guess + name="structio-worker-thread", + daemon=cancellable, + ).start() return await _wait_for_thread(rq, rsq, terminate, cancellable) @enable_ki_protection -async def run_in_worker(sync_func, - *args, - cancellable: bool = False, - ): +async def run_in_worker( + sync_func, + *args, + cancellable: bool = False, +): """ Call the given synchronous function in a separate worker thread, turning it into an async operation. @@ -306,12 +325,15 @@ async def run_in_worker(sync_func, # event loop has its own secret "root" task pool which is a parent to all # others and where the call to structio.run() as well as any other system # task run) - return await current_loop().current_pool.spawn(_spawn_supervised_thread, sync_func, cancellable, *args) + return await current_loop().current_pool.spawn( + _spawn_supervised_thread, sync_func, cancellable, *args + ) @enable_ki_protection -def run_coro(async_func: Callable[[Any, Any], Coroutine[Any, Any, Any]], - *args, **kwargs): +def run_coro( + async_func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args, **kwargs +): """ Submits a coroutine for execution to the event loop, passing any arguments along the way. Return values and exceptions are propagated @@ -350,6 +372,3 @@ def get_max_worker_count() -> int: """ return _storage.max_workers.max_size - - - diff --git a/structio/util/ki.py b/structio/util/ki.py index 6b2b27d..f709a46 100644 --- a/structio/util/ki.py +++ b/structio/util/ki.py @@ -124,4 +124,3 @@ enable_ki_protection.__doc__ = "Decorator to enable keyboard interrupt protectio disable_ki_protection = _ki_protection_decorator(False) disable_ki_protection.__name__ = "disable_ki_protection" disable_ki_protection.__doc__ = "Decorator to disable keyboard interrupt protection" - diff --git a/tests/files.py b/tests/files.py index bcdf81b..171a605 100644 --- a/tests/files.py +++ b/tests/files.py @@ -1,32 +1,42 @@ import structio import tempfile import os - - -async def main(): - await structio.aprint("[main] Threaded async I/O is working!") - t = structio.clock() - stuff = await structio.ainput("Type something: ") - await structio.aprint(f"[main] Output from ainput(): {stuff}") - print(f"[main] Exited in {structio.clock() - t:.2f} seconds") +from structio import aprint async def main_2(data: bytes): t = structio.clock() + await aprint("[main] Using low level os module") async with await structio.open_file(os.path.join(tempfile.gettempdir(), "structio_test.txt"), "wb+") as f: - print(f"[main] Opened async file {f.name!r}, writing payload of {len(data)} bytes") + await aprint(f"[main] Opened async file {f.name!r}, writing payload of {len(data)} bytes") await f.write(data) await f.seek(0) assert await f.read(len(data)) == data await f.flush() - print(f"[main] Deleting {f.name!r}") + await aprint(f"[main] Deleting {f.name!r}") await structio.thread.run_in_worker(os.unlink, f.name) assert not await structio.thread.run_in_worker(os.path.isfile, f.name) - print(f"[main] Done in {structio.clock() - t:.2f} seconds") + await aprint(f"[main] Done in {structio.clock() - t:.2f} seconds") -#structio.run(main) -MB = 1_048_576 -GB = 1 -# Write 1GB of data (too much?) -structio.run(main_2, b"a" * (GB * 1024 * MB)) +async def main_3(data: bytes): + t = structio.clock() + await aprint("[main] Using high level pathlib wrapper") + path = structio.Path(tempfile.gettempdir()) / "structio_test.txt" + async with await path.open("wb+") as f: + await aprint(f"[main] Opened async file {f.name!r}, writing payload of {len(data)} bytes") + await f.write(data) + await f.seek(0) + assert await f.read(len(data)) == data + await f.flush() + await aprint(f"[main] Deleting {f.name!r}") + await path.unlink() + assert not await path.exists() + await aprint(f"[main] Done in {structio.clock() - t:.2f} seconds") + +MB = 1048576 +payload = b"a" * MB * 100 +# Write 100MiB of data (too much?) +structio.run(main_2, payload) +structio.run(main_3, payload) +