structio/tests/semaphores.py

32 lines
1.1 KiB
Python

import structio
async def child(i: int, sem: structio.Semaphore):
    """
    Announce startup, then spend one second inside ``sem``.

    :param i: Number used to tag every line this child prints.
    :param sem: Semaphore entered around the one-second sleep.
    """

    hold_time = 1
    print(f"[child {i}] I'm alive!")
    async with sem:
        # Everything in this block runs while the semaphore is held
        print(f"[child {i}] Entered critical section")
        await structio.sleep(hold_time)
    # Printed only after the async with block has been left
    print(f"[child {i}] Exited critical section")
async def main(n: int, k: int):
    """
    Spawn ``n * k`` children that all share one semaphore of size ``n``.

    Every child sleeps for one second while inside the semaphore, so the
    children are expected to go through in ``k`` batches of at most ``n``.
    The elapsed time is printed after the pool block has been left.

    :param n: Size of the semaphore; must be an int greater than 0.
    :param k: Multiplier for the number of children (``n * k`` are
        spawned); must be an int greater than 1.
    :raises TypeError: If ``n`` or ``k`` is not an int.
    :raises ValueError: If ``n`` is below 1 or ``k`` is below 2.
    """

    # Validate with explicit raises rather than assert: assert statements
    # are removed when Python runs with -O, which would let bad values
    # through to the semaphore and the spawn loop
    for name, value, minimum in (("n", n, 1), ("k", k, 2)):
        if not isinstance(value, int):
            raise TypeError(f"{name} must be an int, not {type(value).__name__}")
        if value < minimum:
            raise ValueError(f"{name} must be >= {minimum}, got {value}")

    total = n * k
    print(f"[main] Parent is alive, creating semaphore of size {n}")
    semaphore = structio.Semaphore(n)
    t = structio.clock()
    async with structio.create_pool() as pool:
        print(f"[main] Spawning {total} children")
        # Children are numbered starting from 1
        for i in range(1, total + 1):
            pool.spawn(child, i, semaphore)

        print("[main] All children spawned, waiting for completion")
        # Since our semaphore has a limit of n tasks that
        # can acquire it concurrently, we should see at most
        # n instances of child inside the critical section at
        # any given time, like in batches
    print(f"[main] Done in {structio.clock() - t:.2f} seconds")
# With n = 3 and k = 2 this spawns 6 children that share a semaphore
# of size 3; each child sleeps for one second inside it, so the
# script should exit in about k (here 2) seconds
structio.run(main, 3, 2)