diff --git a/giambio/core.py b/giambio/core.py index d429895..f7fe30d 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -19,7 +19,6 @@ limitations under the License. # Import libraries and internal resources import types import socket -from time import sleep as wait from timeit import default_timer from giambio.objects import Task, TimeQueue, DeadlinesQueue from giambio.traps import want_read, want_write @@ -228,7 +227,8 @@ class AsyncScheduler: while self.deadlines and self.deadlines[0][0] <= self.clock(): _, __, pool = self.deadlines.get() pool.timed_out = True - self.current_task.throw(TooSlowError()) + self.cancel_all_from_current_pool(pool) + raise TooSlowError() def check_events(self): """ @@ -332,13 +332,6 @@ class AsyncScheduler: # occur self.tasks.append(t) - def get_event_tasks(self): - """ - Returns all tasks currently waiting on events - """ - - return set(waiter for waiter in (evt.waiters for evt in self.events)) - def cancel_all_from_current_pool(self, pool=None): """ Cancels all tasks in the current pool (or the given one) @@ -357,22 +350,60 @@ class AsyncScheduler: else: # If we're at the main task, we're sure everything else exited return True - def get_io_tasks(self): + def get_event_tasks(self): """ - Return all tasks waiting on I/O resources + Yields all tasks currently waiting on events """ - return [k.data for k in self.selector.get_map().values()] + for evt in self.events: + for waiter in evt.waiters: + yield waiter - def get_all_tasks(self): + def get_asleep_tasks(self): """ - Returns all tasks which the loop is currently keeping track of. - This includes both running and paused tasks. A paused task is a - task which is either waiting on an I/O resource, sleeping, or - waiting on an event to be triggered + Yields all tasks currently sleeping """ - return chain(self.tasks, self.paused, self.get_event_tasks(), self.get_io_tasks()) + for asleep in self.paused.container: + yield asleep[2] + + def get_io_tasks(self) -> set: + """ + Yields all tasks waiting on I/O resources + """ + + for k in self.selector.get_map().values(): + yield k.data + + def get_all_tasks(self) -> set: + """ + Returns a generator yielding all tasks which the loop is currently + keeping track of. This includes both running and paused tasks. + A paused task is a task which is either waiting on an I/O resource, + sleeping, or waiting on an event to be triggered + """ + + return chain(self.tasks, + self.get_asleep_tasks(), + self.get_event_tasks(), + self.get_io_tasks()) + + def ensure_discard(self, task: Task): + """ + This method ensures that tasks that need to be cancelled are not + rescheduled further. This will act upon paused tasks only + """ + + if task in self.paused: + self.paused.discard(task) + elif self.selector.get_map(): + for key in self.selector.get_map().values(): + if key.data == task: + self.selector.unregister(task) + elif task in self.get_event_tasks(): + for evt in self.events: + if task in evt.waiters: + evt.waiters.remove(task) def cancel_all(self): """ @@ -467,7 +498,7 @@ class AsyncScheduler: task.cancelled = True task.cancel_pending = False self.debugger.after_cancel(task) - self.paused.discard(task) + self.ensure_discard(task) else: # If we can't cancel in a somewhat "graceful" way, we just # defer this operation for later (check run() for more info) diff --git a/giambio/objects.py b/giambio/objects.py index c43c77e..2550720 100644 --- a/giambio/objects.py +++ b/giambio/objects.py @@ -149,7 +149,7 @@ class TimeQueue: def __iter__(self): return self - + def __next__(self): try: return self.get() @@ -230,4 +230,4 @@ class DeadlinesQueue(TimeQueue): d = heappop(self.container) self.pools.discard(d[2]) - return d \ No newline at end of file + return d