diff --git a/structio/core/kernels/fifo.py b/structio/core/kernels/fifo.py index 1a665e8..cd76089 100644 --- a/structio/core/kernels/fifo.py +++ b/structio/core/kernels/fifo.py @@ -141,6 +141,9 @@ class FIFOKernel(BaseKernel): def check_cancelled(self): if self.current_task.pending_cancellation: self.cancel_task(self.current_task) + else: + # We reschedule the caller immediately! + self.run_queue.appendleft(self.current_task) def schedule_point(self): self.reschedule_running() diff --git a/structio/sync.py b/structio/sync.py index 21684d8..59f30f6 100644 --- a/structio/sync.py +++ b/structio/sync.py @@ -1,5 +1,5 @@ # Task synchronization primitives -from structio.core.syscalls import suspend +from structio.core.syscalls import suspend, checkpoint from structio.core.run import current_task, current_loop from structio.core.abc import ChannelReader, ChannelWriter, Channel from structio.util.ki import enable_ki_protection @@ -36,6 +36,7 @@ class Event: """ if self.is_set(): + await checkpoint() return self.waiters.append(current_task()) await suspend() # We get re-scheduled by set() @@ -46,6 +47,7 @@ class Event: Sets the event, awaking all tasks that called wait() on it """ + if self.is_set(): raise RuntimeError("the event has already been set") self._set = True @@ -105,6 +107,7 @@ class Queue: enough space for the queue """ + await checkpoint() if self.getters: self.getters.popleft().set() self.container.append(item) @@ -123,6 +126,7 @@ class Queue: is empty """ + await checkpoint() if self.putters: self.putters.popleft().set() if not self.container: @@ -165,6 +169,7 @@ class MemorySendChannel(ChannelWriter): @enable_ki_protection async def close(self): + await checkpoint() if self._closed: raise IOError("cannot operate on a closed channel") self._closed = True @@ -189,6 +194,7 @@ class MemoryReceiveChannel(ChannelReader): @enable_ki_protection async def close(self): + await checkpoint() if self._closed: raise IOError("cannot operate on a closed channel") self._closed = True @@ -236,6 +242,7 @@ class Semaphore: self.waiters.append(Event()) await self.waiters[-1].wait() self.owners.add(current) + await checkpoint() @enable_ki_protection async def release(self): @@ -252,6 +259,7 @@ class Semaphore: self.owners.remove(current) if self.waiters: self.waiters.popleft().set() + await checkpoint() async def __aenter__(self): await self.acquire()