From 0abd2c2364e0054743c60e617fc81c409e4d916e Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Thu, 18 May 2023 09:55:10 +0200 Subject: [PATCH] Initial experimental work on running threaded async workers --- structio/thread.py | 133 +++++++++++++++++++++++++++++++++++++++++---- tests/events.py | 7 +-- tests/queue.py | 27 +++++++++ 3 files changed, 152 insertions(+), 15 deletions(-) diff --git a/structio/thread.py b/structio/thread.py index 6f7cc5d..01dc8ab 100644 --- a/structio/thread.py +++ b/structio/thread.py @@ -2,10 +2,14 @@ # coroutines into worker threads and to submit asynchronous # work to the event loop from a synchronous thread import threading -from typing import Callable, Any, Coroutine from collections import deque -from structio.sync import Event, Semaphore + +import structio +from structio.sync import Event, Semaphore, Queue from structio.util.ki import enable_ki_protection +from structio.core.syscalls import checkpoint +from structio.core.abc import BaseKernel +from structio.core.run import current_loop, current_task _storage = threading.local() @@ -51,22 +55,131 @@ class AsyncThreadEvent(Event): with self._lock: if self.is_set(): return - # Awakes all coroutines - super().set() + # We can't just call super().set() because that + # will call current_loop(), and we may have been + # called from a non-async thread + loop: BaseKernel = _storage.parent_loop + for task in self._tasks: + loop.reschedule(task) # Awakes all threads for evt in self._workers: evt.set() + self._set = True -async def run_in_worker(func: Callable[[Any, Any], Coroutine[Any, Any, Any]], +class AsyncThreadQueue(Queue): + """ + An extension of the regular queue + class that is safe to use both from + threaded and asynchronous code + """ + + def __init__(self, max_size): + super().__init__(max_size) + self._lock = threading.Lock() + + @enable_ki_protection + async def get(self): + evt: AsyncThreadEvent | None = None + with self._lock: + if not self.container: + self.getters.append(AsyncThreadEvent()) + evt = self.getters[-1] + if self.putters: + self.putters.popleft().set() + if evt: + await evt.wait() + return self.container.popleft() + + @enable_ki_protection + async def put(self, item): + evt: AsyncThreadEvent | None = None + with self._lock: + if self.maxsize and self.maxsize == len(self.container): + self.putters.append(AsyncThreadEvent()) + evt = self.putters[-1] + if self.getters: + self.getters.popleft().set() + if evt: + await evt.wait() + self.container.append(item) + await checkpoint() + + @enable_ki_protection + def put_sync(self, item): + """ + Like put(), but synchronous + """ + + evt: AsyncThreadEvent | None = None + with self._lock: + if self.maxsize and self.maxsize == len(self.container): + self.putters.append(AsyncThreadEvent()) + if self.getters: + self.getters.popleft().set() + if evt: + evt.wait_sync() + self.container.append(item) + + @enable_ki_protection + def get_sync(self): + """ + Like get(), but asynchronous + """ + + evt: AsyncThreadEvent | None = None + with self._lock: + if not self.container: + self.getters.append(AsyncThreadEvent()) + evt = self.getters[-1] + if self.putters: + self.putters.popleft().set() + if evt: + evt.wait_sync() + return self.container.popleft() + + +def _threaded_runner(f, q: AsyncThreadQueue, parent_loop: BaseKernel, *args): + try: + _storage.parent_loop = parent_loop + q.put_sync((True, f(*args))) + except BaseException as e: + q.put_sync((False, e)) + + +async def _async_runner(f, *args): + queue = AsyncThreadQueue(1) + th = threading.Thread(target=_threaded_runner, args=(f, queue, current_loop(), *args), + name="structio-worker-thread") + th.start() + success, data = await queue.get() + if success: + return data + raise data + + +async def run_in_worker(sync_func, *args, - **kwargs, ): + """ + Call the given synchronous function in a separate + worker thread, turning it into an async operation. + The result of the call is returned, and any exceptions + are propagated back to the caller. Note that threaded + operations are not usually cancellable (i.e. the async + operation will fail when cancelled, but the thread will + continue running until termination, as there is no simple + and reliable way to stop a thread anywhere) + """ + + if not hasattr(_storage, "parent_loop"): + _storage.parent_loop = current_loop() async with _storage.max_workers: - # This will automatically block once - # we run out of slots and proceed once - # we have more - pass # TODO + async with structio.create_pool() as pool: + # This will automatically block once + # we run out of slots and proceed once + # we have more + return await pool.spawn(_async_runner, sync_func, *args) def set_max_worker_count(count: int): diff --git a/tests/events.py b/tests/events.py index a25198c..370f67a 100644 --- a/tests/events.py +++ b/tests/events.py @@ -38,14 +38,11 @@ async def main_async_thread(i): # Identical to structio.Event, but this event # can talk to threads too evt = structio.thread.AsyncThreadEvent() - th = threading.Thread(target=thread_worker, args=(evt,)) - th.start() print("[main] Spawning child") pool.spawn(child, evt, i) - print("[main] Child spawned, waiting on the event") - await evt.wait() + print("[main] Child spawned, calling worker thread") + await structio.thread.run_in_worker(thread_worker, evt) assert evt.is_set() - th.join() print(f"[main] Exited in {structio.clock() - j:.2f} seconds") diff --git a/tests/queue.py b/tests/queue.py index 41f762a..daf6779 100644 --- a/tests/queue.py +++ b/tests/queue.py @@ -1,3 +1,5 @@ +import time +import threading import structio @@ -27,6 +29,21 @@ async def consumer(q: structio.Queue): await structio.sleep(1) +def threaded_consumer(q: structio.thread.AsyncThreadQueue): + while True: + # Hangs until there is + # something on the queue + item = q.get_sync() + if item is None: + print("Consumer done") + break + print(f"Consumed {item}") + # Simulates some work so the + # producer waits before putting + # the next value + time.sleep(1) + + async def main(q: structio.Queue, n: int): print("Starting consumer and producer") async with structio.create_pool() as ctx: @@ -35,6 +52,16 @@ async def main(q: structio.Queue, n: int): print("Bye!") +async def main_threaded(q: structio.thread.AsyncThreadQueue, n: int): + print("Starting consumer and producer") + async with structio.create_pool() as pool: + pool.spawn(producer, q, n) + await structio.thread.run_in_worker(threaded_consumer, q) + print("Bye!") + + if __name__ == "__main__": queue = structio.Queue(2) # Queue has size limit of 2 structio.run(main, queue, 5) + queue = structio.thread.AsyncThreadQueue(2) + structio.run(main_threaded, queue, 5)