Fixes to I/O waiting

This commit is contained in:
Mattia Giambirtone 2023-08-21 09:22:16 +02:00
parent c5d55e6ea6
commit 09a4e2f576
Signed by: nocturn9x
GPG Key ID: 8270F9F467971E59
6 changed files with 41 additions and 50 deletions

View File

@ -370,7 +370,7 @@ class BaseIOManager(ABC):
""" """
@abstractmethod @abstractmethod
def wait_io(self): def wait_io(self, current_time):
""" """
Waits for I/O and reschedules tasks Waits for I/O and reschedules tasks
when data is ready to be read/written when data is ready to be read/written

View File

@ -1,8 +1,9 @@
import structio import structio
from structio.core.run import current_loop
from structio.core.task import Task from structio.core.task import Task
from structio.core.syscalls import suspend, checkpoint from structio.core.run import current_loop
from typing import Callable, Coroutine, Any from typing import Callable, Coroutine, Any
from structio.core.time.queue import TimeQueue
from structio.core.syscalls import suspend, checkpoint
from structio.exceptions import Cancelled, StructIOException from structio.exceptions import Cancelled, StructIOException
@ -71,14 +72,16 @@ class TaskScope:
cancel scope cancel scope
""" """
queue = TimeQueue()
if self.shielded: if self.shielded:
return float("inf"), self return float("inf"), self
times = [(self.deadline, self)] times = queue.put(self, self.deadline)
for child in self.children: for child in self.children:
if child.shielded: if child.shielded:
return float("inf"), self return float("inf"), self
times.append(child.get_effective_deadline()) deadline, scope = child.get_effective_deadline()
return min(times) queue.put(scope, deadline)
return queue.get_closest_deadline(), queue.get()[0]
def __repr__(self): def __repr__(self):
return f"{self.__class__.__name__}(owner={self.owner}, timeout={self.timeout})" return f"{self.__class__.__name__}(owner={self.owner}, timeout={self.timeout})"

View File

@ -51,7 +51,7 @@ class FIFOKernel(BaseKernel):
# Have we handled SIGINT? # Have we handled SIGINT?
self._sigint_handled: bool = False self._sigint_handled: bool = False
# Paused tasks along with their deadlines # Paused tasks along with their deadlines
self.paused: TimeQueue = TimeQueue(self.clock) self.paused: TimeQueue = TimeQueue()
self.pool = TaskPool() self.pool = TaskPool()
self.pool.scope.shielded = True self.pool.scope.shielded = True
self.current_scope = self.pool.scope self.current_scope = self.pool.scope
@ -61,7 +61,7 @@ class FIFOKernel(BaseKernel):
if self.run_queue: if self.run_queue:
# We absolutely cannot block while other # We absolutely cannot block while other
# tasks have things to do! # tasks have things to do!
return 0 return self.clock.current_time()
deadlines = [] deadlines = []
for scope in self.pool.scope.children: for scope in self.pool.scope.children:
deadlines.append(scope.get_effective_deadline()[0]) deadlines.append(scope.get_effective_deadline()[0])
@ -74,6 +74,7 @@ class FIFOKernel(BaseKernel):
] ]
) )
def wait_readable(self, resource: FdWrapper): def wait_readable(self, resource: FdWrapper):
self.current_task.state = TaskState.IO self.current_task.state = TaskState.IO
self.io_manager.request_read(resource, self.current_task) self.io_manager.request_read(resource, self.current_task)
@ -239,7 +240,7 @@ class FIFOKernel(BaseKernel):
if amount > 0: if amount > 0:
self.event("before_sleep", self.current_task, amount) self.event("before_sleep", self.current_task, amount)
self.current_task.next_deadline = self.clock.deadline(amount) self.current_task.next_deadline = self.clock.deadline(amount)
self.paused.put(self.current_task, amount) self.paused.put(self.current_task, self.clock.deadline(amount))
else: else:
# If sleep is called with 0 as argument, # If sleep is called with 0 as argument,
# then it's just a checkpoint! # then it's just a checkpoint!
@ -285,7 +286,7 @@ class FIFOKernel(BaseKernel):
self.wakeup() self.wakeup()
self.check_scopes() self.check_scopes()
if self.io_manager.pending(): if self.io_manager.pending():
self.io_manager.wait_io() self.io_manager.wait_io(self.clock.current_time())
self.close() self.close()
def reschedule_running(self): def reschedule_running(self):
@ -405,6 +406,7 @@ class FIFOKernel(BaseKernel):
def close_scope(self, scope: TaskScope): def close_scope(self, scope: TaskScope):
self.current_scope = scope.outer self.current_scope = scope.outer
self.current_scope.inner = []
def cancel_task(self, task: Task): def cancel_task(self, task: Task):
if task.done(): if task.done():

View File

