I/O and Ctrl+C fixes
This commit is contained in:
parent
d80fe45959
commit
e730f7f27a
|
@ -15,7 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
See the License for the specific language governing permissions and
|
See the License for the specific language governing permissions and
|
||||||
limitations under the License.
|
limitations under the License.
|
||||||
"""
|
"""
|
||||||
from aiosched.task import Task
|
from aiosched.task import Task, TaskState
|
||||||
from aiosched.internals.syscalls import (
|
from aiosched.internals.syscalls import (
|
||||||
spawn,
|
spawn,
|
||||||
wait,
|
wait,
|
||||||
|
@ -196,9 +196,13 @@ class TaskContext(Task):
|
||||||
|
|
||||||
def throw(self, err: BaseException):
|
def throw(self, err: BaseException):
|
||||||
for task in self.tasks:
|
for task in self.tasks:
|
||||||
|
if task is self.entry_point:
|
||||||
|
continue
|
||||||
try:
|
try:
|
||||||
|
task.exc = err
|
||||||
|
task.state = TaskState.CRASHED
|
||||||
task.throw(err)
|
task.throw(err)
|
||||||
except err:
|
finally:
|
||||||
continue
|
continue
|
||||||
self.entry_point.throw(err)
|
self.entry_point.throw(err)
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
See the License for the specific language governing permissions and
|
See the License for the specific language governing permissions and
|
||||||
limitations under the License.
|
limitations under the License.
|
||||||
"""
|
"""
|
||||||
|
import random
|
||||||
import signal
|
import signal
|
||||||
import itertools
|
import itertools
|
||||||
from collections import deque
|
from collections import deque
|
||||||
|
@ -127,9 +128,6 @@ class FIFOKernel:
|
||||||
to do
|
to do
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if self.current_task and not self.current_task.done():
|
|
||||||
# Current task isn't done yet!
|
|
||||||
return False
|
|
||||||
if any([self.paused, self.run_ready]):
|
if any([self.paused, self.run_ready]):
|
||||||
# There's tasks sleeping and/or on the
|
# There's tasks sleeping and/or on the
|
||||||
# ready queue!
|
# ready queue!
|
||||||
|
@ -286,7 +284,6 @@ class FIFOKernel:
|
||||||
# entry point raised and caught an error so that
|
# entry point raised and caught an error so that
|
||||||
# self.start() doesn't raise it again at the end
|
# self.start() doesn't raise it again at the end
|
||||||
self.current_task.exc = None
|
self.current_task.exc = None
|
||||||
self.debugger.before_task_step(self.current_task)
|
|
||||||
self._running = True
|
self._running = True
|
||||||
# Some debugging and internal chatter here
|
# Some debugging and internal chatter here
|
||||||
self.current_task.state = TaskState.RUN
|
self.current_task.state = TaskState.RUN
|
||||||
|
@ -294,11 +291,13 @@ class FIFOKernel:
|
||||||
if self._sigint_handled:
|
if self._sigint_handled:
|
||||||
self._sigint_handled = False
|
self._sigint_handled = False
|
||||||
self.current_task.throw(KeyboardInterrupt())
|
self.current_task.throw(KeyboardInterrupt())
|
||||||
|
self.join(self.current_task)
|
||||||
elif self.current_task.pending_cancellation:
|
elif self.current_task.pending_cancellation:
|
||||||
# We perform the deferred cancellation
|
# We perform the deferred cancellation
|
||||||
# if it was previously scheduled
|
# if it was previously scheduled
|
||||||
self.cancel(self.current_task)
|
self.cancel(self.current_task)
|
||||||
else:
|
else:
|
||||||
|
self.debugger.before_task_step(self.current_task)
|
||||||
# Run a single step with the calculation (i.e. until a yield
|
# Run a single step with the calculation (i.e. until a yield
|
||||||
# somewhere)
|
# somewhere)
|
||||||
method, args, kwargs = self.current_task.run(
|
method, args, kwargs = self.current_task.run(
|
||||||
|
@ -341,17 +340,17 @@ class FIFOKernel:
|
||||||
self.shutdown()
|
self.shutdown()
|
||||||
break
|
break
|
||||||
elif self._sigint_handled:
|
elif self._sigint_handled:
|
||||||
# We got Ctrl+C-ed while not running a task! Pick a
|
# We got Ctrl+C-ed while not running a task! We pick
|
||||||
# random task and blow it up with a KeyboardInterrupt
|
# any of the tasks we have, schedule it for execution
|
||||||
# exception: our existing error handling machinery will
|
# (no matter what it's doing, because it doesn't really
|
||||||
# deal with it accordingly
|
# matter) and let run_task_step raise an exception inside
|
||||||
|
# it
|
||||||
task: Task | None = None
|
task: Task | None = None
|
||||||
if self.selector.get_map():
|
if self.selector.get_map():
|
||||||
# Pretty convoluted, huh? Sorry, but I wanted this on one line ;)
|
# Pretty convoluted, huh? Sorry, but I wanted this on one line ;)
|
||||||
task = next(iter(next(iter(self.selector.get_map().values())).data.values()))
|
task = next(iter(next(iter(self.selector.get_map().values())).data.values()))
|
||||||
elif self.paused:
|
elif self.paused:
|
||||||
# TODO
|
task, *_ = self.paused.get()
|
||||||
task = None
|
|
||||||
self.run_ready.append(task)
|
self.run_ready.append(task)
|
||||||
self.handle_errors(self.run_task_step)
|
self.handle_errors(self.run_task_step)
|
||||||
elif not self.run_ready:
|
elif not self.run_ready:
|
||||||
|
@ -374,7 +373,7 @@ class FIFOKernel:
|
||||||
"""
|
"""
|
||||||
Starts the event loop from a synchronous context
|
Starts the event loop from a synchronous context
|
||||||
"""
|
"""
|
||||||
|
|
||||||
old = signal.getsignal(signal.SIGINT)
|
old = signal.getsignal(signal.SIGINT)
|
||||||
signal.signal(signal.SIGINT, self._sigint_handler)
|
signal.signal(signal.SIGINT, self._sigint_handler)
|
||||||
self.entry_point = Task(func.__name__ or str(func), func(*args, **kwargs))
|
self.entry_point = Task(func.__name__ or str(func), func(*args, **kwargs))
|
||||||
|
@ -458,7 +457,8 @@ class FIFOKernel:
|
||||||
if task is not self.current_task:
|
if task is not self.current_task:
|
||||||
# We don't want to raise an error inside
|
# We don't want to raise an error inside
|
||||||
# the task that's trying to close the stream!
|
# the task that's trying to close the stream!
|
||||||
self.handle_errors(partial(k.data.throw, exc), k.data)
|
for t in k.data:
|
||||||
|
self.handle_errors(partial(t.throw, exc), k.data)
|
||||||
self.reschedule_running()
|
self.reschedule_running()
|
||||||
|
|
||||||
def cancel(self, task: Task):
|
def cancel(self, task: Task):
|
||||||
|
@ -513,13 +513,15 @@ class FIFOKernel:
|
||||||
task.pending_cancellation = False
|
task.pending_cancellation = False
|
||||||
self.debugger.after_cancel(task)
|
self.debugger.after_cancel(task)
|
||||||
self.wait(task)
|
self.wait(task)
|
||||||
except Exception as err:
|
except (Exception, KeyboardInterrupt) as err:
|
||||||
# Any other exception is caught here
|
# Any other exception is caught here
|
||||||
task = task or self.current_task
|
task = task or self.current_task
|
||||||
task.exc = err
|
task.exc = err
|
||||||
task.state = TaskState.CRASHED
|
task.state = TaskState.CRASHED
|
||||||
self.debugger.on_exception_raised(task, err)
|
self.debugger.on_exception_raised(task, err)
|
||||||
self.wait(task)
|
self.wait(task)
|
||||||
|
if isinstance(err, KeyboardInterrupt):
|
||||||
|
raise
|
||||||
|
|
||||||
def sleep(self, seconds: int | float):
|
def sleep(self, seconds: int | float):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -88,7 +88,7 @@ if __name__ == "__main__":
|
||||||
datefmt="%d/%m/%Y %p",
|
datefmt="%d/%m/%Y %p",
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
aiosched.run(serve, ("localhost", port), debugger=())
|
aiosched.run(serve, ("0.0.0.0", port), debugger=())
|
||||||
except (Exception, KeyboardInterrupt) as error: # Exceptions propagate!
|
except (Exception, KeyboardInterrupt) as error: # Exceptions propagate!
|
||||||
if isinstance(error, KeyboardInterrupt):
|
if isinstance(error, KeyboardInterrupt):
|
||||||
logging.info("Ctrl+C detected, exiting")
|
logging.info("Ctrl+C detected, exiting")
|
||||||
|
|
Reference in New Issue