2023-05-15 18:25:02 +02:00
|
|
|
import structio
|
|
|
|
from typing import Any
|
|
|
|
|
|
|
|
|
|
|
|
async def reader(ch: structio.ChannelReader):
|
|
|
|
print("[reader] Reader is alive!")
|
|
|
|
async with ch:
|
|
|
|
while True:
|
|
|
|
print(f"[reader] Awaiting messages")
|
|
|
|
data = await ch.receive()
|
|
|
|
if not data:
|
|
|
|
break
|
|
|
|
print(f"[reader] Got: {data}")
|
|
|
|
# Simulates some work
|
|
|
|
await structio.sleep(1)
|
|
|
|
print("[reader] Done!")
|
|
|
|
|
|
|
|
|
|
|
|
async def writer(ch: structio.ChannelWriter, objects: list[Any]):
|
|
|
|
print("[writer] Writer is alive!")
|
|
|
|
async with ch:
|
|
|
|
for obj in objects:
|
|
|
|
print(f"[writer] Sending {obj!r}")
|
|
|
|
await ch.send(obj)
|
|
|
|
print(f"[writer] Sent {obj!r}")
|
|
|
|
# Let's make the writer twice as fast as the receiver
|
|
|
|
# to test backpressure :)
|
|
|
|
await structio.sleep(0.5)
|
|
|
|
await ch.send(None)
|
|
|
|
print("[writer] Done!")
|
|
|
|
|
|
|
|
|
|
|
|
async def main(objects: list[Any]):
|
|
|
|
print("[main] Parent is alive")
|
|
|
|
# We construct a new memory channel...
|
2023-05-17 16:01:22 +02:00
|
|
|
channel = structio.MemoryChannel(1) # 1 is the size of the internal buffer
|
2023-05-15 18:25:02 +02:00
|
|
|
async with structio.create_pool() as pool:
|
|
|
|
# ... and dispatch the two ends to different
|
|
|
|
# tasks. Isn't this neat?
|
|
|
|
pool.spawn(reader, channel.reader)
|
|
|
|
pool.spawn(writer, channel.writer, objects)
|
|
|
|
print("[main] Children spawned")
|
|
|
|
print("[main] Done!")
|
|
|
|
|
|
|
|
|
|
|
|
structio.run(main, [1, 2, 3, 4])
|
|
|
|
|