diff --git a/aiosched/kernel.py b/aiosched/kernel.py index d174bb9..0a09155 100644 --- a/aiosched/kernel.py +++ b/aiosched/kernel.py @@ -151,8 +151,8 @@ class FIFOKernel: def wait_io(self): """ - Waits for I/O and implements part of the sleeping mechanism - for the event loop + Waits for I/O and schedules tasks when their + associated resource is ready to be used """ before_time = self.clock() # Used for the debugger @@ -168,7 +168,7 @@ class FIFOKernel: io_ready = self.selector.select(timeout) # Get sockets that are ready and schedule their tasks for key, _ in io_ready: - self.run_ready.extend(key.data) # Resource ready? Schedule its tasks + self.run_ready.append(key.data) # Resource ready? Schedule its task self.debugger.after_io(self.clock() - before_time) def awake_tasks(self): @@ -207,7 +207,8 @@ class FIFOKernel: Suspends execution of the current task. This is basically a do-nothing method, since it will not reschedule the task before returning. The task will stay suspended as long as - something else outside the loop reschedules it + something else outside the loop reschedules it (possibly + forever) """ self.current_task.state = TaskState.PAUSED @@ -336,12 +337,9 @@ class FIFOKernel: for each I/O resource the given task owns """ - for key in dict(self.selector.get_map()).values(): - if task in key.data: - key.data.remove(task) - if not key.data: - self.notify_closing(key.fileobj, broken=True) - self.selector.unregister(key.fileobj) + for key in filter(lambda k: k.data == task, dict(self.selector.get_map()).values()): + self.notify_closing(key.fileobj, broken=True) + self.selector.unregister(key.fileobj) task.last_io = () def notify_closing(self, stream, broken: bool = False): @@ -551,14 +549,21 @@ class FIFOKernel: try: self.selector.register(resource, evt_type, [self.current_task]) except KeyError: - # The resource is already registered doing something else: we try - # to see if we can modify the event + # The stream is already being used key = self.selector.get_key(resource) - if evt_type != key.events: - self.selector.modify( - resource, evt_type | key.events, key.data + [self.current_task] - ) - # If we get here, two tasks are trying to read or write on the same resource at the same time - raise ResourceBusy( - "The given resource is being read from/written to from another task" - ) + if key.data == self.current_task or evt_type != key.events: + # If the task that registered the stream + # changed their mind on what they want + # to do with it, who are we to deny their + # request? We also modify the the event in + # our selector so that one task can read + # off a given stream while another one is + # writing to it + self.selector.modify(resource, evt_type, self.current_task) + else: + # One task reading and one writing on the same + # resource is fine (think producer-consumer), + # but having two tasks reading/writing at the + # same time can't lead to anything good, better + # disallow it + self.current_task.throw(ResourceBusy(f"The resource is being read from/written by another task")) diff --git a/aiosched/sync.py b/aiosched/sync.py index fa615a5..7fda2ad 100644 --- a/aiosched/sync.py +++ b/aiosched/sync.py @@ -123,7 +123,7 @@ class Queue: if not self.maxsize or len(self.container) < self.maxsize: self.container.append(item) if self.getters: - await self.getters.popleft().trigger(self.container.popleft()) + await self.getters.popleft().trigger() else: ev = Event() self.putters.append(ev) @@ -140,11 +140,11 @@ class Queue: if self.container: if self.putters: await self.putters.popleft().trigger() - return self.container.popleft() else: ev = Event() self.getters.append(ev) - return await ev.wait() + await ev.wait() + return self.container.popleft() async def clear(self): """