diff --git a/structio/core/kernels/fifo.py b/structio/core/kernels/fifo.py index 8c4aedf..e82efab 100644 --- a/structio/core/kernels/fifo.py +++ b/structio/core/kernels/fifo.py @@ -63,6 +63,8 @@ class FIFOKernel(BaseKernel): traceback.print_tb(e.__traceback__) def done(self): + if self.entry_point.done(): + return True if any([self.run_queue, self.paused, self.io_manager.pending()]): return False for scope in self.scopes: diff --git a/structio/thread.py b/structio/thread.py index 3ca613e..06fa4df 100644 --- a/structio/thread.py +++ b/structio/thread.py @@ -11,7 +11,7 @@ from structio.core.syscalls import checkpoint from structio.core.abc import BaseKernel from structio.core.run import current_loop from typing import Callable, Any, Coroutine -from structio.core.exceptions import StructIOException +from structio.core.exceptions import StructIOException, TimedOut, Cancelled _storage = threading.local() @@ -147,7 +147,7 @@ class AsyncThreadQueue(Queue): def _threaded_runner(f, q: AsyncThreadQueue, parent_loop: BaseKernel, rq: AsyncThreadQueue, - rsq: AsyncThreadQueue, *args): + rsq: AsyncThreadQueue, evt: AsyncThreadEvent, *args): try: _storage.parent_loop = parent_loop _storage.rq = rq @@ -155,35 +155,51 @@ def _threaded_runner(f, q: AsyncThreadQueue, parent_loop: BaseKernel, rq: AsyncT q.put_sync((True, f(*args))) except BaseException as e: q.put_sync((False, e)) + finally: + evt.set() @enable_ki_protection -async def _wait_for_thread(events, results: AsyncThreadQueue): - with structio.TaskScope(shielded=True): - while True: - data = await events.get() - if not data: - break - coro = data - try: - await results.put((True, await coro)) - except BaseException as e: - await results.put((False, e)) +async def _async_waiter(events, results: AsyncThreadQueue): + while True: + data = await events.get() + if not data: + break + coro = data + try: + await results.put((True, await coro)) + except BaseException as e: + await results.put((False, e)) @enable_ki_protection -async def _async_runner(f, *args): +async def _wait_for_thread(events, results: AsyncThreadQueue, evt: AsyncThreadEvent, cancellable: bool = False): + async with structio.create_pool() as pool: + pool.scope.shielded = not cancellable + # Spawn a coroutine to process incoming requests from + # the new async thread + waiter = pool.spawn(_async_waiter, events, results) + # Wait for the thread to terminate + await evt.wait() + # Worker thread has exited: we no longer need to process any + # requests, so we shut our waiter down + await events.put(None) + + +@enable_ki_protection +async def _async_runner(f, cancellable: bool = False, *args): + evt = AsyncThreadEvent() queue = AsyncThreadQueue(1) # Request queue rq = AsyncThreadQueue(0) # Results queue rsq = AsyncThreadQueue(0) - current_loop().current_pool.spawn(_wait_for_thread, rq, rsq) - th = threading.Thread(target=_threaded_runner, args=(f, queue, current_loop(), rq, rsq, *args), - name="structio-worker-thread") + current_loop().current_pool.spawn(_wait_for_thread, rq, rsq, evt, cancellable) + th = threading.Thread(target=_threaded_runner, args=(f, queue, current_loop(), rq, rsq, evt, *args), + name="structio-worker-thread", daemon=cancellable) th.start() success, data = await queue.get() - await rq.put(None) + await rsq.put(None) if success: return data raise data @@ -192,16 +208,24 @@ async def _async_runner(f, *args): @enable_ki_protection async def run_in_worker(sync_func, *args, + cancellable: bool = False, ): """ Call the given synchronous function in a separate worker thread, turning it into an async operation. The result of the call is returned, and any exceptions - are propagated back to the caller. Note that threaded - operations are not usually cancellable (i.e. the async - operation will fail when cancelled, but the thread will - continue running until termination, as there is no simple - and reliable way to stop a thread anywhere) + are propagated back to the caller. If cancellable equals + False, the default, then the operation cannot be canceled + in any way. If cancellable equals True, cancellation will + cause this function to return early and to abruptly drop + the thread: keep in mind the thread is likely to keep running + in the background, as structio doesn't make any effort to stop + it (it can't). If you call this with cancellable=True, make sure + the operation you're performing is side-effect-free (for example, + the async version of getaddrinfo() uses run_in_worker with cancellable=True + to avoid hogging the event loop when doing domain name resolution but still + be able to fail properly, since no one really cares if a random DNS lookup + keeps running in the background) """ if not hasattr(_storage, "parent_loop"): @@ -215,10 +239,7 @@ async def run_in_worker(sync_func, # we run out of slots and proceed once # we have more async with _storage.max_workers: - # We inject a worker task into the current - # pool so waiting for the thread is handled - # as if it were a task - return await current_loop().current_pool.spawn(_async_runner, sync_func, *args) + return await current_loop().current_pool.spawn(_async_runner, sync_func, cancellable, *args) @enable_ki_protection diff --git a/tests/events.py b/tests/events.py index 370f67a..4b1cbd0 100644 --- a/tests/events.py +++ b/tests/events.py @@ -46,5 +46,5 @@ async def main_async_thread(i): print(f"[main] Exited in {structio.clock() - j:.2f} seconds") -structio.run(main, 5) +#structio.run(main, 5) structio.run(main_async_thread, 5) diff --git a/tests/threads.py b/tests/threads.py index 1c9fd2c..67e68cd 100644 --- a/tests/threads.py +++ b/tests/threads.py @@ -21,4 +21,15 @@ async def main(n): print(f"[main] Exited in {structio.clock() - t:.2f} seconds") +async def main_timeout(n, k): + print(f"[main] Spawning worker thread, exiting in {k} seconds") + t = structio.clock() + with structio.skip_after(k): + # We need to make the operation explicitly cancellable if we want + # to be able to move on! + await structio.thread.run_in_worker(fake_async_sleeper, n, cancellable=True) + print(f"[main] Exited in {structio.clock() - t:.2f} seconds") + + structio.run(main, 2) +structio.run(main_timeout, 5, 3)