From ee7451014bc2fa1e9657836f01eea7efed5b5476 Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Sun, 18 Jun 2023 18:52:36 +0200 Subject: [PATCH] Fixed bug with resource de-registration --- structio/core/kernels/fifo.py | 3 +++ structio/core/managers/io/simple.py | 21 +++++++++++++++------ tests/https_test.py | 2 ++ 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/structio/core/kernels/fifo.py b/structio/core/kernels/fifo.py index 917dd7f..f326bc7 100644 --- a/structio/core/kernels/fifo.py +++ b/structio/core/kernels/fifo.py @@ -65,9 +65,11 @@ class FIFOKernel(BaseKernel): ) def wait_readable(self, resource: FdWrapper): + self.current_task.state = TaskState.IO self.io_manager.request_read(resource, self.current_task) def wait_writable(self, resource: FdWrapper): + self.current_task.state = TaskState.IO self.io_manager.request_write(resource, self.current_task) def notify_closing(self, resource: FdWrapper, broken: bool = False, owner: Task | None = None): @@ -173,6 +175,7 @@ class FIFOKernel(BaseKernel): self.event("before_task_step", self.current_task) self.current_task.state = TaskState.RUNNING method, args, kwargs = runner() + self.current_task.state = TaskState.PAUSED if not callable(getattr(self, method, None)): # This if block is meant to be triggered by other async # libraries, which most likely have different method names and behaviors diff --git a/structio/core/managers/io/simple.py b/structio/core/managers/io/simple.py index 7824de9..83da862 100644 --- a/structio/core/managers/io/simple.py +++ b/structio/core/managers/io/simple.py @@ -1,5 +1,6 @@ from structio.abc import BaseIOManager, BaseKernel from structio.core.task import Task, TaskState +from structio.util.ki import CTRLC_PROTECTION_ENABLED from structio.core.run import current_loop, current_task from structio.io import FdWrapper import select @@ -40,8 +41,8 @@ class SimpleIOManager(BaseIOManager): """ result = [] - for resource in self.readers: - result.append(resource.fileno()) + for reader in self.readers.copy(): + result.append(reader.fileno()) return result def _collect_writers(self) -> list[int]: @@ -51,8 +52,8 @@ class SimpleIOManager(BaseIOManager): """ result = [] - for resource in self.writers: - result.append(resource.fileno()) + for reader in self.writers.copy(): + result.append(reader.fileno()) return result def wait_io(self): @@ -62,20 +63,28 @@ class SimpleIOManager(BaseIOManager): deadline = 0 if deadline > 0: deadline -= kernel.clock.current_time() + deadline = abs(deadline) + # Ctrl+C protection must be disabled right before + # select() because it is a blocking call! + locals()[CTRLC_PROTECTION_ENABLED] = False readable, writable, _ = select.select( self._collect_readers(), self._collect_writers(), [], deadline, ) + # We're paranoid, so yeah. Protected we go + locals()[CTRLC_PROTECTION_ENABLED] = True for read_ready in readable: - for resource, task in self.readers.items(): + for resource, task in self.readers.copy().items(): if resource.fileno() == read_ready and task.state == TaskState.IO: kernel.reschedule(task) + self.readers.pop(resource) for write_ready in writable: - for resource, task in self.writers.items(): + for resource, task in self.writers.copy().items(): if resource.fileno() == write_ready and task.state == TaskState.IO: kernel.reschedule(task) + self.writers.pop(resource) def request_read(self, rsc: FdWrapper, task: Task): current_task().state = TaskState.IO diff --git a/tests/https_test.py b/tests/https_test.py index 298b7ca..6a804ea 100644 --- a/tests/https_test.py +++ b/tests/https_test.py @@ -20,6 +20,8 @@ async def test(host: str, port: int, bufsize: int = 4096): async with socket: print("Entered socket context manager, sending request data") await socket.send_all( + # Try changing the "Connection" header to keep-alive and + # watch the timeout expire! f"GET / HTTP/1.1\r\nUser-Agent: PostmanRuntime/7.32.2\r\nAccept: */*\r\nHost: {host}\r\nAccept-Encoding: gzip, deflate, br\r\nConnection: close\r\n\r\n".encode() ) print("Data sent")