Initial work on LIFO and priority queues. Process pool skeleton
This commit is contained in:
parent
e1485d9317
commit
91ca2c8ee6
|
@ -737,3 +737,5 @@ class BaseKernel(ABC):
|
|||
|
||||
if not self.done() and not force:
|
||||
raise StructIOException("the event loop is running")
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
"""
|
||||
A module implementing worker pools
|
||||
"""
|
||||
from multiprocessing import cpu_count
|
||||
from typing import Callable, Any
|
||||
|
||||
|
||||
from structio.parallel import Popen
|
||||
from structio import Queue
|
||||
|
||||
|
||||
class AsyncProcessPool:
|
||||
"""
|
||||
A worker pool for asynchronous processes.
|
||||
|
||||
:param max_workers: The maximum number of concurrent
|
||||
worker processes that can be running at any given
|
||||
time
|
||||
"""
|
||||
|
||||
|
||||
def __init__(self, max_workers: int = cpu_count()):
|
||||
"""
|
||||
Public object constructor
|
||||
"""
|
||||
|
||||
self.processes: Queue = Queue(max_workers)
|
||||
|
||||
@property
|
||||
def max_workers(self):
|
||||
return self.processes.maxsize
|
||||
|
||||
|
||||
async def submit(self, fn: Callable[[Any, Any], Any], *args, block: bool = True):
|
||||
if block:
|
||||
await self.processes.put((fn, args))
|
|
@ -10,6 +10,7 @@ from structio.core.task import Task
|
|||
from collections import deque, defaultdict
|
||||
from typing import Any, Callable, Coroutine
|
||||
from functools import partial, wraps
|
||||
from heapq import heappush, heappop
|
||||
|
||||
|
||||
class Event:
|
||||
|
@ -67,6 +68,12 @@ class Queue:
|
|||
An asynchronous FIFO queue
|
||||
"""
|
||||
|
||||
def _put_item(self, item):
|
||||
self.container.append(item)
|
||||
|
||||
def _get_item(self):
|
||||
return self.container.popleft()
|
||||
|
||||
def __init__(self, maxsize: int | None = None):
|
||||
"""
|
||||
Object constructor
|
||||
|
@ -121,7 +128,7 @@ class Queue:
|
|||
await self.putters[-1].wait()
|
||||
if self.getters:
|
||||
self.getters.popleft().set()
|
||||
self.container.append(item)
|
||||
self._put_item(item)
|
||||
await checkpoint()
|
||||
|
||||
@enable_ki_protection
|
||||
|
@ -136,7 +143,7 @@ class Queue:
|
|||
await self.getters[-1].wait()
|
||||
if self.putters:
|
||||
self.putters.popleft().set()
|
||||
result = self.container.popleft()
|
||||
result = self._get_item()
|
||||
await checkpoint()
|
||||
return result
|
||||
|
||||
|
@ -153,7 +160,7 @@ class Queue:
|
|||
raise WouldBlock()
|
||||
if self.putters:
|
||||
self.putters.popleft().set()
|
||||
return self.container.popleft()
|
||||
return self._get_item()
|
||||
|
||||
@enable_ki_protection
|
||||
def put_noblock(self, item: Any):
|
||||
|
@ -168,7 +175,7 @@ class Queue:
|
|||
raise WouldBlock()
|
||||
if self.getters:
|
||||
self.getters.popleft().set()
|
||||
self.container.append(item)
|
||||
self._put_item(item)
|
||||
|
||||
def clear(self):
|
||||
"""
|
||||
|
@ -187,6 +194,36 @@ class Queue:
|
|||
self.putters.clear()
|
||||
|
||||
|
||||
class LIFOQueue(Queue):
|
||||
"""
|
||||
A LIFO variant of the regular Queue class
|
||||
"""
|
||||
|
||||
def __init__(self, maxsize: int):
|
||||
super().__init__(maxsize)
|
||||
|
||||
def _get_item(self):
|
||||
return self.container.pop()
|
||||
|
||||
|
||||
class PriorityQueue(Queue):
|
||||
"""
|
||||
A queue with built-in priority. Lowest-priority
|
||||
items are retrieved first. The items to be stored
|
||||
in the queue must be comparable: consider using a
|
||||
wrapper dataclass if they aren't, as shown [here](https://docs.python.org/3/library/queue.html#queue.PriorityQueue)
|
||||
"""
|
||||
|
||||
def __init__(self, maxsize: int):
|
||||
super().__init__(maxsize)
|
||||
self.container: list[Any] = []
|
||||
|
||||
def _get_item(self):
|
||||
return heappop(self.container)
|
||||
|
||||
def _put_item(self, item):
|
||||
heappush(self.container, item)
|
||||
|
||||
class MemoryReceiveChannel(ChannelReader):
|
||||
"""
|
||||
An in-memory one-way channel to read
|
||||
|
|
Loading…
Reference in New Issue