Various bug fixes

This commit is contained in:
Nocturn9x 2023-02-22 12:18:45 +01:00
parent 3ac6ea58cb
commit a1b40bd340
2 changed files with 66 additions and 50 deletions

View File

@ -31,7 +31,7 @@ from aiosched.errors import (
ResourceBroken,
)
from aiosched.context import TaskContext
from selectors import DefaultSelector, BaseSelector
from selectors import DefaultSelector, BaseSelector, EVENT_READ, EVENT_WRITE
class FIFOKernel:
@ -120,18 +120,13 @@ class FIFOKernel:
# There's tasks sleeping and/or on the
# ready queue!
return False
if self.selector.get_map():
for key in self.selector.get_map().values():
# We don't just do any([self.paused, self.run_ready, self.selector.get_map()])
# because we don't want to just know if there's any resources we're waiting on,
# but if there's at least one non-terminated task that owns a resource we're
# waiting on. This avoids issues such as the event loop never exiting if the
# user forgets to close a socket, for example
key.data: Task
if key.data.done():
continue
elif self.get_task_io(key.data):
return False
if self.get_active_io_count():
# We don't just do any([self.paused, self.run_ready, self.selector.get_map()])
# because we don't want to just know if there's any resources we're waiting on,
# but if there's at least one non-terminated task that owns a resource we're
# waiting on. This avoids issues such as the event loop never exiting if the
# user forgets to close a socket, for example
return False
return True
def close(self, force: bool = False):
@ -193,15 +188,15 @@ class FIFOKernel:
self.debugger.before_io(timeout)
# Get sockets that are ready and schedule their tasks
for key, _ in self.selector.select(timeout):
key.data: Task
if key.data.state == TaskState.IO:
key.data: dict[int, Task]
for task in key.data.values():
# We don't reschedule a task that wasn't
# blocking on I/O before: this way if a
# task waits on a socket and then goes to
# sleep, it won't be woken up early if the
# resource becomes available before its
# deadline expires
self.run_ready.append(key.data) # Resource ready? Schedule its task
self.run_ready.append(task) # Resource ready? Schedule its task
self.debugger.after_io(self.clock() - before_time)
def awake_tasks(self):
@ -261,10 +256,15 @@ class FIFOKernel:
# Sets the currently running task
self.current_task = self.run_ready.popleft()
while self.current_task.done():
# We need to make sure we don't try to execute
# exited tasks that are on the running queue
# We make sure not to schedule
# any terminated tasks. Might want
# to eventually get rid of this code,
# but for now it does the job
if not self.run_ready:
return # No more tasks to run!
# We'll let run() handle the I/O
# or the shutdown if necessary, as
# there are no more runnable tasks
return
self.current_task = self.run_ready.popleft()
# We nullify the exception object just in case the
# entry point raised and caught an error so that
@ -326,13 +326,13 @@ class FIFOKernel:
# the closest deadline to avoid starving sleeping tasks
# or missing deadlines
if self.selector.get_map():
self.wait_io()
self.handle_errors(self.wait_io)
if self.paused:
# Next we check for deadlines
self.awake_tasks()
self.handle_errors(self.awake_tasks)
else:
# Otherwise, while there are tasks ready to run, we run them!
self.handle_task_run(self.run_task_step)
self.handle_errors(self.run_task_step)
def start(
self, func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs
@ -369,6 +369,7 @@ class FIFOKernel:
if self.selector.get_map() and resource in self.selector.get_map():
self.selector.unregister(resource)
self.debugger.on_io_unschedule(resource)
self.reschedule_running()
def io_release_task(self, task: Task):
"""
@ -377,19 +378,26 @@ class FIFOKernel:
"""
for key in filter(
lambda k: k.data == task, dict(self.selector.get_map()).values()
lambda k: task in k.data.values(), dict(self.selector.get_map()).values()
):
self.notify_closing(key.fileobj, broken=True)
self.selector.unregister(key.fileobj)
task.last_io = ()
def get_task_io(self, task: Task) -> list:
def get_active_io_count(self) -> int:
"""
Returns the streams currently in use by
the given task
Returns the number of streams that are currently
being used by any active task
"""
return list(map(lambda k: k.fileobj, filter(lambda k: k.data == task, self.selector.get_map().values())))
result = 0
for key in (self.selector.get_map() or {}).values():
key.data: dict[int, Task]
for task in key.data.values():
if task.done():
continue
result += 1
return result
def notify_closing(self, stream, broken: bool = False):
"""
@ -407,10 +415,11 @@ class FIFOKernel:
lambda o: o.fileobj == stream,
dict(self.selector.get_map()).values(),
):
if k.data != self.current_task:
# We don't want to raise an error inside
# the task that's trying to close the stream!
self.handle_task_run(partial(k.data.throw, exc), k.data)
for task in k.data.values():
if task is not self.current_task:
# We don't want to raise an error inside
# the task that's trying to close the stream!
self.handle_errors(partial(k.data.throw, exc), k.data)
self.reschedule_running()
def cancel(self, task: Task):
@ -420,14 +429,14 @@ class FIFOKernel:
it fails
"""
self.io_release_task(task)
self.paused.discard(task)
self.handle_task_run(partial(task.throw, Cancelled(task)), task)
self.handle_errors(partial(task.throw, Cancelled(task)), task)
if task.state != TaskState.CANCELLED:
task.pending_cancellation = True
self.io_release_task(task)
self.paused.discard(task)
self.reschedule_running()
def handle_task_run(self, func: Callable, task: Task | None = None):
def handle_errors(self, func: Callable, task: Task | None = None):
"""
Convenience method for handling various exceptions
from tasks
@ -470,6 +479,7 @@ class FIFOKernel:
task = task or self.current_task
task.exc = err
task.state = TaskState.CRASHED
self.debugger.on_exception_raised(task, err)
self.wait(task)
def sleep(self, seconds: int | float):
@ -499,7 +509,8 @@ class FIFOKernel:
self.paused.discard(task)
self.io_release_task(task)
self.run_ready.extend(task.joiners)
self.reschedule_running()
if task is not self.current_task:
self.reschedule_running()
def join(self, task: Task):
"""
@ -575,19 +586,19 @@ class FIFOKernel:
self.current_task.state = TaskState.IO
if self.current_task.last_io:
# Since, most of the time, tasks will perform multiple
# Since most of the time tasks will perform multiple
# I/O operations on a given resource, unregistering them
# every time isn't a sensible approach. A quick and
# easy optimization to address this problem is to
# store the last I/O operation that the task performed
# store the last I/O operation that the task performed,
# together with the resource itself, inside the task
# object. If the task then tries to perform the same
# operation on the same resource again, then this method
# returns immediately as it is already being watched by
# the selector. If the resource is the same, but the
# operation on the same resource again, this method then
# returns immediately as the resource is already being watched
# by the selector. If the resource is the same, but the
# event type has changed, then we modify the resource's
# associated event. Only if the resource is different from
# the last one used, then this method will register a new
# the last one used then this method will register a new
# one
if self.current_task.last_io == (evt_type, resource):
# Selector is already listening for that event on
@ -595,7 +606,7 @@ class FIFOKernel:
return
elif self.current_task.last_io[1] == resource:
# If the event to listen for has changed we just modify it
self.selector.modify(resource, evt_type, self.current_task)
self.selector.modify(resource, evt_type, {evt_type: self.current_task})
self.current_task.last_io = (evt_type, resource)
self.debugger.on_io_schedule(resource, evt_type)
elif not self.current_task.last_io or self.current_task.last_io[1] != resource:
@ -603,21 +614,26 @@ class FIFOKernel:
# I/O for the first time
self.current_task.last_io = evt_type, resource
try:
self.selector.register(resource, evt_type, self.current_task)
self.selector.register(resource, evt_type, {evt_type: self.current_task})
self.debugger.on_io_schedule(resource, evt_type)
except KeyError:
# The stream is already being used
key = self.selector.get_key(resource)
if key.data == self.current_task or evt_type != key.events:
if key.data[key.events] == self.current_task:
# If the task that registered the stream
# changed their mind on what they want
# to do with it, who are we to deny their
# request? We also modify the event in
# request?
self.selector.modify(resource, key.events | evt_type, {EVENT_READ: self.current_task,
EVENT_WRITE: self.current_task})
self.debugger.on_io_schedule(resource, evt_type)
elif key.events != evt_type:
# We also modify the event in
# our selector so that one task can read
# off a given stream while another one is
# writing to it
self.selector.modify(resource, evt_type, self.current_task)
self.debugger.on_io_schedule(resource, evt_type)
self.selector.modify(resource, key.events | evt_type, {evt_type: self.current_task,
key.events: list(key.data.values())[0]})
else:
# One task reading and one writing on the same
# resource is fine (think producer-consumer),

View File

@ -1,7 +1,7 @@
import sys
import logging
import aiosched
from debugger import Debugger
import logging
import sys
# A test to check for asynchronous I/O