From e730f7f27abd38c4c47e02c7c4515f6157f297b3 Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Tue, 28 Mar 2023 14:42:41 +0200 Subject: [PATCH] I/O and Ctrl+C fixes --- aiosched/context.py | 8 ++++++-- aiosched/kernel.py | 28 +++++++++++++++------------- tests/chatroom_server.py | 2 +- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/aiosched/context.py b/aiosched/context.py index 52c5eae..d6d2ec6 100644 --- a/aiosched/context.py +++ b/aiosched/context.py @@ -15,7 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -from aiosched.task import Task +from aiosched.task import Task, TaskState from aiosched.internals.syscalls import ( spawn, wait, @@ -196,9 +196,13 @@ class TaskContext(Task): def throw(self, err: BaseException): for task in self.tasks: + if task is self.entry_point: + continue try: + task.exc = err + task.state = TaskState.CRASHED task.throw(err) - except err: + finally: continue self.entry_point.throw(err) diff --git a/aiosched/kernel.py b/aiosched/kernel.py index 1cff740..71fe6fe 100644 --- a/aiosched/kernel.py +++ b/aiosched/kernel.py @@ -15,6 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import random import signal import itertools from collections import deque @@ -127,9 +128,6 @@ class FIFOKernel: to do """ - if self.current_task and not self.current_task.done(): - # Current task isn't done yet! - return False if any([self.paused, self.run_ready]): # There's tasks sleeping and/or on the # ready queue! @@ -286,7 +284,6 @@ class FIFOKernel: # entry point raised and caught an error so that # self.start() doesn't raise it again at the end self.current_task.exc = None - self.debugger.before_task_step(self.current_task) self._running = True # Some debugging and internal chatter here self.current_task.state = TaskState.RUN @@ -294,11 +291,13 @@ class FIFOKernel: if self._sigint_handled: self._sigint_handled = False self.current_task.throw(KeyboardInterrupt()) + self.join(self.current_task) elif self.current_task.pending_cancellation: # We perform the deferred cancellation # if it was previously scheduled self.cancel(self.current_task) else: + self.debugger.before_task_step(self.current_task) # Run a single step with the calculation (i.e. until a yield # somewhere) method, args, kwargs = self.current_task.run( @@ -341,17 +340,17 @@ class FIFOKernel: self.shutdown() break elif self._sigint_handled: - # We got Ctrl+C-ed while not running a task! Pick a - # random task and blow it up with a KeyboardInterrupt - # exception: our existing error handling machinery will - # deal with it accordingly + # We got Ctrl+C-ed while not running a task! We pick + # any of the tasks we have, schedule it for execution + # (no matter what it's doing, because it doesn't really + # matter) and let run_task_step raise an exception inside + # it task: Task | None = None if self.selector.get_map(): # Pretty convoluted, huh? Sorry, but I wanted this on one line ;) task = next(iter(next(iter(self.selector.get_map().values())).data.values())) elif self.paused: - # TODO - task = None + task, *_ = self.paused.get() self.run_ready.append(task) self.handle_errors(self.run_task_step) elif not self.run_ready: @@ -374,7 +373,7 @@ class FIFOKernel: """ Starts the event loop from a synchronous context """ - + old = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, self._sigint_handler) self.entry_point = Task(func.__name__ or str(func), func(*args, **kwargs)) @@ -458,7 +457,8 @@ class FIFOKernel: if task is not self.current_task: # We don't want to raise an error inside # the task that's trying to close the stream! - self.handle_errors(partial(k.data.throw, exc), k.data) + for t in k.data: + self.handle_errors(partial(t.throw, exc), k.data) self.reschedule_running() def cancel(self, task: Task): @@ -513,13 +513,15 @@ class FIFOKernel: task.pending_cancellation = False self.debugger.after_cancel(task) self.wait(task) - except Exception as err: + except (Exception, KeyboardInterrupt) as err: # Any other exception is caught here task = task or self.current_task task.exc = err task.state = TaskState.CRASHED self.debugger.on_exception_raised(task, err) self.wait(task) + if isinstance(err, KeyboardInterrupt): + raise def sleep(self, seconds: int | float): """ diff --git a/tests/chatroom_server.py b/tests/chatroom_server.py index f22c347..3463b19 100644 --- a/tests/chatroom_server.py +++ b/tests/chatroom_server.py @@ -88,7 +88,7 @@ if __name__ == "__main__": datefmt="%d/%m/%Y %p", ) try: - aiosched.run(serve, ("localhost", port), debugger=()) + aiosched.run(serve, ("0.0.0.0", port), debugger=()) except (Exception, KeyboardInterrupt) as error: # Exceptions propagate! if isinstance(error, KeyboardInterrupt): logging.info("Ctrl+C detected, exiting")