348 lines
12 KiB
Python
348 lines
12 KiB
Python
from collections import deque
|
|
from time import monotonic
|
|
from types import FunctionType
|
|
from math import inf
|
|
from .errors import QueueEmpty, QueueFull, StackEmpty, StackFull
|
|
from heapq import heappush, heappop
|
|
from .objects import TTLItem
|
|
|
|
|
|
class TTLQueue:
|
|
|
|
"""A queue (FIFO) with per-item time to live (TTL)
|
|
|
|
When a TTL expires, its associated element will be deleted, but please
|
|
note that TTL expiration (and therefore, items deletion) is performed
|
|
only when doing mutating operations (e.g. put and get)
|
|
on the queue itself and when performing operations using the 'in' operator
|
|
|
|
It is also possible to set a different TTL for every item and to
|
|
define the maximum queue size
|
|
|
|
Note: This queue is NOT thread safe and must be properly locked
|
|
when used with multiple threads
|
|
|
|
:param qsize: The max size of the queue, defaults to 0 (no limit)
|
|
:type qsize: int, optional
|
|
:param ttl: The TTL for every item in the queue, defaults to 0 (no TTL)
|
|
:type ttl: int, optional
|
|
:param timer: The timer function that the queue will use to
|
|
keep track of elapsed time. Defaults to time.monotonic(), but can
|
|
be customized. Any function that yields an incremental value
|
|
on each subsequent call is acceptable, but its return values
|
|
should not be repeated during runtime to avoid nonsense results
|
|
:type timer: class: FunctionType, optional
|
|
"""
|
|
|
|
def __init__(self, qsize: int = 0, ttl: int = 0, timer: FunctionType = monotonic):
|
|
"""Object constructor"""
|
|
|
|
self.qsize = qsize if qsize else inf # Infinite size
|
|
self.ttl = ttl
|
|
if self.ttl < 0:
|
|
raise ValueError("ttl can't be negative!")
|
|
if self.qsize < 0:
|
|
raise ValueError("qsize can't be negative!")
|
|
self.timer = timer
|
|
self._queue = deque()
|
|
|
|
def expire(self, when: int):
|
|
"""Pops expired element out of the queue if their TTL has
|
|
expired by when units of time (usually seconds)
|
|
|
|
:param when: The expiry date to check items against. Items' whose
|
|
insertion date, according to self.timer, is less or equal
|
|
than this number will be automatically deleted
|
|
:type when: int
|
|
"""
|
|
|
|
i = 0
|
|
n = len(self._queue)
|
|
while i < n: # Not the nicest-looking way of doing it, but surely works and is efficient
|
|
item = self._queue[i]
|
|
if item.date <= when:
|
|
self._queue.remove(item)
|
|
n = len(self._queue)
|
|
i -= 1
|
|
i += 1
|
|
|
|
def put(self, element, ttl: int = 0):
|
|
"""Puts an item onto the queue
|
|
|
|
:param element: The element to put in the queue
|
|
:type element: object
|
|
:param ttl: If you want to override the default ttl
|
|
of the class for a specific element, you can specify
|
|
that, defaults to 0 (use self.ttl)
|
|
:param ttl: int, optional
|
|
:raises QueueFull: If the queue is full
|
|
"""
|
|
|
|
self.expire(self.timer())
|
|
ttl = ttl if ttl else self.ttl
|
|
ttl += self.timer()
|
|
if len(self._queue) < self.qsize:
|
|
self._queue.append(TTLItem(element, ttl))
|
|
else:
|
|
raise QueueFull("The queue is full!")
|
|
|
|
def get(self):
|
|
"""Gets an item from the queue, raising QueueEmpty if the
|
|
queue is empty
|
|
"""
|
|
|
|
self.expire(self.timer())
|
|
if not self._queue:
|
|
raise QueueEmpty("The queue is empty!")
|
|
return self._queue.popleft().obj
|
|
|
|
def __repr__(self):
|
|
"""Implements repr(self)"""
|
|
|
|
string = "TTLQueue({list}, qsize={qsize}, ttl={ttl}, timer={timer})"
|
|
values = [t.obj for t in self._queue]
|
|
return string.format(list=values, qsize=self.qsize, ttl=self.ttl, timer=self.timer)
|
|
|
|
def __iter__(self):
|
|
"""Implements iter(self)"""
|
|
|
|
for item in self._queue:
|
|
yield item.obj
|
|
|
|
def __contains__(self, item):
|
|
"""Implements item in self"""
|
|
|
|
self.expire(self.timer())
|
|
return self._queue.__contains__(TTLItem(item, None))
|
|
|
|
def __bool__(self):
|
|
"""Implement bool(self)"""
|
|
|
|
return bool(self._queue)
|
|
|
|
def __eq__(self, other: object):
|
|
"""Implements self == other"""
|
|
|
|
queue = [item.obj for item in self._queue]
|
|
return queue == other
|
|
|
|
|
|
class TTLStack:
|
|
"""A stack (LIFO) with per-item time to live (TTL)
|
|
|
|
All items inside the stack will be associated to a TTL (time to live).
|
|
When a TTL expires, its associated element will be deleted, but please
|
|
note that TTL expiration (and therefore, items deletion) is performed
|
|
only when doing mutating operations on the stack itself (e.g. push/pop)
|
|
and when performing operations using the 'in' operator
|
|
|
|
It is also possible to set a different TTL for every item and to
|
|
define the maximum stack size
|
|
|
|
Note: This stack is NOT thread safe and must be properly locked
|
|
when used with multiple threads
|
|
|
|
:param size: The max size of the stack, defaults to 0 (no limit)
|
|
:type size: int, optional
|
|
:param ttl: The TTL for every item in the stack, defaults to 0 (no TTL)
|
|
:type ttl: int, optional
|
|
:param timer: The timer function that the stack will use to
|
|
keep track of elapsed time. Defaults to time.monotonic(), but can
|
|
be customized. Any function that yields an incremental value
|
|
on each subsequent call is acceptable, but its return values
|
|
should not be repeated during runtime to avoid nonsense results
|
|
:type timer: class: FunctionType, optional
|
|
"""
|
|
|
|
def __init__(self, size: int = 0, ttl: int = 0, timer: FunctionType = monotonic):
|
|
"""Object constructor"""
|
|
|
|
self.timer = timer
|
|
self.ttl = ttl
|
|
self.size = size if size else inf
|
|
if self.ttl < 0:
|
|
raise ValueError("ttl can't be negative!")
|
|
if self.size < 0:
|
|
raise ValueError("size can't be negative!")
|
|
self._stack = deque()
|
|
|
|
def push(self, element, ttl: int = 0):
|
|
"""Pushes an item onto the stack
|
|
|
|
:param element: The element to push
|
|
:type element: object
|
|
:param ttl: If you want to override the default ttl
|
|
of the class for a specific element, you can specify
|
|
that, defaults to 0 (use self.ttl)
|
|
:param ttl: int, optional
|
|
:raises StackFull: If the stack is full
|
|
"""
|
|
|
|
self.expire(self.timer())
|
|
ttl = ttl if ttl else self.ttl
|
|
ttl += self.timer()
|
|
if len(self._stack) < self.size:
|
|
self._stack.appendleft(TTLItem(element, ttl))
|
|
else:
|
|
raise StackFull("The stack is full!")
|
|
|
|
def pop(self):
|
|
"""Pops an item from the stack, raising StackEmpty if the
|
|
stack is empty
|
|
|
|
:raises StackEmpty: If the stack is empty
|
|
"""
|
|
|
|
self.expire(self.timer())
|
|
if not self._stack:
|
|
raise StackEmpty("The stack is empty!")
|
|
return self._stack.popleft().obj
|
|
|
|
def __repr__(self):
|
|
"""Implements repr(self)"""
|
|
|
|
string = "TTLStack({list}, size={qsize}, ttl={ttl}, timer={timer})"
|
|
values = [t.obj for t in self._stack]
|
|
return string.format(list=values, qsize=self.size, ttl=self.ttl, timer=self.timer)
|
|
|
|
def expire(self, when: int):
|
|
"""Pops expired element out of the stack if their TTL has
|
|
expired by when units of time (usually seconds)
|
|
|
|
:param when: The expiry date to check items against. Items' whose
|
|
insertion date, according to self.timer, is less or equal
|
|
than this number will be automatically deleted
|
|
:type when: int
|
|
"""
|
|
|
|
i = 0
|
|
n = len(self._stack)
|
|
while i < n:
|
|
item = self._stack[i]
|
|
if item.date <= when:
|
|
self._stack.remove(item)
|
|
n = len(self._stack)
|
|
i -= 1
|
|
i += 1
|
|
|
|
def __contains__(self, item):
|
|
"""Implements item in self"""
|
|
|
|
self.expire(self.timer())
|
|
return self._stack.__contains__(TTLItem(item, None))
|
|
|
|
def __bool__(self):
|
|
"""Implement bool(self)"""
|
|
|
|
return bool(self._stack)
|
|
|
|
def __eq__(self, other: object):
|
|
"""Implements self == other"""
|
|
|
|
stack = [item.obj for item in self._stack]
|
|
return stack == other
|
|
|
|
|
|
class TTLHeap(TTLQueue):
|
|
"""A heap queue with per-item time to live (TTL)
|
|
|
|
All items inside the queue will be associated to a TTL (time to live).
|
|
When a TTL expires, its associated element will be deleted, but please
|
|
note that TTL expiration (and therefore, items deletion) is performed
|
|
only when doing mutating operations (put/get) on the queue itself and
|
|
when performing operations using the 'in' operator.
|
|
|
|
It is also possible to set a different TTL for every item and to
|
|
define the maximum queue size
|
|
|
|
Note: This queue is NOT thread safe and must be properly locked
|
|
when used with multiple threads
|
|
|
|
:param qsize: The max size of the queue, defaults to 0 (no limit)
|
|
:type qsize: int, optional
|
|
:param ttl: The TTL for every item in the queue, defaults to 0 (no TTL)
|
|
:type ttl: int, optional
|
|
:param timer: The timer function that the queue will use to
|
|
keep track of elapsed time. Defaults to time.monotonic(), but can
|
|
be customized. Any function that yields an incremental value
|
|
on each subsequent call is acceptable, but its return values
|
|
should not be repeated during runtime to avoid nonsense results
|
|
:type timer: class: FunctionType, optional
|
|
"""
|
|
|
|
def __init__(self, qsize: int = 0, ttl: int = 0, timer: FunctionType = monotonic):
|
|
"""Object constructor"""
|
|
|
|
super().__init__(qsize, ttl, timer)
|
|
self._queue = []
|
|
|
|
def __iter__(self):
|
|
"""Implements iter(self)"""
|
|
|
|
super().__iter__()
|
|
|
|
def __repr__(self):
|
|
"""Implements repr(self)"""
|
|
|
|
string = "TTLHeap({list}, qsize={qsize}, ttl={ttl}, timer={timer})"
|
|
values = [t.obj for t in self._queue]
|
|
return string.format(list=values, qsize=self.qsize, ttl=self.ttl, timer=self.timer)
|
|
|
|
def __contains__(self, item):
|
|
"""Implements item in self"""
|
|
|
|
super().__contains__(item)
|
|
|
|
def put(self, element, ttl: int = 0):
|
|
"""Puts an item onto the queue
|
|
|
|
:param element: The element to put in the queue
|
|
:type element: object
|
|
:param ttl: If you want to override the default ttl
|
|
of the class for a specific element, you can specify
|
|
that, defaults to 0 (use self.ttl)
|
|
:param ttl: int, optional
|
|
:raises QueueFull: If the queue is full
|
|
"""
|
|
|
|
self.expire(self.timer())
|
|
ttl = ttl if ttl else self.ttl
|
|
ttl += self.timer()
|
|
if len(self._queue) < self.qsize:
|
|
heappush(self._queue, TTLItem(element, ttl))
|
|
else:
|
|
raise QueueFull("The queue is full!")
|
|
|
|
def get(self):
|
|
"""Gets an item from the queue, raising QueueEmpty if the
|
|
queue is empty
|
|
"""
|
|
|
|
self.expire(self.timer())
|
|
if not self._queue:
|
|
raise QueueEmpty("The queue is empty!")
|
|
return heappop(self._queue).obj
|
|
|
|
|
|
def expire(self, when: int):
|
|
"""Pops expired element out of the queue if their TTL has
|
|
expired by when units of time (usually seconds)
|
|
|
|
:param when: The expiry date to check items against. Items' whose
|
|
insertion date, according to self.timer, is less or equal
|
|
than this number will be automatically deleted
|
|
:type when: int
|
|
"""
|
|
|
|
super().expire(when)
|
|
|
|
def __bool__(self):
|
|
"""Implement bool(self)"""
|
|
|
|
return super().__bool__()
|
|
|
|
def __eq__(self, other: object):
|
|
"""Implements self == other"""
|
|
|
|
return super().__eq__(other)
|