Added full checkpoint support

This commit is contained in:
Mattia Giambirtone 2023-05-16 16:37:03 +02:00 committed by nocturn9x
parent cbbe8cc114
commit 52daf54ee3
Signed by: nocturn9x
GPG Key ID: 8270F9F467971E59
2 changed files with 12 additions and 1 deletions

View File

@ -141,6 +141,9 @@ class FIFOKernel(BaseKernel):
def check_cancelled(self):
if self.current_task.pending_cancellation:
self.cancel_task(self.current_task)
else:
# We reschedule the caller immediately!
self.run_queue.appendleft(self.current_task)
def schedule_point(self):
self.reschedule_running()

View File

@ -1,5 +1,5 @@
# Task synchronization primitives
from structio.core.syscalls import suspend
from structio.core.syscalls import suspend, checkpoint
from structio.core.run import current_task, current_loop
from structio.core.abc import ChannelReader, ChannelWriter, Channel
from structio.util.ki import enable_ki_protection
@ -36,6 +36,7 @@ class Event:
"""
if self.is_set():
await checkpoint()
return
self.waiters.append(current_task())
await suspend() # We get re-scheduled by set()
@ -46,6 +47,7 @@ class Event:
Sets the event, awaking all tasks
that called wait() on it
"""
if self.is_set():
raise RuntimeError("the event has already been set")
self._set = True
@ -105,6 +107,7 @@ class Queue:
enough space for the queue
"""
await checkpoint()
if self.getters:
self.getters.popleft().set()
self.container.append(item)
@ -123,6 +126,7 @@ class Queue:
is empty
"""
await checkpoint()
if self.putters:
self.putters.popleft().set()
if not self.container:
@ -165,6 +169,7 @@ class MemorySendChannel(ChannelWriter):
@enable_ki_protection
async def close(self):
await checkpoint()
if self._closed:
raise IOError("cannot operate on a closed channel")
self._closed = True
@ -189,6 +194,7 @@ class MemoryReceiveChannel(ChannelReader):
@enable_ki_protection
async def close(self):
await checkpoint()
if self._closed:
raise IOError("cannot operate on a closed channel")
self._closed = True
@ -236,6 +242,7 @@ class Semaphore:
self.waiters.append(Event())
await self.waiters[-1].wait()
self.owners.add(current)
await checkpoint()
@enable_ki_protection
async def release(self):
@ -252,6 +259,7 @@ class Semaphore:
self.owners.remove(current)
if self.waiters:
self.waiters.popleft().set()
await checkpoint()
async def __aenter__(self):
await self.acquire()