structio/tests/task_handling.py

52 lines
2.0 KiB
Python

import structio
from nested_pool_inner_raises import successful, failing
async def main_cancel(i):
    """
    Spawn a child task in a pool and cancel it after ``i`` seconds.

    The child is ``successful`` called with ``"test"`` and ``i * 2``.
    The parent sleeps for ``i`` seconds, cancels the child through its
    Task handle, and finally prints the time elapsed according to
    ``structio.clock()``.

    :param i: number of seconds to wait before cancelling the child
    """
    print("[main] Parent is alive, spawning child")
    started = structio.clock()
    async with structio.create_pool() as pool:
        # presumably the child would run for about i * 2 seconds if left
        # alone, i.e. longer than the parent waits -- confirm against
        # nested_pool_inner_raises.successful
        child: structio.Task = pool.spawn(successful, "test", i * 2)
        print(f"[main] Child spawned, waiting {i} seconds before canceling it")
        await structio.sleep(i)
        print("[main] Cancelling child")
        # A Task returned by spawn() can be cancelled on its own, if needed
        child.cancel()
    elapsed = structio.clock() - started
    print(f"[main] Exited in {elapsed:.2f} seconds")
async def main_wait_successful(i):
    """
    Spawn a child task in a pool and explicitly await its result.

    The child is ``successful`` called with ``"test"`` and ``i``. Its
    return value is printed, followed by the time elapsed according to
    ``structio.clock()``.

    :param i: value forwarded to the child as its second argument
    """
    print("[main] Parent is alive, spawning (and explicitly waiting for) child")
    started = structio.clock()
    async with structio.create_pool() as pool:
        # spawn() returns a Task object that can be managed on its own.
        # Awaiting that Task waits for it to finish, yields its return
        # value and propagates any exception it raised. Awaiting the
        # coroutine directly would have worked just as well here, so this
        # is a weak showcase -- the point is that the Task object could be
        # handed to some other piece of code and awaited over there.
        outcome = await pool.spawn(successful, "test", i)
        print(f"[main] Child has returned: {outcome}")
    elapsed = structio.clock() - started
    print(f"[main] Exited in {elapsed:.2f} seconds")
async def main_wait_failing(i):
    """
    Spawn a child task that is expected to fail and await it.

    The child is ``failing`` called with ``"test"`` and ``i``. Awaiting
    its Task is expected to raise; a ``TypeError`` coming out of the pool
    block is caught and reported. The time elapsed according to
    ``structio.clock()`` is printed at the end.

    :param i: value forwarded to the child as its second argument
    """
    print("[main] Parent is alive, spawning (and explicitly waiting for) child")
    t = structio.clock()
    try:
        async with structio.create_pool() as pool:
            # This await is not expected to return normally: the child's
            # exception (presumably the TypeError handled below -- confirm
            # against nested_pool_inner_raises.failing) is raised here
            await pool.spawn(failing, "test", i)
            print("This is never executed")
    except TypeError:
        # Plain string literal: there is nothing to interpolate here
        print("[main] TypeError caught!")
    print(f"[main] Exited in {structio.clock() - t:.2f} seconds")
# Run the three scenarios one after the other, each with i = 5.
# Total time should be about 15s
for _entry_point in (main_cancel, main_wait_successful, main_wait_failing):
    structio.run(_entry_point, 5)