Fix some issues with the throw syscall, two-way proxy now working

This commit is contained in:
Mattia Giambirtone 2023-05-10 22:31:33 +02:00
parent 07dd63dd49
commit e423716442
Signed by: nocturn9x
GPG Key ID: 8270F9F467971E59
5 changed files with 21 additions and 14 deletions

View File

@ -84,7 +84,7 @@ class TaskPool:
""" """
An asynchronous context manager that automatically waits An asynchronous context manager that automatically waits
for all tasks spawned within it and cancels itself when for all tasks spawned within it and cancels itself when
an exception occurs. Contexts can be nested and will an exception occurs. Pools can be nested and will
cancel inner ones if an exception is raised inside them cancel inner ones if an exception is raised inside them
""" """

View File

@ -126,10 +126,10 @@ class FIFOKernel:
:return: :return:
""" """
if currently_protected(): self._sigint_handled = True
self._sigint_handled = True if not currently_protected():
else: self.run_ready.appendleft(self.entry_point)
raise KeyboardInterrupt self.handle_errors(self.run_task_step)
def done(self) -> bool: def done(self) -> bool:
""" """
@ -352,13 +352,15 @@ class FIFOKernel:
# there are no more runnable tasks # there are no more runnable tasks
return return
self.current_task = self.run_ready.popleft() self.current_task = self.run_ready.popleft()
self._running = True
_runner = self.current_task.run
_data = [self.data.pop(self.current_task, None)]
if self.current_task.pending_cancellation: if self.current_task.pending_cancellation:
# We perform the deferred cancellation # We perform the deferred cancellation
# if it was previously scheduled # if it was previously scheduled
self.cancel(self.current_task) self.cancel(self.current_task)
if self.current_task.done():
return
self._running = True
_runner = self.current_task.run
_data = [self.data.pop(self.current_task, None)]
# Some debugging and internal chatter here # Some debugging and internal chatter here
self.current_task.steps += 1 self.current_task.steps += 1
self.current_task.state = TaskState.RUN self.current_task.state = TaskState.RUN
@ -368,6 +370,10 @@ class FIFOKernel:
self.current_task.throw(KeyboardInterrupt()) self.current_task.throw(KeyboardInterrupt())
_runner = partial(self.current_task.throw, KeyboardInterrupt) _runner = partial(self.current_task.throw, KeyboardInterrupt)
_data = [] _data = []
elif exc := self.current_task.pending_exception:
self.current_task.pending_exception = None
_runner = partial(self.current_task.throw, exc)
_data = []
# Run a single step with the calculation (i.e. until a yield # Run a single step with the calculation (i.e. until a yield
# somewhere) # somewhere)
method, args, kwargs = _runner(*_data) method, args, kwargs = _runner(*_data)
@ -587,10 +593,9 @@ class FIFOKernel:
Throws the given exception into the given task Throws the given exception into the given task
""" """
self.paused.discard(task) task.pending_exception = error
self.io_release_task(task)
self.handle_errors(partial(task.throw, error), task)
self.run_ready.appendleft(task) self.run_ready.appendleft(task)
self.handle_errors(partial(task.throw, error), task)
self.reschedule_running() self.reschedule_running()
def handle_errors(self, func: Callable, task: Task | None = None): def handle_errors(self, func: Callable, task: Task | None = None):
@ -615,6 +620,7 @@ class FIFOKernel:
# most of this code below is just useful for internal/debugging purposes # most of this code below is just useful for internal/debugging purposes
task.state = TaskState.FINISHED task.state = TaskState.FINISHED
task.result = ret.value task.result = ret.value
self.io_release_task(self.current_task)
self.wait(task) self.wait(task)
except Cancelled: except Cancelled:
# When a task needs to be cancelled, aiosched tries to do it gracefully # When a task needs to be cancelled, aiosched tries to do it gracefully
@ -667,8 +673,6 @@ class FIFOKernel:
if task is not self.current_task: if task is not self.current_task:
task.joiners.add(self.current_task) task.joiners.add(self.current_task)
if task.done(): if task.done():
self.paused.discard(task)
self.io_release_task(task)
self.run_ready.extend(task.joiners) self.run_ready.extend(task.joiners)
def spawn(self, func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs): def spawn(self, func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs):

View File

@ -85,6 +85,8 @@ class Task:
propagate: bool = True propagate: bool = True
# The task's scope # The task's scope
scope: "TaskScope" = field(default=None, repr=False) scope: "TaskScope" = field(default=None, repr=False)
# Is there any pending exception?
pending_exception: BaseException | None = None
def run(self, what: Any | None = None): def run(self, what: Any | None = None):
""" """

View File

@ -1,7 +1,6 @@
import aiosched import aiosched
import socket import socket
# TODO: This is borked
# Pls notice me njsmith senpai :> # Pls notice me njsmith senpai :>
@ -22,6 +21,7 @@ async def proxy_two_way(a: aiosched.socket.AsyncSocket, b: aiosched.socket.Async
async def main(): async def main():
print("Starting two-way proxy server")
async with aiosched.skip_after(10): async with aiosched.skip_after(10):
a = aiosched.socket.socket(socket.AF_INET, socket.SOCK_STREAM) a = aiosched.socket.socket(socket.AF_INET, socket.SOCK_STREAM)
b = aiosched.socket.socket(socket.AF_INET, socket.SOCK_STREAM) b = aiosched.socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@ -29,6 +29,7 @@ async def main():
await b.connect(("localhost", 54321)) await b.connect(("localhost", 54321))
async with a, b: async with a, b:
await proxy_two_way(a, b) await proxy_two_way(a, b)
print("Two-way proxy shutting down")
aiosched.run(main) aiosched.run(main)

View File