Fixed issue with sleeping and I/O
This commit is contained in:
parent
7272516e78
commit
8d3315d2cb
|
@ -220,7 +220,16 @@ class FIFOKernel:
|
||||||
for key, _ in self.selector.select(timeout):
|
for key, _ in self.selector.select(timeout):
|
||||||
key.data: dict[int, Task]
|
key.data: dict[int, Task]
|
||||||
for task in key.data.values():
|
for task in key.data.values():
|
||||||
self.run_ready.append(task) # Resource ready? Schedule its task
|
# Since we don't unschedule I/O
|
||||||
|
# resources after every operation,
|
||||||
|
# we may hold on to a socket while
|
||||||
|
# its owner task is sleeping on some
|
||||||
|
# synchronization primitive: if we
|
||||||
|
# rescheduled it now, it would cause
|
||||||
|
# all sort of nonsense! So we only
|
||||||
|
# schedule tasks waiting for I/O to happen
|
||||||
|
if task.state == TaskState.IO:
|
||||||
|
self.run_ready.append(task) # Resource ready? Schedule its task
|
||||||
self.debugger.after_io(self.clock() - before_time)
|
self.debugger.after_io(self.clock() - before_time)
|
||||||
|
|
||||||
def awake_tasks(self):
|
def awake_tasks(self):
|
||||||
|
|
|
@ -223,7 +223,7 @@ class MemoryChannel(Channel):
|
||||||
# We use a queue as our buffer
|
# We use a queue as our buffer
|
||||||
self.buffer = Queue(maxsize=maxsize)
|
self.buffer = Queue(maxsize=maxsize)
|
||||||
|
|
||||||
async def write(self, data: str):
|
async def write(self, data):
|
||||||
"""
|
"""
|
||||||
Writes data to the channel. Blocks if the internal
|
Writes data to the channel. Blocks if the internal
|
||||||
queue is full until a spot is available. Does nothing
|
queue is full until a spot is available. Does nothing
|
||||||
|
@ -321,7 +321,7 @@ class NetworkChannel(Channel):
|
||||||
|
|
||||||
if self.closed:
|
if self.closed:
|
||||||
return False
|
return False
|
||||||
elif self.reader.fileno() == -1:
|
elif await self.reader.fileno() == -1:
|
||||||
return False
|
return False
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
import aiosched
|
import aiosched
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def producer(c: aiosched.NetworkChannel, n: int):
|
async def producer(c: aiosched.NetworkChannel, n: int):
|
||||||
print("[producer] Started")
|
print("[producer] Started")
|
||||||
for i in range(n):
|
for i in range(n):
|
||||||
|
@ -18,7 +17,9 @@ async def consumer(c: aiosched.NetworkChannel):
|
||||||
while await c.pending():
|
while await c.pending():
|
||||||
item = await c.read(1)
|
item = await c.read(1)
|
||||||
print(f"[consumer] Received {item.decode()}")
|
print(f"[consumer] Received {item.decode()}")
|
||||||
# await aiosched.sleep(2) # If you uncomment this, the except block will be triggered
|
# In this example, the consumer is twice as slow
|
||||||
|
# as the producer
|
||||||
|
await aiosched.sleep(1)
|
||||||
except aiosched.errors.ResourceClosed:
|
except aiosched.errors.ResourceClosed:
|
||||||
print("[consumer] Stream has been closed early!")
|
print("[consumer] Stream has been closed early!")
|
||||||
print("[consumer] Done")
|
print("[consumer] Done")
|
||||||
|
@ -34,5 +35,5 @@ async def main(channel: aiosched.NetworkChannel, n: int):
|
||||||
|
|
||||||
|
|
||||||
aiosched.run(
|
aiosched.run(
|
||||||
main, aiosched.NetworkChannel(), 5, debugger=()
|
main, aiosched.NetworkChannel(), 10, debugger=()
|
||||||
) # 2 is the max size of the channel
|
)
|
||||||
|
|
Reference in New Issue