ttlcollections/ttlcollections/structures.py

291 lines
10 KiB
Python

from collections import deque
from time import monotonic
from types import FunctionType
import math
from .errors import QueueEmpty, QueueFull, StackEmpty, StackFull
from heapq import heappush, heappop
from .objects import TTLItem
class TTLQueue:
"""A queue (FIFO) with per-item time to live (TTL)
When a TTL expires, its associated element will be deleted, but please
note that TTL expiration (and therefore, items deletion) is performed
only when doing mutating operations on the queue itself (put and get)
It is also possible to set a different TTL for every item and to
define the maximum queue size
Note: This queue is NOT thread safe and must be properly locked
when used with multiple threads
:param qsize: The max size of the queue, defaults to 0 (no limit)
:type qsize: int, optional
:param ttl: The TTL for every item in the queue, defaults to 0 (no TTL)
:type ttl: int, optional
:param timer: The timer function that the queue will use to
keep track of elapsed time. Defaults to time.monotonic(), but can
be customized. Any function that yields an incremental value
on each subsequent call is acceptable, but its return values
should not be repeated during runtime to avoid nonsense results
:type timer: class: FunctionType, optional
"""
def __init__(self, qsize: int = 0, ttl: int = 0, timer: FunctionType = monotonic):
"""Object constructor"""
self.qsize = qsize if qsize else math.inf # Infinite size
self.ttl = ttl
self.timer = timer
self._queue = deque()
def expire(self, when: int):
"""Pops expired element out of the queue if their TTL has
expired by when units of time (usually seconds)
:param when: The expiry date to check items against. Items' whose
insertion date, according to self.timer, is less or equal
than this number will be automatically deleted
:type when: int
"""
i = 0
n = len(self._queue)
while i < n:
try:
item = self._queue[i]
except IndexError:
break
if item.date <= when:
del self._queue[i]
i += 1
def put(self, element, ttl: int = 0):
"""Puts an item onto the queue
:param element: The element to put in the queue
:type element: object
:param ttl: If you want to override the default ttl
of the class for a specific element, you can specify
that, defaults to 0 (use self.ttl)
:param ttl: int, optional
:raises QueueFull: If the queue is full
"""
ttl = ttl if ttl else self.ttl
self.expire(self.timer())
if len(self._queue) < self.qsize:
self._queue.append(TTLItem(element, ttl + self.timer()))
else:
raise QueueFull("The queue is full!")
def get(self):
"""Gets an item from the queue, raising QueueEmpty if the
queue is empty
"""
self.expire(self.timer())
if not self._queue:
raise QueueEmpty("The queue is empty!")
return self._queue.popleft().obj
def __repr__(self):
"""Implements repr(self)"""
string = "TTLQueue({list}, qsize={qsize}, ttl={ttl}, timer={timer})"
values = [t.obj for t in self._queue]
return string.format(list=values, qsize=self.qsize, ttl=self.ttl, timer=self.timer)
def __iter__(self):
"""Implements iter(self)"""
for element in self._queue:
yield element.obj
class TTLStack:
"""A stack (LIFO) with per-item time to live (TTL)
All items inside the stack will be associated to a TTL (time to live).
When a TTL expires, its associated element will be deleted, but please
note that TTL expiration (and therefore, items deletion) is performed
only when doing mutating operations on the stack itself (push and pop)
It is also possible to set a different TTL for every item and to
define the maximum stack size
Note: This stack is NOT thread safe and must be properly locked
when used with multiple threads
:param size: The max size of the stack, defaults to 0 (no limit)
:type size: int, optional
:param ttl: The TTL for every item in the stack, defaults to 0 (no TTL)
:type ttl: int, optional
:param timer: The timer function that the stack will use to
keep track of elapsed time. Defaults to time.monotonic(), but can
be customized. Any function that yields an incremental value
on each subsequent call is acceptable, but its return values
should not be repeated during runtime to avoid nonsense results
:type timer: class: FunctionType, optional
"""
def __init__(self, size: int = 0, ttl: int = 0, timer: FunctionType = monotonic):
"""Object constructor"""
self.timer = timer
self.ttl = ttl
self.size = size if size else math.inf
self._stack = deque()
def push(self, element, ttl: int = 0):
"""Pushes an item onto the stack
:param element: The element to push
:type element: object
:param ttl: If you want to override the default ttl
of the class for a specific element, you can specify
that, defaults to 0 (use self.ttl)
:param ttl: int, optional
:raises StackFull: If the stack is full
"""
ttl = ttl if ttl else self.ttl
self.expire(self.timer())
if len(self._stack) < self.size:
self._stack.appendleft((self.timer() + ttl, element))
else:
raise StackFull("The stack is full!")
def pop(self):
"""Pops an item from the stack, raising StackEmpty if the
stack is empty
:raises StackEmpty: If the stack is empty
"""
self.expire(self.timer())
if not self._stack:
raise StackEmpty("The stack is empty!")
return self._stack.popleft()[1]
def __repr__(self):
"""Implements repr(self)"""
string = "TTLStack({list}, size={qsize}, ttl={ttl}, timer={timer})"
values = [t[1] for t in self._stack]
return string.format(list=values, qsize=self.size, ttl=self.ttl, timer=self.timer)
def expire(self, when: int):
"""Pops expired element out of the stack if their TTL has
expired by when units of time (usually seconds)
:param when: The expiry date to check items against. Items' whose
insertion date, according to self.timer, is less or equal
than this number will be automatically deleted
:type when: int
"""
i = 0
n = len(self._stack)
while i < n: # Using a for loop would raise RuntimeError
try:
date, element = self._stack[i]
except IndexError:
break
if date <= when:
del self._stack[i]
i += 1
class TTLHeap(TTLQueue):
"""A heap queue with per-item time to live (TTL)
All items inside the queue will be associated to a TTL (time to live).
When a TTL expires, its associated element will be deleted, but please
note that TTL expiration (and therefore, items deletion) is performed
only when doing mutating operations on the queue itself (put and get)
It is also possible to set a different TTL for every item and to
define the maximum queue size
Note: This queue is NOT thread safe and must be properly locked
when used with multiple threads
:param qsize: The max size of the queue, defaults to 0 (no limit)
:type qsize: int, optional
:param ttl: The TTL for every item in the queue, defaults to 0 (no TTL)
:type ttl: int, optional
:param timer: The timer function that the queue will use to
keep track of elapsed time. Defaults to time.monotonic(), but can
be customized. Any function that yields an incremental value
on each subsequent call is acceptable, but its return values
should not be repeated during runtime to avoid nonsense results
:type timer: class: FunctionType, optional
"""
def __init__(self, qsize: int = 0, ttl: int = 0, timer: FunctionType = monotonic):
"""Object constructor"""
super().__init__(qsize, ttl, timer)
self._queue = []
def __iter__(self):
"""Implements iter(self)"""
super().__iter__()
def __repr__(self):
"""Implements repr(self)"""
string = "TTLHeap({list}, qsize={qsize}, ttl={ttl}, timer={timer})"
values = [t.obj for t in self._queue]
return string.format(list=values, qsize=self.qsize, ttl=self.ttl, timer=self.timer)
def put(self, element, ttl: int = 0):
"""Puts an item onto the queue
:param element: The element to put in the queue
:type element: object
:param ttl: If you want to override the default ttl
of the class for a specific element, you can specify
that, defaults to 0 (use self.ttl)
:param ttl: int, optional
:raises QueueFull: If the queue is full
"""
ttl = ttl if ttl else self.ttl
self.expire(self.timer())
if len(self._queue) < self.qsize:
heappush(self._queue, TTLItem(element, self.timer() + ttl))
else:
raise QueueFull("The queue is full!")
def get(self):
"""Gets an item from the queue, raising QueueEmpty if the
queue is empty
"""
self.expire(self.timer())
if not self._queue:
raise QueueEmpty("The queue is empty!")
return heappop(self._queue).obj
def expire(self, when: int):
"""Pops expired element out of the queue if their TTL has
expired by when units of time (usually seconds)
:param when: The expiry date to check items against. Items' whose
insertion date, according to self.timer, is less or equal
than this number will be automatically deleted
:type when: int
"""
for item in self._queue:
if item.date <= when:
self._queue.remove(item)