From 09a4e2f576705847032d54f2ca21019dca111b52 Mon Sep 17 00:00:00 2001 From: nocturn9x Date: Mon, 21 Aug 2023 09:22:16 +0200 Subject: [PATCH] Fixes to I/O waiting --- structio/abc.py | 2 +- structio/core/context.py | 13 ++++--- structio/core/kernels/fifo.py | 10 +++-- structio/core/managers/io/simple.py | 7 ++-- structio/core/time/queue.py | 57 +++++++++++------------------ structio/io/socket.py | 2 +- 6 files changed, 41 insertions(+), 50 deletions(-) diff --git a/structio/abc.py b/structio/abc.py index 34406b9..4a2be81 100644 --- a/structio/abc.py +++ b/structio/abc.py @@ -370,7 +370,7 @@ class BaseIOManager(ABC): """ @abstractmethod - def wait_io(self): + def wait_io(self, current_time): """ Waits for I/O and reschedules tasks when data is ready to be read/written diff --git a/structio/core/context.py b/structio/core/context.py index 5815683..a4fb833 100644 --- a/structio/core/context.py +++ b/structio/core/context.py @@ -1,8 +1,9 @@ import structio -from structio.core.run import current_loop from structio.core.task import Task -from structio.core.syscalls import suspend, checkpoint +from structio.core.run import current_loop from typing import Callable, Coroutine, Any +from structio.core.time.queue import TimeQueue +from structio.core.syscalls import suspend, checkpoint from structio.exceptions import Cancelled, StructIOException @@ -71,14 +72,16 @@ class TaskScope: cancel scope """ + queue = TimeQueue() if self.shielded: return float("inf"), self - times = [(self.deadline, self)] + times = queue.put(self, self.deadline) for child in self.children: if child.shielded: return float("inf"), self - times.append(child.get_effective_deadline()) - return min(times) + deadline, scope = child.get_effective_deadline() + queue.put(scope, deadline) + return queue.get_closest_deadline(), queue.get()[0] def __repr__(self): return f"{self.__class__.__name__}(owner={self.owner}, timeout={self.timeout})" diff --git a/structio/core/kernels/fifo.py b/structio/core/kernels/fifo.py index 9a00064..e42bf37 100644 --- a/structio/core/kernels/fifo.py +++ b/structio/core/kernels/fifo.py @@ -51,7 +51,7 @@ class FIFOKernel(BaseKernel): # Have we handled SIGINT? self._sigint_handled: bool = False # Paused tasks along with their deadlines - self.paused: TimeQueue = TimeQueue(self.clock) + self.paused: TimeQueue = TimeQueue() self.pool = TaskPool() self.pool.scope.shielded = True self.current_scope = self.pool.scope @@ -61,7 +61,7 @@ class FIFOKernel(BaseKernel): if self.run_queue: # We absolutely cannot block while other # tasks have things to do! - return 0 + return self.clock.current_time() deadlines = [] for scope in self.pool.scope.children: deadlines.append(scope.get_effective_deadline()[0]) @@ -74,6 +74,7 @@ class FIFOKernel(BaseKernel): ] ) + def wait_readable(self, resource: FdWrapper): self.current_task.state = TaskState.IO self.io_manager.request_read(resource, self.current_task) @@ -239,7 +240,7 @@ class FIFOKernel(BaseKernel): if amount > 0: self.event("before_sleep", self.current_task, amount) self.current_task.next_deadline = self.clock.deadline(amount) - self.paused.put(self.current_task, amount) + self.paused.put(self.current_task, self.clock.deadline(amount)) else: # If sleep is called with 0 as argument, # then it's just a checkpoint! @@ -285,7 +286,7 @@ class FIFOKernel(BaseKernel): self.wakeup() self.check_scopes() if self.io_manager.pending(): - self.io_manager.wait_io() + self.io_manager.wait_io(self.clock.current_time()) self.close() def reschedule_running(self): @@ -405,6 +406,7 @@ class FIFOKernel(BaseKernel): def close_scope(self, scope: TaskScope): self.current_scope = scope.outer + self.current_scope.inner = [] def cancel_task(self, task: Task): if task.done(): diff --git a/structio/core/managers/io/simple.py b/structio/core/managers/io/simple.py index ab0ea37..479312e 100644 --- a/structio/core/managers/io/simple.py +++ b/structio/core/managers/io/simple.py @@ -57,14 +57,13 @@ class SimpleIOManager(BaseIOManager): result.append(writer.fileno()) return result - def wait_io(self): + def wait_io(self, current_time): kernel: BaseKernel = current_loop() deadline = kernel.get_closest_deadline() if deadline == float("inf"): deadline = 0 - if deadline > 0: - deadline -= kernel.clock.current_time() - deadline = abs(deadline) + elif deadline > 0: + deadline -= current_time writers = self._collect_writers() readable, writable, exceptional = select.select( self._collect_readers(), diff --git a/structio/core/time/queue.py b/structio/core/time/queue.py index e35d890..cf7822e 100644 --- a/structio/core/time/queue.py +++ b/structio/core/time/queue.py @@ -1,27 +1,20 @@ from typing import Any from structio.core.task import Task, TaskState -from structio.abc import BaseClock from heapq import heappush, heappop, heapify class TimeQueue: """ - An abstraction layer over a heap queue based on time. This is where - paused tasks will be put when they are not running - - :param clock: The same clock that was passed to the thread-local event loop. - It is important for the queue to be synchronized with the loop as this allows - the sleeping mechanism to work reliably + An abstraction layer over a heap queue based on time """ - def __init__(self, clock: BaseClock): + def __init__(self): """ Object constructor """ - self.clock = clock # The sequence float handles the race condition - # of two tasks with identical deadlines, acting + # of two items with identical deadlines, acting # as a tiebreaker self.sequence = 0 self.container: list[tuple[float, int, Task, dict[str, Any]]] = [] @@ -40,11 +33,10 @@ class TimeQueue: return len(self.container) - def __contains__(self, item: Task): + def __contains__(self, item): """ - Implements item in self. This method behaves - as if the queue only contained tasks and ignores - their timeouts and tiebreakers + Implements item in self. This method ignores + timeouts and tiebreakers """ for i in self.container: @@ -52,7 +44,7 @@ class TimeQueue: return True return False - def index(self, item: Task): + def index(self, item): """ Returns the index of the given item in the list or -1 if it is not present @@ -63,15 +55,15 @@ class TimeQueue: return i return -1 - def discard(self, item: Task): + def discard(self, item): """ Discards an item from the queue and calls heapify(self.container) to keep the heap invariant if an element is removed. This method does nothing if the item is not in the queue, but note that in this case the - operation would still take O(n) iterations - to complete + operation would still take at least O(n) + iterations to complete :param item: The item to be discarded """ @@ -126,38 +118,33 @@ class TimeQueue: Implements repr(self) and str(self) """ - return f"TimeQueue({self.container}, clock={self.clock})" + return f"TimeQueue({self.container})" - def put(self, task: Task, delay: float, metadata: dict[str, Any] | None = None): + def put(self, item, delay: float, metadata: dict[str, Any] | None = None): """ - Pushes a task onto the queue together with its + Pushes an item onto the queue together with its delay and optional metadata - :param task: The task that is meant to sleep - :type task: :class: Task - :param delay: The delay associated with the task + :param item: The item to be pushed + :param delay: The delay associated with the item :type delay: float :param metadata: A dictionary representing additional - task metadata. Defaults to None + metadata. Defaults to None :type metadata: dict[str, Any], optional """ - time = self.clock.current_time() - task.paused_when = time - task.state = TaskState.PAUSED - task.next_deadline = task.paused_when + delay - heappush(self.container, (time + delay, self.sequence, task, metadata)) + heappush(self.container, (delay, self.sequence, item, metadata)) self.sequence += 1 - def get(self) -> tuple[Task, dict[str, Any] | None]: + def get(self) -> tuple[Any, dict[str, Any] | None]: """ - Gets the first task that is meant to run along - with its metadata + Gets the first item on the queue along with + its metadata :raises: IndexError if the queue is empty """ if not self.container: raise IndexError("get from empty TimeQueue") - _, __, task, meta = heappop(self.container) - return task, meta + _, __, item, meta = heappop(self.container) + return item, meta diff --git a/structio/io/socket.py b/structio/io/socket.py index 963def7..798e517 100644 --- a/structio/io/socket.py +++ b/structio/io/socket.py @@ -248,8 +248,8 @@ async def connect_tcp_socket( # This event notifies us if a connection attempt # fails, so we can start early event = structio.Event() - pool.spawn(attempt, sock_args, address, event, pool.scope) try: + pool.spawn(attempt, sock_args, address, event, pool.scope) with structio.with_timeout(happy_eyeballs_delay): # We'll wait for the event to be triggered or for at # most happy_eyeballs_delay seconds before moving on,