From a1b40bd34029b798dfc3195257fb6a9d2ac97aae Mon Sep 17 00:00:00 2001
From: Nocturn9x
Date: Wed, 22 Feb 2023 12:18:45 +0100
Subject: [PATCH] Various bug fixes

---
 aiosched/kernel.py   | 112 ++++++++++++++++++++++++-------------
 tests/echo_server.py |   4 +-
 2 files changed, 66 insertions(+), 50 deletions(-)

diff --git a/aiosched/kernel.py b/aiosched/kernel.py
index 0a06bae..4a5b640 100644
--- a/aiosched/kernel.py
+++ b/aiosched/kernel.py
@@ -31,7 +31,7 @@ from aiosched.errors import (
     ResourceBroken,
 )
 from aiosched.context import TaskContext
-from selectors import DefaultSelector, BaseSelector
+from selectors import DefaultSelector, BaseSelector, EVENT_READ, EVENT_WRITE
 
 
 class FIFOKernel:
@@ -120,18 +120,13 @@
             # There's tasks sleeping and/or on the
             # ready queue!
             return False
-        if self.selector.get_map():
-            for key in self.selector.get_map().values():
-                # We don't just do any([self.paused, self.run_ready, self.selector.get_map()])
-                # because we don't want to just know if there's any resources we're waiting on,
-                # but if there's at least one non-terminated task that owns a resource we're
-                # waiting on. This avoids issues such as the event loop never exiting if the
-                # user forgets to close a socket, for example
-                key.data: Task
-                if key.data.done():
-                    continue
-                elif self.get_task_io(key.data):
-                    return False
+        if self.get_active_io_count():
+            # We don't just do any([self.paused, self.run_ready, self.selector.get_map()])
+            # because we don't want to just know if there's any resources we're waiting on,
+            # but if there's at least one non-terminated task that owns a resource we're
+            # waiting on. This avoids issues such as the event loop never exiting if the
+            # user forgets to close a socket, for example
+            return False
         return True
 
     def close(self, force: bool = False):
@@ -193,15 +188,15 @@
         self.debugger.before_io(timeout)
         # Get sockets that are ready and schedule their tasks
         for key, _ in self.selector.select(timeout):
-            key.data: Task
-            if key.data.state == TaskState.IO:
+            key.data: dict[int, Task]
+            for task in key.data.values():
                 # We don't reschedule a task that wasn't
                 # blocking on I/O before: this way if a
                 # task waits on a socket and then goes to
                 # sleep, it won't be woken up early if the
                 # resource becomes available before its
                 # deadline expires
-                self.run_ready.append(key.data)  # Resource ready? Schedule its task
+                self.run_ready.append(task)  # Resource ready? Schedule its task
         self.debugger.after_io(self.clock() - before_time)
 
     def awake_tasks(self):
@@ -261,10 +256,15 @@
         # Sets the currently running task
         self.current_task = self.run_ready.popleft()
         while self.current_task.done():
-            # We need to make sure we don't try to execute
-            # exited tasks that are on the running queue
+            # We make sure not to schedule
+            # any terminated tasks. Might want
+            # to eventually get rid of this code,
+            # but for now it does the job
             if not self.run_ready:
-                return  # No more tasks to run!
+                # We'll let run() handle the I/O
+                # or the shutdown if necessary, as
+                # there are no more runnable tasks
+                return
             self.current_task = self.run_ready.popleft()
         # We nullify the exception object just in case the
         # entry point raised and caught an error so that
@@ -326,13 +326,13 @@
         # the closest deadline to avoid starving sleeping tasks
         # or missing deadlines
         if self.selector.get_map():
-            self.wait_io()
+            self.handle_errors(self.wait_io)
         if self.paused:
             # Next we check for deadlines
-            self.awake_tasks()
+            self.handle_errors(self.awake_tasks)
         else:
             # Otherwise, while there are tasks ready to run, we run them!
-            self.handle_task_run(self.run_task_step)
+            self.handle_errors(self.run_task_step)
 
     def start(
         self, func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs
@@ -369,6 +369,7 @@
         if self.selector.get_map() and resource in self.selector.get_map():
             self.selector.unregister(resource)
             self.debugger.on_io_unschedule(resource)
+        self.reschedule_running()
 
     def io_release_task(self, task: Task):
         """
@@ -377,19 +378,26 @@
         """
 
         for key in filter(
-            lambda k: k.data == task, dict(self.selector.get_map()).values()
+            lambda k: task in k.data.values(), dict(self.selector.get_map()).values()
         ):
             self.notify_closing(key.fileobj, broken=True)
             self.selector.unregister(key.fileobj)
         task.last_io = ()
 
-    def get_task_io(self, task: Task) -> list:
+    def get_active_io_count(self) -> int:
         """
-        Returns the streams currently in use by
-        the given task
+        Returns the number of streams that are currently
+        being used by any active task
         """
 
-        return list(map(lambda k: k.fileobj, filter(lambda k: k.data == task, self.selector.get_map().values())))
+        result = 0
+        for key in (self.selector.get_map() or {}).values():
+            key.data: dict[int, Task]
+            for task in key.data.values():
+                if task.done():
+                    continue
+                result += 1
+        return result
 
     def notify_closing(self, stream, broken: bool = False):
         """
@@ -407,10 +415,11 @@
             lambda o: o.fileobj == stream,
             dict(self.selector.get_map()).values(),
         ):
-            if k.data != self.current_task:
-                # We don't want to raise an error inside
-                # the task that's trying to close the stream!
-                self.handle_task_run(partial(k.data.throw, exc), k.data)
+            for task in k.data.values():
+                if task is not self.current_task:
+                    # We don't want to raise an error inside
+                    # the task that's trying to close the stream!
+                    self.handle_errors(partial(task.throw, exc), task)
         self.reschedule_running()
 
     def cancel(self, task: Task):
@@ -420,14 +429,14 @@
         it fails
         """
 
-        self.io_release_task(task)
-        self.paused.discard(task)
-        self.handle_task_run(partial(task.throw, Cancelled(task)), task)
+        self.handle_errors(partial(task.throw, Cancelled(task)), task)
         if task.state != TaskState.CANCELLED:
             task.pending_cancellation = True
+        self.io_release_task(task)
+        self.paused.discard(task)
         self.reschedule_running()
 
-    def handle_task_run(self, func: Callable, task: Task | None = None):
+    def handle_errors(self, func: Callable, task: Task | None = None):
         """
         Convenience method for handling various exceptions
         from tasks
@@ -470,6 +479,7 @@
             task = task or self.current_task
             task.exc = err
             task.state = TaskState.CRASHED
+            self.debugger.on_exception_raised(task, err)
             self.wait(task)
 
     def sleep(self, seconds: int | float):
@@ -499,7 +509,8 @@
         self.paused.discard(task)
         self.io_release_task(task)
         self.run_ready.extend(task.joiners)
-        self.reschedule_running()
+        if task is not self.current_task:
+            self.reschedule_running()
 
     def join(self, task: Task):
         """
@@ -575,19 +586,19 @@
         self.current_task.state = TaskState.IO
 
         if self.current_task.last_io:
-            # Since, most of the time, tasks will perform multiple
+            # Since most of the time tasks will perform multiple
             # I/O operations on a given resource, unregistering them
             # every time isn't a sensible approach. A quick and
             # easy optimization to address this problem is to
-            # store the last I/O operation that the task performed
+            # store the last I/O operation that the task performed,
             # together with the resource itself, inside the task
             # object. If the task then tries to perform the same
-            # operation on the same resource again, then this method
-            # returns immediately as it is already being watched by
-            # the selector. If the resource is the same, but the
+            # operation on the same resource again, this method then
+            # returns immediately as the resource is already being watched
+            # by the selector. If the resource is the same, but the
             # event type has changed, then we modify the resource's
             # associated event. Only if the resource is different from
-            # the last one used, then this method will register a new
+            # the last one used then this method will register a new
             # one
             if self.current_task.last_io == (evt_type, resource):
                 # Selector is already listening for that event on
@@ -595,7 +606,7 @@
                 # the resource?
                 return
             elif self.current_task.last_io[1] == resource:
                 # If the event to listen for has changed we just modify it
-                self.selector.modify(resource, evt_type, self.current_task)
+                self.selector.modify(resource, evt_type, {evt_type: self.current_task})
                 self.current_task.last_io = (evt_type, resource)
                 self.debugger.on_io_schedule(resource, evt_type)
         elif not self.current_task.last_io or self.current_task.last_io[1] != resource:
@@ -603,21 +614,26 @@
             # I/O for the first time
             self.current_task.last_io = evt_type, resource
             try:
-                self.selector.register(resource, evt_type, self.current_task)
+                self.selector.register(resource, evt_type, {evt_type: self.current_task})
                 self.debugger.on_io_schedule(resource, evt_type)
             except KeyError:
                 # The stream is already being used
                 key = self.selector.get_key(resource)
-                if key.data == self.current_task or evt_type != key.events:
+                if key.data[key.events] == self.current_task:
                     # If the task that registered the stream
                     # changed their mind on what they want
                     # to do with it, who are we to deny their
-                    # request? We also modify the event in
+                    # request?
+                    self.selector.modify(resource, key.events | evt_type, {EVENT_READ: self.current_task,
+                                                                           EVENT_WRITE: self.current_task})
+                    self.debugger.on_io_schedule(resource, evt_type)
+                elif key.events != evt_type:
+                    # We also modify the event in
                     # our selector so that one task can read
                     # off a given stream while another one is
                     # writing to it
-                    self.selector.modify(resource, evt_type, self.current_task)
-                    self.debugger.on_io_schedule(resource, evt_type)
+                    self.selector.modify(resource, key.events | evt_type, {evt_type: self.current_task,
+                                                                           key.events: list(key.data.values())[0]})
                 else:
                     # One task reading and one writing on the same
                     # resource is fine (think producer-consumer),
diff --git a/tests/echo_server.py b/tests/echo_server.py
index 2000948..6a11e33 100644
--- a/tests/echo_server.py
+++ b/tests/echo_server.py
@@ -1,7 +1,7 @@
+import sys
+import logging
 import aiosched
 from debugger import Debugger
-import logging
-import sys
 
 
 # A test to check for asynchronous I/O
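
The kernel.py hunks above change each selector key's data payload from a single
Task to a dict mapping event masks to tasks, so one task can wait to read on a
stream while another waits to write to it. Below is a minimal standalone sketch
of that bookkeeping (not part of the patch), with plain strings standing in for
Task objects; the names reader_task and writer_task are made up for
illustration:

    import socket
    from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

    sel = DefaultSelector()
    a, b = socket.socketpair()

    # Register a hypothetical reader "task" the way the patched kernel does:
    # the data payload maps the event mask to whoever is waiting on it
    sel.register(a, EVENT_READ, {EVENT_READ: "reader_task"})

    # A second "task" wants to write on the same stream: widen the event
    # mask and extend the mapping instead of overwriting the reader
    key = sel.get_key(a)
    sel.modify(a, key.events | EVENT_WRITE, {**key.data, EVENT_WRITE: "writer_task"})

    b.send(b"hello")  # make the socket readable as well as writable

    # select() yields the key once; walking key.data.values() picks up
    # every task interested in the stream, as wait_io() now does
    for key, _ in sel.select(timeout=0):
        for task in key.data.values():
            print("would reschedule:", task)

    sel.unregister(a)
    a.close()
    b.close()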