mirror of https://github.com/nocturn9x/giambio.git
Added a TimeQueue implementation
This commit is contained in:
parent
979e9959c6
commit
eb8770d0bf
|
@ -18,13 +18,12 @@ limitations under the License.
|
||||||
import types
|
import types
|
||||||
from collections import deque
|
from collections import deque
|
||||||
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
|
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
|
||||||
from heapq import heappush, heappop
|
|
||||||
import socket
|
import socket
|
||||||
from .exceptions import AlreadyJoinedError, CancelledError, ResourceBusy
|
from .exceptions import AlreadyJoinedError, CancelledError, ResourceBusy
|
||||||
from timeit import default_timer
|
from timeit import default_timer
|
||||||
from time import sleep as wait
|
from time import sleep as wait
|
||||||
from .socket import AsyncSocket, WantWrite, WantRead
|
from .socket import AsyncSocket, WantWrite, WantRead
|
||||||
from ._layers import Task
|
from ._layers import Task, TimeQueue
|
||||||
from socket import SOL_SOCKET, SO_ERROR
|
from socket import SOL_SOCKET, SO_ERROR
|
||||||
from ._traps import want_read, want_write
|
from ._traps import want_read, want_write
|
||||||
|
|
||||||
|
@ -44,14 +43,14 @@ class AsyncScheduler:
|
||||||
"""Object constructor"""
|
"""Object constructor"""
|
||||||
|
|
||||||
self.tasks = deque() # Tasks that are ready to run
|
self.tasks = deque() # Tasks that are ready to run
|
||||||
self.paused = [] # Tasks that are asleep
|
|
||||||
self.selector = DefaultSelector() # Selector object to perform I/O multiplexing
|
self.selector = DefaultSelector() # Selector object to perform I/O multiplexing
|
||||||
self.current_task = None # This will always point to the currently running coroutine (Task object)
|
self.current_task = None # This will always point to the currently running coroutine (Task object)
|
||||||
self.joined = {} # Maps child tasks that need to be joined their respective parent task
|
self.joined = {} # Maps child tasks that need to be joined their respective parent task
|
||||||
self.clock = default_timer # Monotonic clock to keep track of elapsed time reliably
|
self.clock = default_timer # Monotonic clock to keep track of elapsed time reliably
|
||||||
self.sequence = 0 # A monotonically increasing ID to avoid some corner cases with deadlines comparison
|
self.paused = TimeQueue(self.clock) # Tasks that are asleep
|
||||||
self.events = {} # All Event objects
|
self.events = {} # All Event objects
|
||||||
self.event_waiting = {} # Coroutines waiting on event objects
|
self.event_waiting = {} # Coroutines waiting on event objects
|
||||||
|
self.sequence = 0
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""Starts the loop and 'listens' for events until there are either ready or asleep tasks
|
"""Starts the loop and 'listens' for events until there are either ready or asleep tasks
|
||||||
|
@ -111,8 +110,7 @@ class AsyncScheduler:
|
||||||
|
|
||||||
wait(max(0.0, self.paused[0][0] - self.clock())) # Sleep until the closest deadline in order not to waste CPU cycles
|
wait(max(0.0, self.paused[0][0] - self.clock())) # Sleep until the closest deadline in order not to waste CPU cycles
|
||||||
while self.paused[0][0] < self.clock(): # Reschedules tasks when their deadline has elapsed
|
while self.paused[0][0] < self.clock(): # Reschedules tasks when their deadline has elapsed
|
||||||
_, __, task = heappop(self.paused)
|
self.tasks.append(self.paused.get())
|
||||||
self.tasks.append(task)
|
|
||||||
if not self.paused:
|
if not self.paused:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -200,9 +198,10 @@ class AsyncScheduler:
|
||||||
"""Puts the caller to sleep for a given amount of seconds"""
|
"""Puts the caller to sleep for a given amount of seconds"""
|
||||||
|
|
||||||
if seconds:
|
if seconds:
|
||||||
self.sequence += 1
|
|
||||||
self.current_task.status = "sleep"
|
self.current_task.status = "sleep"
|
||||||
heappush(self.paused, (self.clock() + seconds, self.sequence, self.current_task))
|
self.paused.put(self.current_task, seconds)
|
||||||
|
else:
|
||||||
|
self.tasks.append(self.current_task)
|
||||||
|
|
||||||
def event_set(self, event, value):
|
def event_set(self, event, value):
|
||||||
"""Sets an event"""
|
"""Sets an event"""
|
||||||
|
|
|
@ -16,6 +16,7 @@ limitations under the License.
|
||||||
|
|
||||||
import types
|
import types
|
||||||
from ._traps import join, cancel, event_set, event_wait
|
from ._traps import join, cancel, event_set, event_wait
|
||||||
|
from heapq import heappop, heappush
|
||||||
|
|
||||||
class Task:
|
class Task:
|
||||||
|
|
||||||
|
@ -85,3 +86,37 @@ class Event:
|
||||||
if not self._timeout_expired:
|
if not self._timeout_expired:
|
||||||
self.event_caught = True
|
self.event_caught = True
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
|
|
||||||
|
class TimeQueue:
|
||||||
|
"""An abstraction layer over a heap queue based on time. This is where
|
||||||
|
sleeping tasks will be put when they are asleep"""
|
||||||
|
|
||||||
|
def __init__(self, clock):
|
||||||
|
self.clock = clock
|
||||||
|
self.sequence = 0
|
||||||
|
self.container = []
|
||||||
|
|
||||||
|
def __contains__(self, item):
|
||||||
|
return item in self.container
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return iter(self.container)
|
||||||
|
|
||||||
|
def __getitem__(self, item):
|
||||||
|
return self.container.__getitem__(item)
|
||||||
|
|
||||||
|
def __bool__(self):
|
||||||
|
return bool(self.container)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"TimeQueue({self.container}, clock={self.clock})"
|
||||||
|
|
||||||
|
def put(self, item, amount):
|
||||||
|
heappush(self.container, (self.clock() + amount, self.sequence, item))
|
||||||
|
self.sequence += 1
|
||||||
|
|
||||||
|
def get(self):
|
||||||
|
return heappop(self.container)[2]
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue