From a0acce3ed3dd53cb11a8ed2f6a1cda97af00d759 Mon Sep 17 00:00:00 2001 From: nocturn9x Date: Wed, 17 May 2023 16:01:22 +0200 Subject: [PATCH] Fixed bug with channels and modified event channel test --- .idea/misc.xml | 2 +- structio/__init__.py | 5 +++-- structio/core/exceptions.py | 7 +++++++ structio/sync.py | 14 ++++++++------ tests/event_channel.py | 29 ++++++++++++++++++----------- tests/memory_channel.py | 2 +- 6 files changed, 38 insertions(+), 21 deletions(-) diff --git a/.idea/misc.xml b/.idea/misc.xml index 00cc026..df4a621 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,4 +1,4 @@ - + \ No newline at end of file diff --git a/structio/__init__.py b/structio/__init__.py index b8b75ea..e33a67e 100644 --- a/structio/__init__.py +++ b/structio/__init__.py @@ -6,7 +6,7 @@ from structio.core.managers.signals.sigint import SigIntManager from structio.core.time.clock import DefaultClock from structio.core.syscalls import sleep, suspend as _suspend from structio.core.context import TaskPool, TaskScope -from structio.core.exceptions import Cancelled, TimedOut +from structio.core.exceptions import Cancelled, TimedOut, ResourceClosed from structio.core import task from structio.core.task import Task, TaskState from structio.sync import Event, Queue, MemoryChannel, Semaphore @@ -106,5 +106,6 @@ __all__ = ["run", "Task", "TaskState", "TaskScope", - "TaskPool" + "TaskPool", + "ResourceClosed" ] diff --git a/structio/core/exceptions.py b/structio/core/exceptions.py index 75f9146..eccb527 100644 --- a/structio/core/exceptions.py +++ b/structio/core/exceptions.py @@ -24,3 +24,10 @@ class TimedOut(StructIOException): """ scope: "TaskScope" + + +class ResourceClosed(StructIOException): + """ + Raised when an asynchronous resource is + closed and no longer usable + """ \ No newline at end of file diff --git a/structio/sync.py b/structio/sync.py index 59f30f6..8d52d2f 100644 --- a/structio/sync.py +++ b/structio/sync.py @@ -1,5 +1,6 @@ # Task synchronization primitives from structio.core.syscalls import suspend, checkpoint +from structio.core.exceptions import ResourceClosed from structio.core.run import current_task, current_loop from structio.core.abc import ChannelReader, ChannelWriter, Channel from structio.util.ki import enable_ki_protection @@ -164,15 +165,15 @@ class MemorySendChannel(ChannelWriter): @enable_ki_protection async def send(self, value): if self._closed: - raise IOError("cannot operate on a closed channel") + raise ResourceClosed("cannot operate on a closed channel") await self.buffer.put(value) @enable_ki_protection async def close(self): - await checkpoint() if self._closed: - raise IOError("cannot operate on a closed channel") + raise ResourceClosed("cannot operate on a closed channel") self._closed = True + await checkpoint() class MemoryReceiveChannel(ChannelReader): @@ -189,15 +190,15 @@ class MemoryReceiveChannel(ChannelReader): @enable_ki_protection async def receive(self): if self._closed: - raise IOError("cannot operate on a closed channel") + raise ResourceClosed("cannot operate on a closed channel") return await self.buffer.get() @enable_ki_protection async def close(self): - await checkpoint() if self._closed: - raise IOError("cannot operate on a closed channel") + raise ResourceClosed("cannot operate on a closed channel") self._closed = True + await checkpoint() class MemoryChannel(Channel, MemorySendChannel, MemoryReceiveChannel): @@ -208,6 +209,7 @@ class MemoryChannel(Channel, MemorySendChannel, MemoryReceiveChannel): def __init__(self, buffer_size): self._buffer = Queue(buffer_size) + super().__init__(self._buffer) self.reader = MemoryReceiveChannel(self._buffer) self.writer = MemorySendChannel(self._buffer) diff --git a/tests/event_channel.py b/tests/event_channel.py index 0f061eb..524a64b 100644 --- a/tests/event_channel.py +++ b/tests/event_channel.py @@ -4,21 +4,24 @@ import random async def waiter(ch: structio.ChannelReader): print("[waiter] Waiter is alive!") - async with ch: - while True: - print("[waiter] Awaiting events") + while True: + print("[waiter] Awaiting events") + try: evt: structio.Event = await ch.receive() - if not evt: - break - print("[waiter] Received event, waiting to be triggered") - await evt.wait() - print("[waiter] Event triggered") + except structio.ResourceClosed: + break + print("[waiter] Received event, waiting to be triggered") + t = structio.clock() + await evt.wait() + print(f"[waiter] Event triggered after {structio.clock() - t:.2f} seconds") print("[waiter] Done!") -async def sender(ch: structio.ChannelWriter, n: int): +async def sender(ch: structio.Channel, n: int): print("[sender] Sender is alive!") async with ch: + # Channel is automatically closed when exiting + # the async with block for _ in range(n): print("[sender] Sending event") ev = structio.Event() @@ -28,7 +31,6 @@ async def sender(ch: structio.ChannelWriter, n: int): await structio.sleep(t) print("[sender] Setting the event") ev.set() - await ch.send(None) print("[sender] Done!") @@ -36,8 +38,13 @@ async def main(n: int): print("[main] Parent is alive") channel = structio.MemoryChannel(1) async with structio.create_pool() as pool: + # Each end of the channel can be used independently, + # and closing one does not also close the other (which + # is why we pass the full channel object to our sender + # so it can close both ends and cause the reader to catch + # the closing exception and exit cleanly) pool.spawn(waiter, channel.reader) - pool.spawn(sender, channel.writer, n) + pool.spawn(sender, channel, n) print("[main] Children spawned") print("[main] Done!") diff --git a/tests/memory_channel.py b/tests/memory_channel.py index 642736f..b971a59 100644 --- a/tests/memory_channel.py +++ b/tests/memory_channel.py @@ -33,7 +33,7 @@ async def writer(ch: structio.ChannelWriter, objects: list[Any]): async def main(objects: list[Any]): print("[main] Parent is alive") # We construct a new memory channel... - channel = structio.MemoryChannel(1) + channel = structio.MemoryChannel(1) # 1 is the size of the internal buffer async with structio.create_pool() as pool: # ... and dispatch the two ends to different # tasks. Isn't this neat?