diff --git a/aiosched/context.py b/aiosched/context.py index ddc96e6..c2ed835 100644 --- a/aiosched/context.py +++ b/aiosched/context.py @@ -22,8 +22,7 @@ from aiosched.internals.syscalls import ( cancel, set_context, close_context, - join, - current_task, + join ) from typing import Any, Coroutine, Callable @@ -88,7 +87,7 @@ class TaskContext(Task): """ if isinstance(other, TaskContext): - return super().__eq__(self, other) + return super().__eq__(other) elif isinstance(other, Task): return other == self.entry_point return False diff --git a/aiosched/io.py b/aiosched/io.py index f2ab17a..88900a9 100644 --- a/aiosched/io.py +++ b/aiosched/io.py @@ -75,7 +75,7 @@ class AsyncStream: async def write(self, data): """ - Writes data b to the file. + Writes data to the stream. Returns the number of bytes written """ diff --git a/aiosched/kernel.py b/aiosched/kernel.py index f718d74..0a06bae 100644 --- a/aiosched/kernel.py +++ b/aiosched/kernel.py @@ -120,17 +120,18 @@ class FIFOKernel: # There's tasks sleeping and/or on the # ready queue! return False - for key in self.selector.get_map().values(): - # We don't just do any([self.paused, self.run_ready, self.selector.get_map()]) - # because we don't want to just know if there's any resources we're waiting on, - # but if there's at least one non-terminated task that owns a resource we're - # waiting on. This avoids issues such as the event loop never exiting if the - # user forgets to close a socket, for example - key.data: Task - if key.data.done(): - continue - elif self.get_task_io(key.data): - return False + if self.selector.get_map(): + for key in self.selector.get_map().values(): + # We don't just do any([self.paused, self.run_ready, self.selector.get_map()]) + # because we don't want to just know if there's any resources we're waiting on, + # but if there's at least one non-terminated task that owns a resource we're + # waiting on. This avoids issues such as the event loop never exiting if the + # user forgets to close a socket, for example + key.data: Task + if key.data.done(): + continue + elif self.get_task_io(key.data): + return False return True def close(self, force: bool = False): @@ -406,7 +407,11 @@ class FIFOKernel: lambda o: o.fileobj == stream, dict(self.selector.get_map()).values(), ): - self.handle_task_run(partial(k.data.throw, exc), k.data) + if k.data != self.current_task: + # We don't want to raise an error inside + # the task that's trying to close the stream! + self.handle_task_run(partial(k.data.throw, exc), k.data) + self.reschedule_running() def cancel(self, task: Task): """ diff --git a/tests/network_channel.py b/tests/network_channel.py index 582af6c..c99d5cc 100644 --- a/tests/network_channel.py +++ b/tests/network_channel.py @@ -8,7 +8,7 @@ async def producer(c: aiosched.NetworkChannel, n: int): await c.write(str(i).encode()) print(f"[producer] Sent {i}") await aiosched.sleep(0.5) # This makes the receiver wait on us! - #await c.close() + await c.close() print("[producer] Done")