diff --git a/giambio/_core.py b/giambio/_core.py index b44b54e..0bde401 100644 --- a/giambio/_core.py +++ b/giambio/_core.py @@ -18,13 +18,12 @@ limitations under the License. import types from collections import deque from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE -from heapq import heappush, heappop import socket from .exceptions import AlreadyJoinedError, CancelledError, ResourceBusy from timeit import default_timer from time import sleep as wait from .socket import AsyncSocket, WantWrite, WantRead -from ._layers import Task +from ._layers import Task, TimeQueue from socket import SOL_SOCKET, SO_ERROR from ._traps import want_read, want_write @@ -44,14 +43,14 @@ class AsyncScheduler: """Object constructor""" self.tasks = deque() # Tasks that are ready to run - self.paused = [] # Tasks that are asleep self.selector = DefaultSelector() # Selector object to perform I/O multiplexing self.current_task = None # This will always point to the currently running coroutine (Task object) self.joined = {} # Maps child tasks that need to be joined their respective parent task self.clock = default_timer # Monotonic clock to keep track of elapsed time reliably - self.sequence = 0 # A monotonically increasing ID to avoid some corner cases with deadlines comparison + self.paused = TimeQueue(self.clock) # Tasks that are asleep self.events = {} # All Event objects self.event_waiting = {} # Coroutines waiting on event objects + self.sequence = 0 def run(self): """Starts the loop and 'listens' for events until there are either ready or asleep tasks @@ -111,8 +110,7 @@ class AsyncScheduler: wait(max(0.0, self.paused[0][0] - self.clock())) # Sleep until the closest deadline in order not to waste CPU cycles while self.paused[0][0] < self.clock(): # Reschedules tasks when their deadline has elapsed - _, __, task = heappop(self.paused) - self.tasks.append(task) + self.tasks.append(self.paused.get()) if not self.paused: break @@ -200,9 +198,10 @@ class AsyncScheduler: """Puts the caller to sleep for a given amount of seconds""" if seconds: - self.sequence += 1 self.current_task.status = "sleep" - heappush(self.paused, (self.clock() + seconds, self.sequence, self.current_task)) + self.paused.put(self.current_task, seconds) + else: + self.tasks.append(self.current_task) def event_set(self, event, value): """Sets an event""" diff --git a/giambio/_layers.py b/giambio/_layers.py index 2ea883e..b9135f5 100644 --- a/giambio/_layers.py +++ b/giambio/_layers.py @@ -16,6 +16,7 @@ limitations under the License. import types from ._traps import join, cancel, event_set, event_wait +from heapq import heappop, heappush class Task: @@ -85,3 +86,37 @@ class Event: if not self._timeout_expired: self.event_caught = True return msg + + +class TimeQueue: + """An abstraction layer over a heap queue based on time. This is where + sleeping tasks will be put when they are asleep""" + + def __init__(self, clock): + self.clock = clock + self.sequence = 0 + self.container = [] + + def __contains__(self, item): + return item in self.container + + def __iter__(self): + return iter(self.container) + + def __getitem__(self, item): + return self.container.__getitem__(item) + + def __bool__(self): + return bool(self.container) + + def __repr__(self): + return f"TimeQueue({self.container}, clock={self.clock})" + + def put(self, item, amount): + heappush(self.container, (self.clock() + amount, self.sequence, item)) + self.sequence += 1 + + def get(self): + return heappop(self.container)[2] + +