Compare commits

...

3 Commits

Author SHA1 Message Date
Nocturn9x e730f7f27a
I/O and Ctrl+C fixes 2023-03-28 14:42:41 +02:00
Nocturn9x d80fe45959 Original SIGINT handler is restored upon exit 2023-03-02 14:40:06 +01:00
Nocturn9x 6c91129ab6 Half-assed fix for Ctrl+C 2023-03-02 14:38:14 +01:00
4 changed files with 64 additions and 20 deletions

View File

@ -15,7 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from aiosched.task import Task
from aiosched.task import Task, TaskState
from aiosched.internals.syscalls import (
spawn,
wait,
@ -115,11 +115,11 @@ class TaskContext(Task):
except BaseException as exc:
await self.cancel(False)
self.exc = exc
if not self.silent:
raise self.exc
finally:
await close_context(self)
self.entry_point.propagate = True
if self.exc and not self.silent:
raise self.exc
# Task method wrappers
@ -196,9 +196,13 @@ class TaskContext(Task):
def throw(self, err: BaseException):
for task in self.tasks:
if task is self.entry_point:
continue
try:
task.exc = err
task.state = TaskState.CRASHED
task.throw(err)
except err:
finally:
continue
self.entry_point.throw(err)

View File

@ -15,6 +15,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import random
import signal
import itertools
from collections import deque
from functools import partial
@ -86,6 +88,10 @@ class FIFOKernel:
self.current_task: Task | None = None
# The loop's entry point
self.entry_point: Task | None = None
# Did we receive a Ctrl+C?
self._sigint_handled: bool = False
# Are we executing any task code?
self._running: bool = False
def __repr__(self):
"""
@ -107,15 +113,21 @@ class FIFOKernel:
)
return f"{type(self).__name__}({data})"
def _sigint_handler(self, *_args):
"""
Handles SIGINT
:return:
"""
self._sigint_handled = True
def done(self) -> bool:
"""
Returns whether the loop has no more work
to do
"""
if self.current_task and not self.current_task.done():
# Current task isn't done yet!
return False
if any([self.paused, self.run_ready]):
# There's tasks sleeping and/or on the
# ready queue!
@ -169,6 +181,7 @@ class FIFOKernel:
associated resource is ready to be used
"""
self._running = False
before_time = self.clock() # Used for the debugger
timeout = 0.0
if self.run_ready:
@ -205,6 +218,7 @@ class FIFOKernel:
has elapsed
"""
self._running = False
while self.paused and self.paused.get_closest_deadline() <= self.clock():
# Reschedules tasks when their deadline has elapsed
task, _ = self.paused.get()
@ -270,15 +284,20 @@ class FIFOKernel:
# entry point raised and caught an error so that
# self.start() doesn't raise it again at the end
self.current_task.exc = None
self.debugger.before_task_step(self.current_task)
self._running = True
# Some debugging and internal chatter here
self.current_task.state = TaskState.RUN
self.current_task.steps += 1
if self.current_task.pending_cancellation:
if self._sigint_handled:
self._sigint_handled = False
self.current_task.throw(KeyboardInterrupt())
self.join(self.current_task)
elif self.current_task.pending_cancellation:
# We perform the deferred cancellation
# if it was previously scheduled
self.cancel(self.current_task)
else:
self.debugger.before_task_step(self.current_task)
# Run a single step with the calculation (i.e. until a yield
# somewhere)
method, args, kwargs = self.current_task.run(
@ -320,16 +339,30 @@ class FIFOKernel:
# simply tear us down and return to self.start
self.shutdown()
break
elif self._sigint_handled:
# We got Ctrl+C-ed while not running a task! We pick
# any of the tasks we have, schedule it for execution
# (no matter what it's doing, because it doesn't really
# matter) and let run_task_step raise an exception inside
# it
task: Task | None = None
if self.selector.get_map():
# Pretty convoluted, huh? Sorry, but I wanted this on one line ;)
task = next(iter(next(iter(self.selector.get_map().values())).data.values()))
elif self.paused:
task, *_ = self.paused.get()
self.run_ready.append(task)
self.handle_errors(self.run_task_step)
elif not self.run_ready:
# If there are no actively running tasks, we start by
# checking for I/O. This method will wait for I/O until
# the closest deadline to avoid starving sleeping tasks
# or missing deadlines
if self.selector.get_map():
self.handle_errors(self.wait_io)
self.wait_io()
if self.paused:
# Next we check for deadlines
self.handle_errors(self.awake_tasks)
self.awake_tasks()
else:
# Otherwise, while there are tasks ready to run, we run them!
self.handle_errors(self.run_task_step)
@ -341,6 +374,8 @@ class FIFOKernel:
Starts the event loop from a synchronous context
"""
old = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, self._sigint_handler)
self.entry_point = Task(func.__name__ or str(func), func(*args, **kwargs))
self.run_ready.append(self.entry_point)
self.debugger.on_start()
@ -348,6 +383,7 @@ class FIFOKernel:
self.run()
finally:
self.debugger.on_exit()
signal.signal(signal.SIGINT, old)
if (
self.entry_point.exc
and self.entry_point.context is None
@ -366,9 +402,11 @@ class FIFOKernel:
:param resource: The resource to be released
"""
if self.selector.get_map() and resource in self.selector.get_map():
if resource in self.selector.get_map():
self.selector.unregister(resource)
self.debugger.on_io_unschedule(resource)
if resource is self.current_task.last_io[1]:
self.current_task.last_io = ()
self.reschedule_running()
def io_release_task(self, task: Task):
@ -419,7 +457,8 @@ class FIFOKernel:
if task is not self.current_task:
# We don't want to raise an error inside
# the task that's trying to close the stream!
self.handle_errors(partial(k.data.throw, exc), k.data)
for t in k.data:
self.handle_errors(partial(t.throw, exc), k.data)
self.reschedule_running()
def cancel(self, task: Task):
@ -474,13 +513,15 @@ class FIFOKernel:
task.pending_cancellation = False
self.debugger.after_cancel(task)
self.wait(task)
except BaseException as err:
except (Exception, KeyboardInterrupt) as err:
# Any other exception is caught here
task = task or self.current_task
task.exc = err
task.state = TaskState.CRASHED
self.debugger.on_exception_raised(task, err)
self.wait(task)
if isinstance(err, KeyboardInterrupt):
raise
def sleep(self, seconds: int | float):
"""
@ -638,8 +679,4 @@ class FIFOKernel:
# but having two tasks reading/writing at the
# same time can't lead to anything good, better
# disallow it
self.current_task.throw(
ResourceBusy(
f"The resource is being read from/written by another task"
)
)
self.current_task.throw(ResourceBusy(f"The resource is being read from/written by another task"))

View File

@ -20,6 +20,7 @@ from enum import Enum, auto
from typing import Coroutine, Any
from dataclasses import dataclass, field
class TaskState(Enum):
"""
An enumeration of task states

View File

@ -2,6 +2,8 @@ import aiosched
import logging
import sys
from debugger import Debugger
# An asynchronous chatroom
clients: dict[aiosched.socket.AsyncSocket, list[str, str]] = {}
@ -86,7 +88,7 @@ if __name__ == "__main__":
datefmt="%d/%m/%Y %p",
)
try:
aiosched.run(serve, ("localhost", port))
aiosched.run(serve, ("0.0.0.0", port), debugger=())
except (Exception, KeyboardInterrupt) as error: # Exceptions propagate!
if isinstance(error, KeyboardInterrupt):
logging.info("Ctrl+C detected, exiting")