Fixed most bugs with I/O (except dangling resources)
This commit is contained in:
parent
acc436d518
commit
3ac6ea58cb
|
@ -22,8 +22,7 @@ from aiosched.internals.syscalls import (
|
|||
cancel,
|
||||
set_context,
|
||||
close_context,
|
||||
join,
|
||||
current_task,
|
||||
join
|
||||
)
|
||||
from typing import Any, Coroutine, Callable
|
||||
|
||||
|
@ -88,7 +87,7 @@ class TaskContext(Task):
|
|||
"""
|
||||
|
||||
if isinstance(other, TaskContext):
|
||||
return super().__eq__(self, other)
|
||||
return super().__eq__(other)
|
||||
elif isinstance(other, Task):
|
||||
return other == self.entry_point
|
||||
return False
|
||||
|
|
|
@ -75,7 +75,7 @@ class AsyncStream:
|
|||
|
||||
async def write(self, data):
|
||||
"""
|
||||
Writes data b to the file.
|
||||
Writes data to the stream.
|
||||
Returns the number of bytes
|
||||
written
|
||||
"""
|
||||
|
|
|
@ -120,17 +120,18 @@ class FIFOKernel:
|
|||
# There's tasks sleeping and/or on the
|
||||
# ready queue!
|
||||
return False
|
||||
for key in self.selector.get_map().values():
|
||||
# We don't just do any([self.paused, self.run_ready, self.selector.get_map()])
|
||||
# because we don't want to just know if there's any resources we're waiting on,
|
||||
# but if there's at least one non-terminated task that owns a resource we're
|
||||
# waiting on. This avoids issues such as the event loop never exiting if the
|
||||
# user forgets to close a socket, for example
|
||||
key.data: Task
|
||||
if key.data.done():
|
||||
continue
|
||||
elif self.get_task_io(key.data):
|
||||
return False
|
||||
if self.selector.get_map():
|
||||
for key in self.selector.get_map().values():
|
||||
# We don't just do any([self.paused, self.run_ready, self.selector.get_map()])
|
||||
# because we don't want to just know if there's any resources we're waiting on,
|
||||
# but if there's at least one non-terminated task that owns a resource we're
|
||||
# waiting on. This avoids issues such as the event loop never exiting if the
|
||||
# user forgets to close a socket, for example
|
||||
key.data: Task
|
||||
if key.data.done():
|
||||
continue
|
||||
elif self.get_task_io(key.data):
|
||||
return False
|
||||
return True
|
||||
|
||||
def close(self, force: bool = False):
|
||||
|
@ -406,7 +407,11 @@ class FIFOKernel:
|
|||
lambda o: o.fileobj == stream,
|
||||
dict(self.selector.get_map()).values(),
|
||||
):
|
||||
self.handle_task_run(partial(k.data.throw, exc), k.data)
|
||||
if k.data != self.current_task:
|
||||
# We don't want to raise an error inside
|
||||
# the task that's trying to close the stream!
|
||||
self.handle_task_run(partial(k.data.throw, exc), k.data)
|
||||
self.reschedule_running()
|
||||
|
||||
def cancel(self, task: Task):
|
||||
"""
|
||||
|
|
|
@ -8,7 +8,7 @@ async def producer(c: aiosched.NetworkChannel, n: int):
|
|||
await c.write(str(i).encode())
|
||||
print(f"[producer] Sent {i}")
|
||||
await aiosched.sleep(0.5) # This makes the receiver wait on us!
|
||||
#await c.close()
|
||||
await c.close()
|
||||
print("[producer] Done")
|
||||
|
||||
|
||||
|
|
Reference in New Issue