Fixed bug with channels and modified event channel test

This commit is contained in:
Mattia Giambirtone 2023-05-17 16:01:22 +02:00
parent 3e33f2732e
commit a0acce3ed3
Signed by: nocturn9x
GPG Key ID: 8270F9F467971E59
6 changed files with 38 additions and 21 deletions

View File

@ -1,4 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (StructuredIO)" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (structio)" project-jdk-type="Python SDK" />
</project>

View File

@ -6,7 +6,7 @@ from structio.core.managers.signals.sigint import SigIntManager
from structio.core.time.clock import DefaultClock
from structio.core.syscalls import sleep, suspend as _suspend
from structio.core.context import TaskPool, TaskScope
from structio.core.exceptions import Cancelled, TimedOut
from structio.core.exceptions import Cancelled, TimedOut, ResourceClosed
from structio.core import task
from structio.core.task import Task, TaskState
from structio.sync import Event, Queue, MemoryChannel, Semaphore
@ -106,5 +106,6 @@ __all__ = ["run",
"Task",
"TaskState",
"TaskScope",
"TaskPool"
"TaskPool",
"ResourceClosed"
]

View File

@ -24,3 +24,10 @@ class TimedOut(StructIOException):
"""
scope: "TaskScope"
class ResourceClosed(StructIOException):
"""
Raised when an asynchronous resource is
closed and no longer usable
"""

View File

@ -1,5 +1,6 @@
# Task synchronization primitives
from structio.core.syscalls import suspend, checkpoint
from structio.core.exceptions import ResourceClosed
from structio.core.run import current_task, current_loop
from structio.core.abc import ChannelReader, ChannelWriter, Channel
from structio.util.ki import enable_ki_protection
@ -164,15 +165,15 @@ class MemorySendChannel(ChannelWriter):
@enable_ki_protection
async def send(self, value):
if self._closed:
raise IOError("cannot operate on a closed channel")
raise ResourceClosed("cannot operate on a closed channel")
await self.buffer.put(value)
@enable_ki_protection
async def close(self):
await checkpoint()
if self._closed:
raise IOError("cannot operate on a closed channel")
raise ResourceClosed("cannot operate on a closed channel")
self._closed = True
await checkpoint()
class MemoryReceiveChannel(ChannelReader):
@ -189,15 +190,15 @@ class MemoryReceiveChannel(ChannelReader):
@enable_ki_protection
async def receive(self):
if self._closed:
raise IOError("cannot operate on a closed channel")
raise ResourceClosed("cannot operate on a closed channel")
return await self.buffer.get()
@enable_ki_protection
async def close(self):
await checkpoint()
if self._closed:
raise IOError("cannot operate on a closed channel")
raise ResourceClosed("cannot operate on a closed channel")
self._closed = True
await checkpoint()
class MemoryChannel(Channel, MemorySendChannel, MemoryReceiveChannel):
@ -208,6 +209,7 @@ class MemoryChannel(Channel, MemorySendChannel, MemoryReceiveChannel):
def __init__(self, buffer_size):
self._buffer = Queue(buffer_size)
super().__init__(self._buffer)
self.reader = MemoryReceiveChannel(self._buffer)
self.writer = MemorySendChannel(self._buffer)

View File

@ -4,21 +4,24 @@ import random
async def waiter(ch: structio.ChannelReader):
print("[waiter] Waiter is alive!")
async with ch:
while True:
print("[waiter] Awaiting events")
while True:
print("[waiter] Awaiting events")
try:
evt: structio.Event = await ch.receive()
if not evt:
break
print("[waiter] Received event, waiting to be triggered")
await evt.wait()
print("[waiter] Event triggered")
except structio.ResourceClosed:
break
print("[waiter] Received event, waiting to be triggered")
t = structio.clock()
await evt.wait()
print(f"[waiter] Event triggered after {structio.clock() - t:.2f} seconds")
print("[waiter] Done!")
async def sender(ch: structio.ChannelWriter, n: int):
async def sender(ch: structio.Channel, n: int):
print("[sender] Sender is alive!")
async with ch:
# Channel is automatically closed when exiting
# the async with block
for _ in range(n):
print("[sender] Sending event")
ev = structio.Event()
@ -28,7 +31,6 @@ async def sender(ch: structio.ChannelWriter, n: int):
await structio.sleep(t)
print("[sender] Setting the event")
ev.set()
await ch.send(None)
print("[sender] Done!")
@ -36,8 +38,13 @@ async def main(n: int):
print("[main] Parent is alive")
channel = structio.MemoryChannel(1)
async with structio.create_pool() as pool:
# Each end of the channel can be used independently,
# and closing one does not also close the other (which
# is why we pass the full channel object to our sender
# so it can close both ends and cause the reader to catch
# the closing exception and exit cleanly)
pool.spawn(waiter, channel.reader)
pool.spawn(sender, channel.writer, n)
pool.spawn(sender, channel, n)
print("[main] Children spawned")
print("[main] Done!")

View File

@ -33,7 +33,7 @@ async def writer(ch: structio.ChannelWriter, objects: list[Any]):
async def main(objects: list[Any]):
print("[main] Parent is alive")
# We construct a new memory channel...
channel = structio.MemoryChannel(1)
channel = structio.MemoryChannel(1) # 1 is the size of the internal buffer
async with structio.create_pool() as pool:
# ... and dispatch the two ends to different
# tasks. Isn't this neat?