structio/tests/events.py

26 lines
682 B
Python

import structio
async def child(ev: structio.Event, n):
    """Sleep for ``n`` seconds, then set ``ev``.

    Progress is reported on stdout before and after the sleep. Once the
    event has been set, the task asserts that ``ev.is_set()`` agrees.

    :param ev: the event to set after the delay
    :param n: delay passed to ``structio.sleep`` (reported as seconds)
    """

    greeting = f"[child] I'm alive! Waiting {n} seconds before setting the event"
    print(greeting)

    await structio.sleep(n)

    print("[child] Slept! Setting the event")
    ev.set()
    # Sanity check: the flag must be observable right after set()
    assert ev.is_set()
async def main(i):
    """Spawn ``child`` in a task pool and wait for it to set an event.

    A timestamp is taken with ``structio.clock()`` up front; after the
    pool block has been left, the elapsed time is printed with two
    decimal places.

    :param i: delay forwarded to ``child`` as its ``n`` argument
    """

    print("[main] Parent is alive")
    started_at = structio.clock()

    async with structio.create_pool() as task_pool:
        signal = structio.Event()
        print("[main] Spawning child")
        task_pool.spawn(child, signal, i)
        print("[main] Child spawned, waiting on the event")
        # Block here until the child calls set() on the shared event
        await signal.wait()
        assert signal.is_set()

    elapsed = structio.clock() - started_at
    print(f"[main] Exited in {elapsed:.2f} seconds")
# Script entry point: hand main() to structio's runner. The 5 is presumably
# forwarded to main() as `i`, i.e. the child's delay -- confirm against
# structio.run's signature.
structio.run(main, 5)