From c85e3037e4566147f6d7b333c52e6a2314cc67c2 Mon Sep 17 00:00:00 2001 From: Mattia Giambirtone Date: Tue, 19 Mar 2024 21:44:50 +0100 Subject: [PATCH] Fixed a bug with nested task scopes and deadlines. Add correctness checks to TaskPool and other minor changes --- structio/abc.py | 2 +- structio/core/context.py | 1 + structio/core/kernel.py | 3 +-- structio/core/managers/io/simple.py | 6 +++--- structio/io/socket.py | 5 ++--- 5 files changed, 8 insertions(+), 9 deletions(-) diff --git a/structio/abc.py b/structio/abc.py index e03c254..9555b8f 100644 --- a/structio/abc.py +++ b/structio/abc.py @@ -457,7 +457,7 @@ class BaseIOManager(ABC): """ @abstractmethod - def wait_io(self, current_time): + def wait_io(self): """ Waits for I/O and reschedules tasks when data is ready to be read/written diff --git a/structio/core/context.py b/structio/core/context.py index 49f07e5..ff0c409 100644 --- a/structio/core/context.py +++ b/structio/core/context.py @@ -151,6 +151,7 @@ class TaskPool: raise exc_val.with_traceback(exc_tb) elif not self.done(): await suspend() + assert self.done() else: await checkpoint() except Cancelled as e: diff --git a/structio/core/kernel.py b/structio/core/kernel.py index 459e1d2..2dd367d 100644 --- a/structio/core/kernel.py +++ b/structio/core/kernel.py @@ -271,7 +271,6 @@ class DefaultKernel(BaseKernel): error = TimedOut("timed out") error.scope = scope self.throw(scope.owner, error) - self.reschedule(scope.owner) def wakeup(self): while ( @@ -293,7 +292,7 @@ class DefaultKernel(BaseKernel): self.wakeup() self.check_scopes() if self.io_manager.pending(): - self.io_manager.wait_io(self.clock.current_time()) + self.io_manager.wait_io() def run(self): """ diff --git a/structio/core/managers/io/simple.py b/structio/core/managers/io/simple.py index 7c6f73a..97ed71d 100644 --- a/structio/core/managers/io/simple.py +++ b/structio/core/managers/io/simple.py @@ -67,16 +67,16 @@ class SimpleIOManager(BaseIOManager): if self._closed: raise structio.exceptions.ResourceClosed("the I/O manager is closed") - def wait_io(self, current_time): + def wait_io(self): self._check_closed() kernel: BaseKernel = current_loop() + current_time = kernel.clock.current_time() deadline = kernel.get_closest_deadline() if deadline == float("inf"): deadline = 0 elif deadline > 0: deadline -= current_time deadline = max(0, deadline) - current = kernel.clock.current_time() readers = self._collect_readers() writers = self._collect_writers() kernel.event("before_io", deadline) @@ -86,7 +86,7 @@ class SimpleIOManager(BaseIOManager): writers + readers, deadline, ) - kernel.event("after_io", kernel.clock.current_time() - current) + kernel.event("after_io", kernel.clock.current_time() - current_time) # On Windows, a successful connection is marked # as an exceptional event rather than a write # one diff --git a/structio/io/socket.py b/structio/io/socket.py index 8af2194..3e26747 100644 --- a/structio/io/socket.py +++ b/structio/io/socket.py @@ -386,14 +386,13 @@ class AsyncSocket(AsyncResource): if self._fd == -1: raise ResourceClosed("I/O operation on closed socket") with self.write_lock, self.read_lock: - connected = False - while not connected: + while True: try: self.socket.connect(address) if self.do_handshake_on_connect: await self.do_handshake() - connected = True await checkpoint() + break except WantRead: await wait_readable(self._fd) except WantWrite: