From 51a5cd072a3e4308e7f2256e5ccd73d3c4d34c73 Mon Sep 17 00:00:00 2001 From: nocturn9x Date: Mon, 4 Sep 2023 18:35:27 +0200 Subject: [PATCH] Added X_noblock methods to Queue class --- structio/__init__.py | 5 ++++- structio/exceptions.py | 7 +++++++ structio/sync.py | 32 +++++++++++++++++++++++++++++++- structio/thread.py | 14 ++++++++++++-- 4 files changed, 54 insertions(+), 4 deletions(-) diff --git a/structio/__init__.py b/structio/__init__.py index 3f8abae..bf76ef2 100644 --- a/structio/__init__.py +++ b/structio/__init__.py @@ -6,7 +6,7 @@ from structio.core.managers.signals.sigint import SigIntManager from structio.core.time.clock import DefaultClock from structio.core.syscalls import sleep, suspend as _suspend from structio.core.context import TaskPool, TaskScope -from structio.exceptions import Cancelled, TimedOut, ResourceClosed +from structio.exceptions import Cancelled, TimedOut, ResourceClosed, ResourceBroken, ResourceBusy, WouldBlock from structio.core import task from structio.core.task import Task, TaskState from structio.sync import ( @@ -165,4 +165,7 @@ __all__ = [ "get_signal_handler", "set_signal_handler", "util", + "ResourceBusy", + "ResourceBroken", + "WouldBlock" ] diff --git a/structio/exceptions.py b/structio/exceptions.py index 8729441..a7dc1ff 100644 --- a/structio/exceptions.py +++ b/structio/exceptions.py @@ -45,3 +45,10 @@ class ResourceBroken(StructIOException): Raised when an asynchronous resource gets corrupted and is no longer usable """ + + +class WouldBlock(StructIOException): + """ + Raised when a non-blocking operation + cannot be carried out immediately + """ \ No newline at end of file diff --git a/structio/sync.py b/structio/sync.py index bdb563a..6f8a0c6 100644 --- a/structio/sync.py +++ b/structio/sync.py @@ -1,7 +1,7 @@ # Task synchronization primitives import structio from structio.core.syscalls import suspend, checkpoint -from structio.exceptions import ResourceClosed +from structio.exceptions import ResourceClosed, WouldBlock from structio.core.run import current_task, current_loop from structio.abc import ChannelReader, ChannelWriter, Channel from structio.util.ki import enable_ki_protection @@ -138,6 +138,36 @@ class Queue: await checkpoint() return result + @enable_ki_protection + def get_noblock(self) -> Any: + """ + Equivalent of get(), but it raises + structio.WouldBlock if there's no + elements on the queue instead of + blocking + """ + + if not self.container: + raise WouldBlock() + if self.putters: + self.putters.popleft().set() + return self.container.popleft() + + @enable_ki_protection + def put_noblock(self, item: Any): + """ + Equivalent of put(), but it raises + structio.WouldBlock if there's not + enough space on the queue instead + of blocking + """ + + if self.maxsize and len(self.container) == self.maxsize: + raise WouldBlock() + if self.getters: + self.getters.popleft().set() + self.container.append(item) + def clear(self): """ Clears the queue diff --git a/structio/thread.py b/structio/thread.py index 89b4822..c2135ab 100644 --- a/structio/thread.py +++ b/structio/thread.py @@ -116,6 +116,14 @@ class AsyncThreadQueue(Queue): self.container.append(item) await checkpoint() + @enable_ki_protection + def get_noblock(self) -> Any: + return super().get_noblock() + + @enable_ki_protection + def put_noblock(self, item: Any): + return super().put_noblock(item) + @enable_ki_protection def put_sync(self, item): """ @@ -308,8 +316,10 @@ async def run_in_worker( # Worker thread has exited: we no longer need to process # any requests, so we shut our request handler down handler.cancel() - # Fetch for the final result from the thread (this should not block) - success, data = await rsq.get() # TODO: Implement get_noblock()/put_noblock() + # Fetch for the final result from the thread. We use get_noblock() + # because we know the result should already be there, so the operation + # should not block (and if this raises WouldBlock, then it's a bug) + success, data = rsq.get_noblock() if success: return data raise data