Initial experimental work on running threaded async workers
This commit is contained in:
parent
1b4193ce79
commit
0abd2c2364
|
@ -2,10 +2,14 @@
|
|||
# coroutines into worker threads and to submit asynchronous
|
||||
# work to the event loop from a synchronous thread
|
||||
import threading
|
||||
from typing import Callable, Any, Coroutine
|
||||
from collections import deque
|
||||
from structio.sync import Event, Semaphore
|
||||
|
||||
import structio
|
||||
from structio.sync import Event, Semaphore, Queue
|
||||
from structio.util.ki import enable_ki_protection
|
||||
from structio.core.syscalls import checkpoint
|
||||
from structio.core.abc import BaseKernel
|
||||
from structio.core.run import current_loop, current_task
|
||||
|
||||
|
||||
_storage = threading.local()
|
||||
|
@ -51,22 +55,131 @@ class AsyncThreadEvent(Event):
|
|||
with self._lock:
|
||||
if self.is_set():
|
||||
return
|
||||
# Awakes all coroutines
|
||||
super().set()
|
||||
# We can't just call super().set() because that
|
||||
# will call current_loop(), and we may have been
|
||||
# called from a non-async thread
|
||||
loop: BaseKernel = _storage.parent_loop
|
||||
for task in self._tasks:
|
||||
loop.reschedule(task)
|
||||
# Awakes all threads
|
||||
for evt in self._workers:
|
||||
evt.set()
|
||||
self._set = True
|
||||
|
||||
|
||||
async def run_in_worker(func: Callable[[Any, Any], Coroutine[Any, Any, Any]],
|
||||
class AsyncThreadQueue(Queue):
|
||||
"""
|
||||
An extension of the regular queue
|
||||
class that is safe to use both from
|
||||
threaded and asynchronous code
|
||||
"""
|
||||
|
||||
def __init__(self, max_size):
|
||||
super().__init__(max_size)
|
||||
self._lock = threading.Lock()
|
||||
|
||||
@enable_ki_protection
|
||||
async def get(self):
|
||||
evt: AsyncThreadEvent | None = None
|
||||
with self._lock:
|
||||
if not self.container:
|
||||
self.getters.append(AsyncThreadEvent())
|
||||
evt = self.getters[-1]
|
||||
if self.putters:
|
||||
self.putters.popleft().set()
|
||||
if evt:
|
||||
await evt.wait()
|
||||
return self.container.popleft()
|
||||
|
||||
@enable_ki_protection
|
||||
async def put(self, item):
|
||||
evt: AsyncThreadEvent | None = None
|
||||
with self._lock:
|
||||
if self.maxsize and self.maxsize == len(self.container):
|
||||
self.putters.append(AsyncThreadEvent())
|
||||
evt = self.putters[-1]
|
||||
if self.getters:
|
||||
self.getters.popleft().set()
|
||||
if evt:
|
||||
await evt.wait()
|
||||
self.container.append(item)
|
||||
await checkpoint()
|
||||
|
||||
@enable_ki_protection
|
||||
def put_sync(self, item):
|
||||
"""
|
||||
Like put(), but synchronous
|
||||
"""
|
||||
|
||||
evt: AsyncThreadEvent | None = None
|
||||
with self._lock:
|
||||
if self.maxsize and self.maxsize == len(self.container):
|
||||
self.putters.append(AsyncThreadEvent())
|
||||
if self.getters:
|
||||
self.getters.popleft().set()
|
||||
if evt:
|
||||
evt.wait_sync()
|
||||
self.container.append(item)
|
||||
|
||||
@enable_ki_protection
|
||||
def get_sync(self):
|
||||
"""
|
||||
Like get(), but asynchronous
|
||||
"""
|
||||
|
||||
evt: AsyncThreadEvent | None = None
|
||||
with self._lock:
|
||||
if not self.container:
|
||||
self.getters.append(AsyncThreadEvent())
|
||||
evt = self.getters[-1]
|
||||
if self.putters:
|
||||
self.putters.popleft().set()
|
||||
if evt:
|
||||
evt.wait_sync()
|
||||
return self.container.popleft()
|
||||
|
||||
|
||||
def _threaded_runner(f, q: AsyncThreadQueue, parent_loop: BaseKernel, *args):
|
||||
try:
|
||||
_storage.parent_loop = parent_loop
|
||||
q.put_sync((True, f(*args)))
|
||||
except BaseException as e:
|
||||
q.put_sync((False, e))
|
||||
|
||||
|
||||
async def _async_runner(f, *args):
|
||||
queue = AsyncThreadQueue(1)
|
||||
th = threading.Thread(target=_threaded_runner, args=(f, queue, current_loop(), *args),
|
||||
name="structio-worker-thread")
|
||||
th.start()
|
||||
success, data = await queue.get()
|
||||
if success:
|
||||
return data
|
||||
raise data
|
||||
|
||||
|
||||
async def run_in_worker(sync_func,
|
||||
*args,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Call the given synchronous function in a separate
|
||||
worker thread, turning it into an async operation.
|
||||
The result of the call is returned, and any exceptions
|
||||
are propagated back to the caller. Note that threaded
|
||||
operations are not usually cancellable (i.e. the async
|
||||
operation will fail when cancelled, but the thread will
|
||||
continue running until termination, as there is no simple
|
||||
and reliable way to stop a thread anywhere)
|
||||
"""
|
||||
|
||||
if not hasattr(_storage, "parent_loop"):
|
||||
_storage.parent_loop = current_loop()
|
||||
async with _storage.max_workers:
|
||||
# This will automatically block once
|
||||
# we run out of slots and proceed once
|
||||
# we have more
|
||||
pass # TODO
|
||||
async with structio.create_pool() as pool:
|
||||
# This will automatically block once
|
||||
# we run out of slots and proceed once
|
||||
# we have more
|
||||
return await pool.spawn(_async_runner, sync_func, *args)
|
||||
|
||||
|
||||
def set_max_worker_count(count: int):
|
||||
|
|
|
@ -38,14 +38,11 @@ async def main_async_thread(i):
|
|||
# Identical to structio.Event, but this event
|
||||
# can talk to threads too
|
||||
evt = structio.thread.AsyncThreadEvent()
|
||||
th = threading.Thread(target=thread_worker, args=(evt,))
|
||||
th.start()
|
||||
print("[main] Spawning child")
|
||||
pool.spawn(child, evt, i)
|
||||
print("[main] Child spawned, waiting on the event")
|
||||
await evt.wait()
|
||||
print("[main] Child spawned, calling worker thread")
|
||||
await structio.thread.run_in_worker(thread_worker, evt)
|
||||
assert evt.is_set()
|
||||
th.join()
|
||||
print(f"[main] Exited in {structio.clock() - j:.2f} seconds")
|
||||
|
||||
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
import time
|
||||
import threading
|
||||
import structio
|
||||
|
||||
|
||||
|
@ -27,6 +29,21 @@ async def consumer(q: structio.Queue):
|
|||
await structio.sleep(1)
|
||||
|
||||
|
||||
def threaded_consumer(q: structio.thread.AsyncThreadQueue):
|
||||
while True:
|
||||
# Hangs until there is
|
||||
# something on the queue
|
||||
item = q.get_sync()
|
||||
if item is None:
|
||||
print("Consumer done")
|
||||
break
|
||||
print(f"Consumed {item}")
|
||||
# Simulates some work so the
|
||||
# producer waits before putting
|
||||
# the next value
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
async def main(q: structio.Queue, n: int):
|
||||
print("Starting consumer and producer")
|
||||
async with structio.create_pool() as ctx:
|
||||
|
@ -35,6 +52,16 @@ async def main(q: structio.Queue, n: int):
|
|||
print("Bye!")
|
||||
|
||||
|
||||
async def main_threaded(q: structio.thread.AsyncThreadQueue, n: int):
|
||||
print("Starting consumer and producer")
|
||||
async with structio.create_pool() as pool:
|
||||
pool.spawn(producer, q, n)
|
||||
await structio.thread.run_in_worker(threaded_consumer, q)
|
||||
print("Bye!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
queue = structio.Queue(2) # Queue has size limit of 2
|
||||
structio.run(main, queue, 5)
|
||||
queue = structio.thread.AsyncThreadQueue(2)
|
||||
structio.run(main_threaded, queue, 5)
|
||||
|
|
Loading…
Reference in New Issue