from collections import defaultdict from structio.abc import BaseIOManager, BaseKernel from structio.core.context import Task from structio.core.run import current_loop, current_task import select class SimpleIOManager(BaseIOManager): """ A simple, cross-platform, select()-based I/O manager """ def __init__(self): """ Public object constructor """ # Maps resources to tasks self.readers = {} self.writers = {} # This allows us to have a bidirectional mapping: # we know both which tasks are using which resources # and which resources are used by which tasks, # without having to go through too many hoops and jumps. self.tasks: dict[Task, list] = defaultdict(list) def pending(self): # We don't return bool(self.resources) because there is # no pending I/O to do if no tasks are waiting to read or # write, even if there's dangling resources around! return bool(self.readers or self.writers) def _collect_readers(self) -> list: """ Collects all resources that need to be read from, so we can select() on them later """ result = [] for resource in self.readers: result.append(resource) return result def _collect_writers(self) -> list: """ Collects all resources that need to be written to, so we can select() on them later """ result = [] for resource in self.writers: result.append(resource) return result def wait_io(self): kernel: BaseKernel = current_loop() readable, writable, _ = select.select( self._collect_readers(), self._collect_writers(), [], kernel.get_closest_deadline(), ) for read_ready in readable: kernel.reschedule(self.readers[read_ready]) for write_ready in writable: kernel.reschedule(self.writers[write_ready]) def request_read(self, rsc): task = current_task() self.readers[rsc] = task def request_write(self, rsc): task = current_task() self.writers[rsc] = task def release(self, resource): self.readers.pop(resource, None) self.writers.pop(resource, None) def release_task(self, task: Task): for resource in self.tasks[task]: self.release(resource)