diff --git a/aiosched/context.py b/aiosched/context.py index c2ed835..52c5eae 100644 --- a/aiosched/context.py +++ b/aiosched/context.py @@ -115,11 +115,11 @@ class TaskContext(Task): except BaseException as exc: await self.cancel(False) self.exc = exc - if not self.silent: - raise self.exc finally: await close_context(self) self.entry_point.propagate = True + if self.exc and not self.silent: + raise self.exc # Task method wrappers diff --git a/aiosched/kernel.py b/aiosched/kernel.py index 1047e92..be3bece 100644 --- a/aiosched/kernel.py +++ b/aiosched/kernel.py @@ -15,6 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import signal import itertools from collections import deque from functools import partial @@ -86,6 +87,10 @@ class FIFOKernel: self.current_task: Task | None = None # The loop's entry point self.entry_point: Task | None = None + # Did we receive a Ctrl+C? + self._sigint_handled: bool = False + # Are we executing any task code? + self._running: bool = False def __repr__(self): """ @@ -107,6 +112,15 @@ class FIFOKernel: ) return f"{type(self).__name__}({data})" + def _sigint_handler(self, *_args): + """ + Handles SIGINT + + :return: + """ + + self._sigint_handled = True + def done(self) -> bool: """ Returns whether the loop has no more work @@ -169,6 +183,7 @@ class FIFOKernel: associated resource is ready to be used """ + self._running = False before_time = self.clock() # Used for the debugger timeout = 0.0 if self.run_ready: @@ -205,6 +220,7 @@ class FIFOKernel: has elapsed """ + self._running = False while self.paused and self.paused.get_closest_deadline() <= self.clock(): # Reschedules tasks when their deadline has elapsed task, _ = self.paused.get() @@ -271,10 +287,14 @@ class FIFOKernel: # self.start() doesn't raise it again at the end self.current_task.exc = None self.debugger.before_task_step(self.current_task) + self._running = True # Some debugging and internal chatter here self.current_task.state = TaskState.RUN self.current_task.steps += 1 - if self.current_task.pending_cancellation: + if self._sigint_handled: + self._sigint_handled = False + self.current_task.throw(KeyboardInterrupt()) + elif self.current_task.pending_cancellation: # We perform the deferred cancellation # if it was previously scheduled self.cancel(self.current_task) @@ -320,16 +340,30 @@ class FIFOKernel: # simply tear us down and return to self.start self.shutdown() break + elif self._sigint_handled: + # We got Ctrl+C-ed while not running a task! Pick a + # random task and blow it up with a KeyboardInterrupt + # exception: our existing error handling machinery will + # deal with it accordingly + task: Task | None = None + if self.selector.get_map(): + # Pretty convoluted, huh? Sorry, but I wanted this on one line ;) + task = next(iter(next(iter(self.selector.get_map().values())).data.values())) + elif self.paused: + # TODO + task = None + self.run_ready.append(task) + self.handle_errors(self.run_task_step) elif not self.run_ready: # If there are no actively running tasks, we start by # checking for I/O. This method will wait for I/O until # the closest deadline to avoid starving sleeping tasks # or missing deadlines if self.selector.get_map(): - self.handle_errors(self.wait_io) + self.wait_io() if self.paused: # Next we check for deadlines - self.handle_errors(self.awake_tasks) + self.awake_tasks() else: # Otherwise, while there are tasks ready to run, we run them! self.handle_errors(self.run_task_step) @@ -341,6 +375,7 @@ class FIFOKernel: Starts the event loop from a synchronous context """ + signal.signal(signal.SIGINT, self._sigint_handler) self.entry_point = Task(func.__name__ or str(func), func(*args, **kwargs)) self.run_ready.append(self.entry_point) self.debugger.on_start() @@ -366,9 +401,11 @@ class FIFOKernel: :param resource: The resource to be released """ - if self.selector.get_map() and resource in self.selector.get_map(): + if resource in self.selector.get_map(): self.selector.unregister(resource) self.debugger.on_io_unschedule(resource) + if resource is self.current_task.last_io[1]: + self.current_task.last_io = () self.reschedule_running() def io_release_task(self, task: Task): @@ -474,7 +511,7 @@ class FIFOKernel: task.pending_cancellation = False self.debugger.after_cancel(task) self.wait(task) - except BaseException as err: + except Exception as err: # Any other exception is caught here task = task or self.current_task task.exc = err @@ -638,8 +675,4 @@ class FIFOKernel: # but having two tasks reading/writing at the # same time can't lead to anything good, better # disallow it - self.current_task.throw( - ResourceBusy( - f"The resource is being read from/written by another task" - ) - ) + self.current_task.throw(ResourceBusy(f"The resource is being read from/written by another task")) diff --git a/aiosched/task.py b/aiosched/task.py index 7d0bde5..96cc631 100644 --- a/aiosched/task.py +++ b/aiosched/task.py @@ -20,6 +20,7 @@ from enum import Enum, auto from typing import Coroutine, Any from dataclasses import dataclass, field + class TaskState(Enum): """ An enumeration of task states diff --git a/tests/chatroom_server.py b/tests/chatroom_server.py index 9148f7f..f22c347 100644 --- a/tests/chatroom_server.py +++ b/tests/chatroom_server.py @@ -2,6 +2,8 @@ import aiosched import logging import sys + +from debugger import Debugger # An asynchronous chatroom clients: dict[aiosched.socket.AsyncSocket, list[str, str]] = {} @@ -86,7 +88,7 @@ if __name__ == "__main__": datefmt="%d/%m/%Y %p", ) try: - aiosched.run(serve, ("localhost", port)) + aiosched.run(serve, ("localhost", port), debugger=()) except (Exception, KeyboardInterrupt) as error: # Exceptions propagate! if isinstance(error, KeyboardInterrupt): logging.info("Ctrl+C detected, exiting")