@ -57,14 +57,13 @@ class SimpleIOManager(BaseIOManager):
result.append(writer.fileno()) result.append(writer.fileno())
return result return result
def wait_io(self): def wait_io(self, current_time):
kernel: BaseKernel = current_loop() kernel: BaseKernel = current_loop()
deadline = kernel.get_closest_deadline() deadline = kernel.get_closest_deadline()
if deadline == float("inf"): if deadline == float("inf"):
deadline = 0 deadline = 0
if deadline > 0: elif deadline > 0:
deadline -= kernel.clock.current_time() deadline -= current_time
deadline = abs(deadline)
writers = self._collect_writers() writers = self._collect_writers()
readable, writable, exceptional = select.select( readable, writable, exceptional = select.select(
self._collect_readers(), self._collect_readers(),

View File

@ -1,27 +1,20 @@
from typing import Any from typing import Any
from structio.core.task import Task, TaskState from structio.core.task import Task, TaskState
from structio.abc import BaseClock
from heapq import heappush, heappop, heapify from heapq import heappush, heappop, heapify
class TimeQueue: class TimeQueue:
""" """
An abstraction layer over a heap queue based on time. This is where An abstraction layer over a heap queue based on time
paused tasks will be put when they are not running
:param clock: The same clock that was passed to the thread-local event loop.
It is important for the queue to be synchronized with the loop as this allows
the sleeping mechanism to work reliably
""" """
def __init__(self, clock: BaseClock): def __init__(self):
""" """
Object constructor Object constructor
""" """
self.clock = clock
# The sequence float handles the race condition # The sequence float handles the race condition
# of two tasks with identical deadlines, acting # of two items with identical deadlines, acting
# as a tiebreaker # as a tiebreaker
self.sequence = 0 self.sequence = 0
self.container: list[tuple[float, int, Task, dict[str, Any]]] = [] self.container: list[tuple[float, int, Task, dict[str, Any]]] = []
@ -40,11 +33,10 @@ class TimeQueue:
return len(self.container) return len(self.container)
def __contains__(self, item: Task): def __contains__(self, item):
""" """
Implements item in self. This method behaves Implements item in self. This method ignores
as if the queue only contained tasks and ignores timeouts and tiebreakers
their timeouts and tiebreakers
""" """
for i in self.container: for i in self.container:
@ -52,7 +44,7 @@ class TimeQueue:
return True return True
return False return False
def index(self, item: Task): def index(self, item):
""" """
Returns the index of the given item in the list Returns the index of the given item in the list
or -1 if it is not present or -1 if it is not present
@ -63,15 +55,15 @@ class TimeQueue:
return i return i
return -1 return -1
def discard(self, item: Task): def discard(self, item):
""" """
Discards an item from the queue and Discards an item from the queue and
calls heapify(self.container) to keep calls heapify(self.container) to keep
the heap invariant if an element is removed. the heap invariant if an element is removed.
This method does nothing if the item is not This method does nothing if the item is not
in the queue, but note that in this case the in the queue, but note that in this case the
operation would still take O(n) iterations operation would still take at least O(n)
to complete iterations to complete
:param item: The item to be discarded :param item: The item to be discarded
""" """
@ -126,38 +118,33 @@ class TimeQueue:
Implements repr(self) and str(self) Implements repr(self) and str(self)
""" """
return f"TimeQueue({self.container}, clock={self.clock})" return f"TimeQueue({self.container})"
def put(self, task: Task, delay: float, metadata: dict[str, Any] | None = None): def put(self, item, delay: float, metadata: dict[str, Any] | None = None):
""" """
Pushes a task onto the queue together with its Pushes an item onto the queue together with its
delay and optional metadata delay and optional metadata
:param task: The task that is meant to sleep :param item: The item to be pushed
:type task: :class: Task :param delay: The delay associated with the item
:param delay: The delay associated with the task
:type delay: float :type delay: float
:param metadata: A dictionary representing additional :param metadata: A dictionary representing additional
task metadata. Defaults to None metadata. Defaults to None
:type metadata: dict[str, Any], optional :type metadata: dict[str, Any], optional
""" """
time = self.clock.current_time() heappush(self.container, (delay, self.sequence, item, metadata))
task.paused_when = time
task.state = TaskState.PAUSED
task.next_deadline = task.paused_when + delay
heappush(self.container, (time + delay, self.sequence, task, metadata))
self.sequence += 1 self.sequence += 1
def get(self) -> tuple[Task, dict[str, Any] | None]: def get(self) -> tuple[Any, dict[str, Any] | None]:
""" """
Gets the first task that is meant to run along Gets the first item on the queue along with
with its metadata its metadata
:raises: IndexError if the queue is empty :raises: IndexError if the queue is empty
""" """
if not self.container: if not self.container:
raise IndexError("get from empty TimeQueue") raise IndexError("get from empty TimeQueue")
_, __, task, meta = heappop(self.container) _, __, item, meta = heappop(self.container)
return task, meta return item, meta

View File

@ -248,8 +248,8 @@ async def connect_tcp_socket(
# This event notifies us if a connection attempt # This event notifies us if a connection attempt
# fails, so we can start early # fails, so we can start early
event = structio.Event() event = structio.Event()
pool.spawn(attempt, sock_args, address, event, pool.scope)
try: try:
pool.spawn(attempt, sock_args, address, event, pool.scope)
with structio.with_timeout(happy_eyeballs_delay): with structio.with_timeout(happy_eyeballs_delay):
# We'll wait for the event to be triggered or for at # We'll wait for the event to be triggered or for at
# most happy_eyeballs_delay seconds before moving on, # most happy_eyeballs_delay seconds before moving on,