Added X_noblock methods to Queue class
This commit is contained in:
parent
95e265aca1
commit
51a5cd072a
|
@ -6,7 +6,7 @@ from structio.core.managers.signals.sigint import SigIntManager
|
|||
from structio.core.time.clock import DefaultClock
|
||||
from structio.core.syscalls import sleep, suspend as _suspend
|
||||
from structio.core.context import TaskPool, TaskScope
|
||||
from structio.exceptions import Cancelled, TimedOut, ResourceClosed
|
||||
from structio.exceptions import Cancelled, TimedOut, ResourceClosed, ResourceBroken, ResourceBusy, WouldBlock
|
||||
from structio.core import task
|
||||
from structio.core.task import Task, TaskState
|
||||
from structio.sync import (
|
||||
|
@ -165,4 +165,7 @@ __all__ = [
|
|||
"get_signal_handler",
|
||||
"set_signal_handler",
|
||||
"util",
|
||||
"ResourceBusy",
|
||||
"ResourceBroken",
|
||||
"WouldBlock"
|
||||
]
|
||||
|
|
|
@ -45,3 +45,10 @@ class ResourceBroken(StructIOException):
|
|||
Raised when an asynchronous resource gets
|
||||
corrupted and is no longer usable
|
||||
"""
|
||||
|
||||
|
||||
class WouldBlock(StructIOException):
|
||||
"""
|
||||
Raised when a non-blocking operation
|
||||
cannot be carried out immediately
|
||||
"""
|
|
@ -1,7 +1,7 @@
|
|||
# Task synchronization primitives
|
||||
import structio
|
||||
from structio.core.syscalls import suspend, checkpoint
|
||||
from structio.exceptions import ResourceClosed
|
||||
from structio.exceptions import ResourceClosed, WouldBlock
|
||||
from structio.core.run import current_task, current_loop
|
||||
from structio.abc import ChannelReader, ChannelWriter, Channel
|
||||
from structio.util.ki import enable_ki_protection
|
||||
|
@ -138,6 +138,36 @@ class Queue:
|
|||
await checkpoint()
|
||||
return result
|
||||
|
||||
@enable_ki_protection
|
||||
def get_noblock(self) -> Any:
|
||||
"""
|
||||
Equivalent of get(), but it raises
|
||||
structio.WouldBlock if there's no
|
||||
elements on the queue instead of
|
||||
blocking
|
||||
"""
|
||||
|
||||
if not self.container:
|
||||
raise WouldBlock()
|
||||
if self.putters:
|
||||
self.putters.popleft().set()
|
||||
return self.container.popleft()
|
||||
|
||||
@enable_ki_protection
|
||||
def put_noblock(self, item: Any):
|
||||
"""
|
||||
Equivalent of put(), but it raises
|
||||
structio.WouldBlock if there's not
|
||||
enough space on the queue instead
|
||||
of blocking
|
||||
"""
|
||||
|
||||
if self.maxsize and len(self.container) == self.maxsize:
|
||||
raise WouldBlock()
|
||||
if self.getters:
|
||||
self.getters.popleft().set()
|
||||
self.container.append(item)
|
||||
|
||||
def clear(self):
|
||||
"""
|
||||
Clears the queue
|
||||
|
|
|
@ -116,6 +116,14 @@ class AsyncThreadQueue(Queue):
|
|||
self.container.append(item)
|
||||
await checkpoint()
|
||||
|
||||
@enable_ki_protection
|
||||
def get_noblock(self) -> Any:
|
||||
return super().get_noblock()
|
||||
|
||||
@enable_ki_protection
|
||||
def put_noblock(self, item: Any):
|
||||
return super().put_noblock(item)
|
||||
|
||||
@enable_ki_protection
|
||||
def put_sync(self, item):
|
||||
"""
|
||||
|
@ -308,8 +316,10 @@ async def run_in_worker(
|
|||
# Worker thread has exited: we no longer need to process
|
||||
# any requests, so we shut our request handler down
|
||||
handler.cancel()
|
||||
# Fetch for the final result from the thread (this should not block)
|
||||
success, data = await rsq.get() # TODO: Implement get_noblock()/put_noblock()
|
||||
# Fetch for the final result from the thread. We use get_noblock()
|
||||
# because we know the result should already be there, so the operation
|
||||
# should not block (and if this raises WouldBlock, then it's a bug)
|
||||
success, data = rsq.get_noblock()
|
||||
if success:
|
||||
return data
|
||||
raise data
|
||||
|
|
Loading…
Reference in New Issue