From 8d3315d2cb400cd8fafaa17b4917d1e96b9200b0 Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Wed, 10 May 2023 13:53:11 +0200 Subject: [PATCH] Fixed issue with sleeping and I/O --- aiosched/kernel.py | 11 ++++++++++- aiosched/sync.py | 4 ++-- tests/network_channel.py | 9 +++++---- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/aiosched/kernel.py b/aiosched/kernel.py index d6bc2fa..b94ca06 100644 --- a/aiosched/kernel.py +++ b/aiosched/kernel.py @@ -220,7 +220,16 @@ class FIFOKernel: for key, _ in self.selector.select(timeout): key.data: dict[int, Task] for task in key.data.values(): - self.run_ready.append(task) # Resource ready? Schedule its task + # Since we don't unschedule I/O + # resources after every operation, + # we may hold on to a socket while + # its owner task is sleeping on some + # synchronization primitive: if we + # rescheduled it now, it would cause + # all sort of nonsense! So we only + # schedule tasks waiting for I/O to happen + if task.state == TaskState.IO: + self.run_ready.append(task) # Resource ready? Schedule its task self.debugger.after_io(self.clock() - before_time) def awake_tasks(self): diff --git a/aiosched/sync.py b/aiosched/sync.py index 4ce6eb4..81de09d 100644 --- a/aiosched/sync.py +++ b/aiosched/sync.py @@ -223,7 +223,7 @@ class MemoryChannel(Channel): # We use a queue as our buffer self.buffer = Queue(maxsize=maxsize) - async def write(self, data: str): + async def write(self, data): """ Writes data to the channel. Blocks if the internal queue is full until a spot is available. Does nothing @@ -321,7 +321,7 @@ class NetworkChannel(Channel): if self.closed: return False - elif self.reader.fileno() == -1: + elif await self.reader.fileno() == -1: return False else: try: diff --git a/tests/network_channel.py b/tests/network_channel.py index 8ab7f67..7bf094a 100644 --- a/tests/network_channel.py +++ b/tests/network_channel.py @@ -1,7 +1,6 @@ import aiosched - async def producer(c: aiosched.NetworkChannel, n: int): print("[producer] Started") for i in range(n): @@ -18,7 +17,9 @@ async def consumer(c: aiosched.NetworkChannel): while await c.pending(): item = await c.read(1) print(f"[consumer] Received {item.decode()}") - # await aiosched.sleep(2) # If you uncomment this, the except block will be triggered + # In this example, the consumer is twice as slow + # as the producer + await aiosched.sleep(1) except aiosched.errors.ResourceClosed: print("[consumer] Stream has been closed early!") print("[consumer] Done") @@ -34,5 +35,5 @@ async def main(channel: aiosched.NetworkChannel, n: int): aiosched.run( - main, aiosched.NetworkChannel(), 5, debugger=() -) # 2 is the max size of the channel + main, aiosched.NetworkChannel(), 10, debugger=() +)