Fixed bugs with thread-related code

This commit is contained in:
Mattia Giambirtone 2023-05-18 11:23:34 +02:00 committed by nocturn9x
parent 0abd2c2364
commit 8a16bb41d6
Signed by: nocturn9x
GPG Key ID: 8270F9F467971E59
3 changed files with 50 additions and 20 deletions

View File

@ -126,7 +126,9 @@ class TaskPool:
raise exc_val raise exc_val
else: else:
await suspend() await suspend()
except Cancelled: except Cancelled as e:
if e.scope is not self.scope:
self.error = e
self.scope.cancelled = True self.scope.cancelled = True
except (Exception, KeyboardInterrupt) as e: except (Exception, KeyboardInterrupt) as e:
self.error = e self.error = e

View File

@ -114,7 +114,8 @@ class AsyncThreadQueue(Queue):
evt: AsyncThreadEvent | None = None evt: AsyncThreadEvent | None = None
with self._lock: with self._lock:
if self.maxsize and self.maxsize == len(self.container): if self.maxsize and self.maxsize == len(self.container):
self.putters.append(AsyncThreadEvent()) evt = AsyncThreadEvent()
self.putters.append(evt)
if self.getters: if self.getters:
self.getters.popleft().set() self.getters.popleft().set()
if evt: if evt:
@ -124,7 +125,7 @@ class AsyncThreadQueue(Queue):
@enable_ki_protection @enable_ki_protection
def get_sync(self): def get_sync(self):
""" """
Like get(), but asynchronous Like get(), but synchronous
""" """
evt: AsyncThreadEvent | None = None evt: AsyncThreadEvent | None = None
@ -147,6 +148,7 @@ def _threaded_runner(f, q: AsyncThreadQueue, parent_loop: BaseKernel, *args):
q.put_sync((False, e)) q.put_sync((False, e))
@enable_ki_protection
async def _async_runner(f, *args): async def _async_runner(f, *args):
queue = AsyncThreadQueue(1) queue = AsyncThreadQueue(1)
th = threading.Thread(target=_threaded_runner, args=(f, queue, current_loop(), *args), th = threading.Thread(target=_threaded_runner, args=(f, queue, current_loop(), *args),
@ -175,11 +177,10 @@ async def run_in_worker(sync_func,
if not hasattr(_storage, "parent_loop"): if not hasattr(_storage, "parent_loop"):
_storage.parent_loop = current_loop() _storage.parent_loop = current_loop()
async with _storage.max_workers: async with _storage.max_workers:
async with structio.create_pool() as pool:
# This will automatically block once # This will automatically block once
# we run out of slots and proceed once # we run out of slots and proceed once
# we have more # we have more
return await pool.spawn(_async_runner, sync_func, *args) return await current_loop().current_pool.spawn(_async_runner, sync_func, *args)
def set_max_worker_count(count: int): def set_max_worker_count(count: int):

View File

@ -1,5 +1,4 @@
import time import time
import threading
import structio import structio
@ -9,9 +8,9 @@ async def producer(q: structio.Queue, n: int):
# queue is emptied by the # queue is emptied by the
# consumer # consumer
await q.put(i) await q.put(i)
print(f"Produced {i}") print(f"[producer] Produced {i}")
await q.put(None) await q.put(None)
print("Producer done") print("[producer] Producer done")
async def consumer(q: structio.Queue): async def consumer(q: structio.Queue):
@ -20,9 +19,9 @@ async def consumer(q: structio.Queue):
# something on the queue # something on the queue
item = await q.get() item = await q.get()
if item is None: if item is None:
print("Consumer done") print("[consumer] Consumer done")
break break
print(f"Consumed {item}") print(f"[consumer] Consumed {item}")
# Simulates some work so the # Simulates some work so the
# producer waits before putting # producer waits before putting
# the next value # the next value
@ -35,29 +34,56 @@ def threaded_consumer(q: structio.thread.AsyncThreadQueue):
# something on the queue # something on the queue
item = q.get_sync() item = q.get_sync()
if item is None: if item is None:
print("Consumer done") print("[worker consumer] Consumer done")
break break
print(f"Consumed {item}") print(f"[worker consumer] Consumed {item}")
# Simulates some work so the # Simulates some work so the
# producer waits before putting # producer waits before putting
# the next value # the next value
time.sleep(1) time.sleep(1)
return 69
async def main(q: structio.Queue, n: int): async def main(q: structio.Queue, n: int):
print("Starting consumer and producer") print("[main] Starting consumer and producer")
t = structio.clock()
async with structio.create_pool() as ctx: async with structio.create_pool() as ctx:
ctx.spawn(producer, q, n) ctx.spawn(producer, q, n)
ctx.spawn(consumer, q) ctx.spawn(consumer, q)
print("Bye!") print(f"[main] Exited in {structio.clock() - t:.2f} seconds")
def threaded_producer(q: structio.thread.AsyncThreadQueue, n: int):
print("[worker producer] Producer started")
for i in range(n):
# This will wait until the
# queue is emptied by the
# consumer
q.put_sync(i)
print(f"[worker producer] Produced {i}")
q.put_sync(None)
print("[worker producer] Producer done")
return 42
async def main_threaded(q: structio.thread.AsyncThreadQueue, n: int): async def main_threaded(q: structio.thread.AsyncThreadQueue, n: int):
print("Starting consumer and producer") print("[main] Starting consumer and producer")
t = structio.clock()
async with structio.create_pool() as pool: async with structio.create_pool() as pool:
pool.spawn(producer, q, n) pool.spawn(producer, q, n)
await structio.thread.run_in_worker(threaded_consumer, q) val = await structio.thread.run_in_worker(threaded_consumer, q)
print("Bye!") print(f"[main] Thread returned {val!r}")
print(f"[main] Exited in {structio.clock() - t:.2f} seconds")
async def main_threaded_2(q: structio.thread.AsyncThreadQueue, n: int):
print("[main] Starting consumer and producer")
t = structio.clock()
async with structio.create_pool() as pool:
pool.spawn(consumer, q)
val = await structio.thread.run_in_worker(threaded_producer, q, n)
print(f"[main] Thread returned {val!r}")
print(f"[main] Exited in {structio.clock() - t:.2f} seconds")
if __name__ == "__main__": if __name__ == "__main__":
@ -65,3 +91,4 @@ if __name__ == "__main__":
structio.run(main, queue, 5) structio.run(main, queue, 5)
queue = structio.thread.AsyncThreadQueue(2) queue = structio.thread.AsyncThreadQueue(2)
structio.run(main_threaded, queue, 5) structio.run(main_threaded, queue, 5)
structio.run(main_threaded_2, queue, 5)