Thread workers are now partially cancellable

This commit is contained in:
Mattia Giambirtone 2023-05-18 15:19:13 +02:00 committed by nocturn9x
parent 42bf9d5daf
commit 5a18314dcc
Signed by: nocturn9x
GPG Key ID: 8270F9F467971E59
4 changed files with 62 additions and 28 deletions

View File

@ -63,6 +63,8 @@ class FIFOKernel(BaseKernel):
traceback.print_tb(e.__traceback__) traceback.print_tb(e.__traceback__)
def done(self): def done(self):
if self.entry_point.done():
return True
if any([self.run_queue, self.paused, self.io_manager.pending()]): if any([self.run_queue, self.paused, self.io_manager.pending()]):
return False return False
for scope in self.scopes: for scope in self.scopes:

View File

@ -11,7 +11,7 @@ from structio.core.syscalls import checkpoint
from structio.core.abc import BaseKernel from structio.core.abc import BaseKernel
from structio.core.run import current_loop from structio.core.run import current_loop
from typing import Callable, Any, Coroutine from typing import Callable, Any, Coroutine
from structio.core.exceptions import StructIOException from structio.core.exceptions import StructIOException, TimedOut, Cancelled
_storage = threading.local() _storage = threading.local()
@ -147,7 +147,7 @@ class AsyncThreadQueue(Queue):
def _threaded_runner(f, q: AsyncThreadQueue, parent_loop: BaseKernel, rq: AsyncThreadQueue, def _threaded_runner(f, q: AsyncThreadQueue, parent_loop: BaseKernel, rq: AsyncThreadQueue,
rsq: AsyncThreadQueue, *args): rsq: AsyncThreadQueue, evt: AsyncThreadEvent, *args):
try: try:
_storage.parent_loop = parent_loop _storage.parent_loop = parent_loop
_storage.rq = rq _storage.rq = rq
@ -155,35 +155,51 @@ def _threaded_runner(f, q: AsyncThreadQueue, parent_loop: BaseKernel, rq: AsyncT
q.put_sync((True, f(*args))) q.put_sync((True, f(*args)))
except BaseException as e: except BaseException as e:
q.put_sync((False, e)) q.put_sync((False, e))
finally:
evt.set()
@enable_ki_protection @enable_ki_protection
async def _wait_for_thread(events, results: AsyncThreadQueue): async def _async_waiter(events, results: AsyncThreadQueue):
with structio.TaskScope(shielded=True): while True:
while True: data = await events.get()
data = await events.get() if not data:
if not data: break
break coro = data
coro = data try:
try: await results.put((True, await coro))
await results.put((True, await coro)) except BaseException as e:
except BaseException as e: await results.put((False, e))
await results.put((False, e))
@enable_ki_protection @enable_ki_protection
async def _async_runner(f, *args): async def _wait_for_thread(events, results: AsyncThreadQueue, evt: AsyncThreadEvent, cancellable: bool = False):
async with structio.create_pool() as pool:
pool.scope.shielded = not cancellable
# Spawn a coroutine to process incoming requests from
# the new async thread
waiter = pool.spawn(_async_waiter, events, results)
# Wait for the thread to terminate
await evt.wait()
# Worker thread has exited: we no longer need to process any
# requests, so we shut our waiter down
await events.put(None)
@enable_ki_protection
async def _async_runner(f, cancellable: bool = False, *args):
evt = AsyncThreadEvent()
queue = AsyncThreadQueue(1) queue = AsyncThreadQueue(1)
# Request queue # Request queue
rq = AsyncThreadQueue(0) rq = AsyncThreadQueue(0)
# Results queue # Results queue
rsq = AsyncThreadQueue(0) rsq = AsyncThreadQueue(0)
current_loop().current_pool.spawn(_wait_for_thread, rq, rsq) current_loop().current_pool.spawn(_wait_for_thread, rq, rsq, evt, cancellable)
th = threading.Thread(target=_threaded_runner, args=(f, queue, current_loop(), rq, rsq, *args), th = threading.Thread(target=_threaded_runner, args=(f, queue, current_loop(), rq, rsq, evt, *args),
name="structio-worker-thread") name="structio-worker-thread", daemon=cancellable)
th.start() th.start()
success, data = await queue.get() success, data = await queue.get()
await rq.put(None) await rsq.put(None)
if success: if success:
return data return data
raise data raise data
@ -192,16 +208,24 @@ async def _async_runner(f, *args):
@enable_ki_protection @enable_ki_protection
async def run_in_worker(sync_func, async def run_in_worker(sync_func,
*args, *args,
cancellable: bool = False,
): ):
""" """
Call the given synchronous function in a separate Call the given synchronous function in a separate
worker thread, turning it into an async operation. worker thread, turning it into an async operation.
The result of the call is returned, and any exceptions The result of the call is returned, and any exceptions
are propagated back to the caller. Note that threaded are propagated back to the caller. If cancellable equals
operations are not usually cancellable (i.e. the async False, the default, then the operation cannot be canceled
operation will fail when cancelled, but the thread will in any way. If cancellable equals True, cancellation will
continue running until termination, as there is no simple cause this function to return early and to abruptly drop
and reliable way to stop a thread anywhere) the thread: keep in mind the thread is likely to keep running
in the background, as structio doesn't make any effort to stop
it (it can't). If you call this with cancellable=True, make sure
the operation you're performing is side-effect-free (for example,
the async version of getaddrinfo() uses run_in_worker with cancellable=True
to avoid hogging the event loop when doing domain name resolution but still
be able to fail properly, since no one really cares if a random DNS lookup
keeps running in the background)
""" """
if not hasattr(_storage, "parent_loop"): if not hasattr(_storage, "parent_loop"):
@ -215,10 +239,7 @@ async def run_in_worker(sync_func,
# we run out of slots and proceed once # we run out of slots and proceed once
# we have more # we have more
async with _storage.max_workers: async with _storage.max_workers:
# We inject a worker task into the current return await current_loop().current_pool.spawn(_async_runner, sync_func, cancellable, *args)
# pool so waiting for the thread is handled
# as if it were a task
return await current_loop().current_pool.spawn(_async_runner, sync_func, *args)
@enable_ki_protection @enable_ki_protection

View File

@ -46,5 +46,5 @@ async def main_async_thread(i):
print(f"[main] Exited in {structio.clock() - j:.2f} seconds") print(f"[main] Exited in {structio.clock() - j:.2f} seconds")
structio.run(main, 5) #structio.run(main, 5)
structio.run(main_async_thread, 5) structio.run(main_async_thread, 5)

View File

@ -21,4 +21,15 @@ async def main(n):
print(f"[main] Exited in {structio.clock() - t:.2f} seconds") print(f"[main] Exited in {structio.clock() - t:.2f} seconds")
async def main_timeout(n, k):
print(f"[main] Spawning worker thread, exiting in {k} seconds")
t = structio.clock()
with structio.skip_after(k):
# We need to make the operation explicitly cancellable if we want
# to be able to move on!
await structio.thread.run_in_worker(fake_async_sleeper, n, cancellable=True)
print(f"[main] Exited in {structio.clock() - t:.2f} seconds")
structio.run(main, 2) structio.run(main, 2)
structio.run(main_timeout, 5, 3)