Fixed bug with resource de-registration

This commit is contained in:
Mattia Giambirtone 2023-06-18 18:52:36 +02:00 committed by nocturn9x
parent a09babae53
commit ee7451014b
Signed by: nocturn9x
GPG Key ID: 8270F9F467971E59
3 changed files with 20 additions and 6 deletions

View File

@ -65,9 +65,11 @@ class FIFOKernel(BaseKernel):
) )
def wait_readable(self, resource: FdWrapper): def wait_readable(self, resource: FdWrapper):
self.current_task.state = TaskState.IO
self.io_manager.request_read(resource, self.current_task) self.io_manager.request_read(resource, self.current_task)
def wait_writable(self, resource: FdWrapper): def wait_writable(self, resource: FdWrapper):
self.current_task.state = TaskState.IO
self.io_manager.request_write(resource, self.current_task) self.io_manager.request_write(resource, self.current_task)
def notify_closing(self, resource: FdWrapper, broken: bool = False, owner: Task | None = None): def notify_closing(self, resource: FdWrapper, broken: bool = False, owner: Task | None = None):
@ -173,6 +175,7 @@ class FIFOKernel(BaseKernel):
self.event("before_task_step", self.current_task) self.event("before_task_step", self.current_task)
self.current_task.state = TaskState.RUNNING self.current_task.state = TaskState.RUNNING
method, args, kwargs = runner() method, args, kwargs = runner()
self.current_task.state = TaskState.PAUSED
if not callable(getattr(self, method, None)): if not callable(getattr(self, method, None)):
# This if block is meant to be triggered by other async # This if block is meant to be triggered by other async
# libraries, which most likely have different method names and behaviors # libraries, which most likely have different method names and behaviors

View File

@ -1,5 +1,6 @@
from structio.abc import BaseIOManager, BaseKernel from structio.abc import BaseIOManager, BaseKernel
from structio.core.task import Task, TaskState from structio.core.task import Task, TaskState
from structio.util.ki import CTRLC_PROTECTION_ENABLED
from structio.core.run import current_loop, current_task from structio.core.run import current_loop, current_task
from structio.io import FdWrapper from structio.io import FdWrapper
import select import select
@ -40,8 +41,8 @@ class SimpleIOManager(BaseIOManager):
""" """
result = [] result = []
for resource in self.readers: for reader in self.readers.copy():
result.append(resource.fileno()) result.append(reader.fileno())
return result return result
def _collect_writers(self) -> list[int]: def _collect_writers(self) -> list[int]:
@ -51,8 +52,8 @@ class SimpleIOManager(BaseIOManager):
""" """
result = [] result = []
for resource in self.writers: for reader in self.writers.copy():
result.append(resource.fileno()) result.append(reader.fileno())
return result return result
def wait_io(self): def wait_io(self):
@ -62,20 +63,28 @@ class SimpleIOManager(BaseIOManager):
deadline = 0 deadline = 0
if deadline > 0: if deadline > 0:
deadline -= kernel.clock.current_time() deadline -= kernel.clock.current_time()
deadline = abs(deadline)
# Ctrl+C protection must be disabled right before
# select() because it is a blocking call!
locals()[CTRLC_PROTECTION_ENABLED] = False
readable, writable, _ = select.select( readable, writable, _ = select.select(
self._collect_readers(), self._collect_readers(),
self._collect_writers(), self._collect_writers(),
[], [],
deadline, deadline,
) )
# We're paranoid, so yeah. Protected we go
locals()[CTRLC_PROTECTION_ENABLED] = True
for read_ready in readable: for read_ready in readable:
for resource, task in self.readers.items(): for resource, task in self.readers.copy().items():
if resource.fileno() == read_ready and task.state == TaskState.IO: if resource.fileno() == read_ready and task.state == TaskState.IO:
kernel.reschedule(task) kernel.reschedule(task)
self.readers.pop(resource)
for write_ready in writable: for write_ready in writable:
for resource, task in self.writers.items(): for resource, task in self.writers.copy().items():
if resource.fileno() == write_ready and task.state == TaskState.IO: if resource.fileno() == write_ready and task.state == TaskState.IO:
kernel.reschedule(task) kernel.reschedule(task)
self.writers.pop(resource)
def request_read(self, rsc: FdWrapper, task: Task): def request_read(self, rsc: FdWrapper, task: Task):
current_task().state = TaskState.IO current_task().state = TaskState.IO

View File

@ -20,6 +20,8 @@ async def test(host: str, port: int, bufsize: int = 4096):
async with socket: async with socket:
print("Entered socket context manager, sending request data") print("Entered socket context manager, sending request data")
await socket.send_all( await socket.send_all(
# Try changing the "Connection" header to keep-alive and
# watch the timeout expire!
f"GET / HTTP/1.1\r\nUser-Agent: PostmanRuntime/7.32.2\r\nAccept: */*\r\nHost: {host}\r\nAccept-Encoding: gzip, deflate, br\r\nConnection: close\r\n\r\n".encode() f"GET / HTTP/1.1\r\nUser-Agent: PostmanRuntime/7.32.2\r\nAccept: */*\r\nHost: {host}\r\nAccept-Encoding: gzip, deflate, br\r\nConnection: close\r\n\r\n".encode()
) )
print("Data sent") print("Data sent")