diff --git a/giambio/context.py b/giambio/context.py index ba865d2..a5a3d3a 100644 --- a/giambio/context.py +++ b/giambio/context.py @@ -18,8 +18,8 @@ limitations under the License. import types -from .objects import Task -from .core import AsyncScheduler +from giambio.objects import Task +from giambio.core import AsyncScheduler class TaskManager: @@ -33,9 +33,13 @@ class TaskManager: """ self.loop = loop - self.tasks = [] # We store a reference to all tasks, even the asleep ones! + self.tasks = [] # We store a reference to all tasks in this pool, even the paused ones! + self.cancelled = False self.started = self.loop.clock() - self.timeout = self.started + timeout + if timeout: + self.timeout = self.started + timeout + else: + self.timeout = None def spawn(self, func: types.FunctionType, *args): """ @@ -44,6 +48,7 @@ class TaskManager: task = Task(func(*args), func.__name__ or str(func), self) task.joiners = [self.loop.current_task] + task.next_deadline = self.timeout or 0.0 self.loop.tasks.append(task) self.loop.debugger.on_task_spawn(task) self.tasks.append(task) @@ -57,6 +62,7 @@ class TaskManager: assert n >= 0, "The time delay can't be negative" task = Task(func(*args), func.__name__ or str(func), self) task.joiners = [self.loop.current_task] + task.next_deadline = self.timeout or 0.0 task.sleep_start = self.loop.clock() self.loop.paused.put(task, n) self.loop.debugger.on_task_schedule(task, n) @@ -68,7 +74,17 @@ class TaskManager: async def __aexit__(self, exc_type: Exception, exc: Exception, tb): for task in self.tasks: - # This forces Python to block at the + # This forces Python to stop at the # end of the block and wait for all # children to exit await task.join() + self.tasks.remove(task) + + async def cancel(self): + """ + Cancels the whole block + """ + + # TODO: This breaks, somehow, investigation needed + for task in self.tasks: + await task.cancel() diff --git a/giambio/core.py b/giambio/core.py index ec8497f..979b4a1 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -21,19 +21,21 @@ import types import socket from time import sleep as wait from timeit import default_timer -from .objects import Task, TimeQueue -from .traps import want_read, want_write -from .util.debug import BaseDebugger +from giambio.objects import Task, TimeQueue, DeadlinesQueue +from giambio.traps import want_read, want_write +from giambio.util.debug import BaseDebugger from itertools import chain from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE -from .exceptions import (InternalError, - CancelledError, - ResourceBusy, - TooSlowError - ) +from giambio.exceptions import (InternalError, + CancelledError, + ResourceBusy, + GiambioError, + TooSlowError + ) IOInterrupt = (BlockingIOError, InterruptedError) +IO_SKIP_LIMIT = 5 # TODO: Inspect this class AsyncScheduler: @@ -75,6 +77,12 @@ class AsyncScheduler: self.has_ran = False # The current pool self.current_pool = None + # How many times we skipped I/O checks to let a task run + # We limit the number of times we skip such checks to avoid + # I/O starvation in highly concurrent systems + self.io_skip = 0 + # A heap queue of deadlines to be checked + self.deadlines = DeadlinesQueue() def done(self): """ @@ -107,17 +115,16 @@ class AsyncScheduler: # sleeping tasks, no events to deliver, # no I/O to do and no running tasks, we # simply tear us down and return to self.start - self.shutdown() + self.close() break elif not self.tasks: - # If there are no actively running tasks - # we try to schedule the asleep ones + # We start by checking for I/O + self.check_io() if self.paused: + # Next, if there are no actively running tasks + # we try to schedule the asleep ones self.awake_sleeping() - if self.selector.get_map(): - # The next step is checking for I/O - self.check_io() - # Try to awake event-waiting tasks + # Then we try to awake event-waiting tasks if self.events: self.check_events() # Otherwise, while there are tasks ready to run, well, run them! @@ -150,24 +157,47 @@ class AsyncScheduler: # Sneaky method call, thanks to David Beazley for this ;) getattr(self, method)(*args) except AttributeError: # If this happens, that's quite bad! + # This exception block is meant to be triggered by other async + # libraries, which most likely have different trap names and behaviors. + # If you get this exception and you're 100% sure you're not mixing + # async primitives from other libraries, then it's a bug! raise InternalError("Uh oh! Something very bad just happened, did" " you try to mix primitives from other async libraries?") from None except CancelledError: - # Task was cancelled (pending cancellation) + # When a task needs to be cancelled, giambio tries to do it gracefully + # first: if the task is paused in either I/O or sleeping, that's perfect. + # But we also need to cancel a task if it was not sleeping or waiting on + # any I/O because it could never do so (therefore blocking everything + # forever). So, when cancellation can't be done right away, we schedule + # if for the next execution step of the task. Giambio will also make sure + # to re-raise cancellations at every checkpoint until the task lets the + # exception propagate into us, because we *really* want the task to be + # cancelled, and since asking kindly didn't work we have to use some + # force :) self.current_task.status = "cancelled" self.current_task.cancelled = True self.current_task.cancel_pending = False self.debugger.after_cancel(self.current_task) + self.join(self.current_task) except StopIteration as ret: - # Task finished executing + # At the end of the day, coroutines are generator functions with + # some tricky behaviors, and this is one of them. When a coroutine + # hits a return statement (either explicit or implicit), it raises + # a StopIteration exception, which has an attribute named value that + # represents the return value of the coroutine, if any. Of course this + # exception is not an error and we should happily keep going after it, + # most of this code below is just useful for internal/debugging purposes self.current_task.status = "end" self.current_task.result = ret.value self.current_task.finished = True self.debugger.on_task_exit(self.current_task) self.join(self.current_task) except BaseException as err: - # Task raised an exception + # TODO: We might want to do a bit more complex traceback hacking to remove any extra + # frames from the exception call stack, but for now removing at least the first one + # seems a sensible approach (it's us catching it so we don't care about that) self.current_task.exc = err + self.current_task.exc.__traceback__ = self.current_task.exc.__traceback__.tb_next self.current_task.status = "crashed" self.debugger.on_exception_raised(self.current_task, err) self.join(self.current_task) # This propagates the exception @@ -192,17 +222,6 @@ class AsyncScheduler: self.tasks.append(self.current_task) self.to_send = self.current_task - def check_timeouts(self): - """ - Checks for expired timeouts and raises appropriate - errors - """ - - if self.clock() >= self.current_pool.timeout: - # A pool with a timeout has expired! - self.cancel_all_from_current_pool() - raise TooSlowError() - def check_events(self): """ Checks for ready or expired events and triggers them @@ -216,20 +235,17 @@ class AsyncScheduler: def awake_sleeping(self): """ - Checks for and reschedules sleeping tasks + Reschedules sleeping tasks if their deadline + has elapsed """ - wait(max(0.0, self.paused[0][0] - self.clock())) - # Sleep until the closest deadline in order not to waste CPU cycles - while self.paused[0][0] < self.clock(): + while self.paused and self.paused[0][0] < self.clock(): # Reschedules tasks when their deadline has elapsed task = self.paused.get() slept = self.clock() - task.sleep_start - task.sleep_start = None + task.sleep_start = 0.0 self.tasks.append(task) self.debugger.after_sleep(task, slept) - if not self.paused: - break def check_io(self): """ @@ -238,20 +254,44 @@ class AsyncScheduler: before_time = self.clock() # Used for the debugger if self.tasks or self.events: - # If there are either tasks or events and no I/O, never wait - timeout = 0.0 + self.io_skip += 1 + if self.io_skip == IO_SKIP_LIMIT: + # We can't skip every time there's some task ready + # or else we might starve I/O waiting tasks + # when a lot of things are running at the same time + self.io_skip = 0 + timeout = 86400 + else: + # If there are either tasks or events and no I/O, don't wait + # (unless we already skipped this check too many times) + timeout = 0.0 elif self.paused: # If there are asleep tasks, wait until the closest deadline - timeout = max(0.0, self.paused[0][0] - self.clock()) + if not self.deadlines: + timeout = min([max(0.0, self.paused[0][0] - self.clock())]) + else: + deadline = self.deadlines.get() + timeout = min([max(0.0, self.paused[0][0] - self.clock()), deadline]) + if timeout != deadline: + # If a sleeping tasks has to run + # before another deadline, we schedule the former + # first and put back the latter on the queue + self.deadlines.put(deadline) else: # If there is *only* I/O, we wait a fixed amount of time - timeout = 1.0 - self.debugger.before_io(timeout) - io_ready = self.selector.select(timeout) - # Get sockets that are ready and schedule their tasks - for key, _ in io_ready: - self.tasks.append(key.data) # Resource ready? Schedule its task - self.debugger.after_io(self.clock() - before_time) + timeout = 86400 # Thanks trio :D + if self.selector.get_map(): + self.debugger.before_io(timeout) + io_ready = self.selector.select(timeout) + # Get sockets that are ready and schedule their tasks + for key, _ in io_ready: + self.tasks.append(key.data) # Resource ready? Schedule its task + self.debugger.after_io(self.clock() - before_time) + else: + # Since select() does not work with 0 fds registered + # we need to call time.sleep() if we need to pause + # and no I/O has been registered + wait(timeout) def start(self, func: types.FunctionType, *args): """ @@ -289,35 +329,19 @@ class AsyncScheduler: return set(waiter for waiter in (evt.waiters for evt in self.events)) - def cancel_all_from_current_pool(self): + def cancel_all_from_current_pool(self, pool=None): """ - Cancels all tasks in the current pool, - preparing for the exception throwing - from self.join + Cancels all tasks in the current pool (or the given one) """ - to_reschedule = [] - for to_cancel in chain(self.tasks, self.paused, self.get_event_tasks()): - if to_cancel.pool is self.current_pool: - try: - self.cancel(to_cancel) - except CancelledError: - # Task was cancelled - self.current_task.status = "cancelled" - self.current_task.cancelled = True - self.current_task.cancel_pending = False - self.debugger.after_cancel(self.current_task) - elif to_cancel.status == "sleep": - deadline = to_cancel.next_deadline - self.clock() - to_reschedule.append((to_cancel, deadline)) - else: - to_reschedule.append((to_cancel, None)) - for task, deadline in to_reschedule: - if deadline is not None: - self.paused.put(task, deadline) - # If there is other work to do (nested pools) - # we tell so to our caller - return bool(to_reschedule) + pool = pool or self.current_pool + if pool: + for to_cancel in pool.tasks: + self.cancel(to_cancel) + pool.cancelled = True + return all([t.cancelled or t.finished or t.exc for t in pool.tasks]) + else: # If we're at the main task, we're sure everything else exited + return True def cancel_all(self): """ @@ -326,22 +350,24 @@ class AsyncScheduler: """ for to_cancel in chain(self.tasks, self.paused, self.get_event_tasks()): - try: - self.cancel(to_cancel) - except CancelledError: - # Task was cancelled - self.current_task.status = "cancelled" - self.current_task.cancelled = True - self.current_task.cancel_pending = False - self.debugger.after_cancel(self.current_task) + self.cancel(to_cancel) + return all([t.cancelled or t.exc or t.finished for t in chain(self.tasks, self.paused, self.get_event_tasks())]) - def close(self): + def close(self, *, ensure_done: bool = True): """ Closes the event loop, terminating all tasks - inside it and tearing down any extra machinery + inside it and tearing down any extra machinery. + If ensure_done equals False, the loop will cancel *ALL* + running and scheduled tasks and then tear itself down. + If ensure_done equals False, which is the default behavior, + this method will raise a GiambioError if the loop hasn't + finished running. """ - self.cancel_all() + if ensure_done: + self.cancel_all() + elif not self.done(): + raise GiambioError("event loop not terminated, call this method with ensure_done=False to forcefully exit") self.shutdown() def join(self, task: Task): @@ -351,14 +377,23 @@ class AsyncScheduler: task.join() on the task object) """ + if self.current_pool is None: + if not self.done(): + return + else: + self.reschedule_joiners(task) + return task.joined = True if task.finished or task.cancelled: - self.reschedule_joiners(task) + if all([t.finished or t.cancelled for t in self.current_pool.tasks]): + self.reschedule_joiners(task) elif task.exc: - if not self.cancel_all_from_current_pool(): + if self.cancel_all_from_current_pool(): # This will reschedule the parent - # only if any enclosed pool has - # already exited, which is what we want + # only if all the tasks inside it + # have finished executing, either + # by cancellation, an exception + # or just returned self.reschedule_joiners(task) def sleep(self, seconds: int or float): @@ -375,19 +410,30 @@ class AsyncScheduler: else: self.tasks.append(self.current_task) - def cancel(self, task: Task = None): + def cancel(self, task: Task): """ Schedules the task to be cancelled later or does so straight away if it is safe to do so """ - task = task or self.current_task - if not task.finished and not task.exc: - if task.status in ("io", "sleep"): - # We cancel right away + if task.status in ("io", "sleep", "init"): + # We cancel immediately only in a context where it's safer to do + # so. The concept of "safer" is quite tricky, because even though the + # task is technically not running, it might leave some unfinished state + # or dangling resource open after being cancelled, so maybe we need + # a different approach altogether + try: self.do_cancel(task) - else: - task.cancel_pending = True # Cancellation is deferred + except CancelledError: + # Task was cancelled + task.status = "cancelled" + task.cancelled = True + task.cancel_pending = False + self.debugger.after_cancel(task) + else: + # If we can't cancel in a somewhat "graceful" way, we just + # defer this operation for later (check run() for more info) + task.cancel_pending = True # Cancellation is deferred def event_set(self, event): """ @@ -488,3 +534,10 @@ class AsyncScheduler: await want_write(sock) return sock.connect(addr) + + def __del__(self): + """ + Garbage collects itself + """ + + self.close() diff --git a/giambio/exceptions.py b/giambio/exceptions.py index 57ed1b4..ee2cdf6 100644 --- a/giambio/exceptions.py +++ b/giambio/exceptions.py @@ -16,9 +16,8 @@ See the License for the specific language governing permissions and limitations under the License. """ - -from typing import List import traceback +from typing import List class GiambioError(Exception): diff --git a/giambio/objects.py b/giambio/objects.py index 667f71a..8d57e5e 100644 --- a/giambio/objects.py +++ b/giambio/objects.py @@ -16,11 +16,11 @@ See the License for the specific language governing permissions and limitations under the License. """ -import types -from .traps import join, cancel, event_set, event_wait +from giambio.traps import join, cancel, event_set, event_wait from heapq import heappop, heappush -from .exceptions import GiambioError +from giambio.exceptions import GiambioError from dataclasses import dataclass, field +import typing @dataclass @@ -30,7 +30,7 @@ class Task: A simple wrapper around a coroutine object """ - coroutine: types.CoroutineType + coroutine: typing.Coroutine name: str pool: "giambio.context.TaskManager" cancelled: bool = False @@ -77,9 +77,6 @@ class Task: await cancel(self) - def __del__(self): - self.coroutine.close() - def __hash__(self): return hash(self.coroutine) @@ -129,6 +126,9 @@ class TimeQueue: self.clock = clock self.sequence = 0 + # The sequence number handles the race condition + # of two tasks with identical deadlines acting + # as a tie breaker self.container = [] def __contains__(self, item): @@ -152,7 +152,7 @@ class TimeQueue: def __repr__(self): return f"TimeQueue({self.container}, clock={self.clock})" - def put(self, item, amount): + def put(self, item, amount: float): """ Pushes an item onto the queue with its unique time amount and ID @@ -167,3 +167,49 @@ class TimeQueue: """ return heappop(self.container)[2] + + +class DeadlinesQueue(TimeQueue): + """ + An ordered queue for storing tasks deadlines + """ + + def __init__(self): + """ + Object constructor + """ + + super().__init__(None) + + def __contains__(self, item): + return super().__contains__(item) + + def __iter__(self): + return super().__iter__() + + def __next__(self): + return super().__next__() + + def __getitem__(self, item): + return super().__getitem__(item) + + def __bool__(self): + return super().__bool__() + + def __repr__(self): + return f"DeadlinesQueue({self.container})" + + def put(self, amount: float): + """ + Pushes a deadline (timeout) onto the queue + """ + + heappush(self.container, (amount, self.sequence)) + self.sequence += 1 + + def get(self): + """ + Gets the first task that is meant to run + """ + + return super().get() diff --git a/giambio/run.py b/giambio/run.py index eda281b..c15d583 100644 --- a/giambio/run.py +++ b/giambio/run.py @@ -18,11 +18,11 @@ limitations under the License. import inspect import threading -from .core import AsyncScheduler -from .exceptions import GiambioError -from .context import TaskManager +from giambio.core import AsyncScheduler +from giambio.exceptions import GiambioError +from giambio.context import TaskManager from timeit import default_timer -from .util.debug import BaseDebugger +from giambio.util.debug import BaseDebugger from types import FunctionType @@ -70,7 +70,7 @@ def run(func: FunctionType, *args, **kwargs): if inspect.iscoroutine(func): raise GiambioError("Looks like you tried to call giambio.run(your_func(arg1, arg2, ...)), that is wrong!" "\nWhat you wanna do, instead, is this: giambio.run(your_func, arg1, arg2, ...)") - elif not isinstance(func, FunctionType): + elif not inspect.iscoroutinefunction(func): raise GiambioError("giambio.run() requires an async function as parameter!") new_event_loop(kwargs.get("debugger", None), kwargs.get("clock", default_timer)) get_event_loop().start(func, *args) diff --git a/giambio/socket.py b/giambio/socket.py index 00f679c..a01261c 100644 --- a/giambio/socket.py +++ b/giambio/socket.py @@ -16,9 +16,9 @@ See the License for the specific language governing permissions and limitations under the License. """ -from .run import get_event_loop import socket -from .exceptions import ResourceClosed +from giambio.run import get_event_loop +from giambio.exceptions import ResourceClosed class AsyncSocket: diff --git a/giambio/traps.py b/giambio/traps.py index ba08e8a..fa4672a 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -39,7 +39,7 @@ def create_trap(method, *args): async def sleep(seconds: int): """ Pause the execution of an async function for a given amount of seconds, - without blocking the entire event loop, which keeps watching for other events + without blocking the entire event loop, which keeps watching for other events. This function is also useful as a sort of checkpoint, because it returns control to the scheduler, which can then switch to another task. If your code @@ -78,7 +78,7 @@ async def join(task): async def cancel(task): """ - Cancels the given task + Cancels the given task. The concept of cancellation is tricky, because there is no real way to 'stop' a task if not by raising an exception inside it and ignoring whatever it @@ -128,7 +128,11 @@ async def event_set(event): async def event_wait(event): """ Notifies the event loop that the current task has to wait - for the given event to trigger + for the given event to trigger. This trap returns + immediately if the event has already been set """ + if event.set: + return await create_trap("event_wait", event) + diff --git a/giambio/util/debug.py b/giambio/util/debug.py index c38c785..8d6988e 100644 --- a/giambio/util/debug.py +++ b/giambio/util/debug.py @@ -20,193 +20,192 @@ from giambio.objects import Task class BaseDebugger(ABC): - """ - The base for all debugger objects - """ + """ + The base for all debugger objects + """ - @abstractmethod - def on_start(self): - """ - This method is called when the event - loop starts executing - """ + @abstractmethod + def on_start(self): + """ + This method is called when the event + loop starts executing + """ - raise NotImplementedError + raise NotImplementedError - @abstractmethod - def on_exit(self): - """ - This method is called when the event - loop exits entirely (all tasks completed) - """ + @abstractmethod + def on_exit(self): + """ + This method is called when the event + loop exits entirely (all tasks completed) + """ - raise NotImplementedError + raise NotImplementedError - @abstractmethod - def on_task_schedule(self, task: Task, delay: float): - """ - This method is called when a new task is - scheduled (not spawned) + @abstractmethod + def on_task_schedule(self, task: Task, delay: float): + """ + This method is called when a new task is + scheduled (not spawned) - :param task: The Task object representing a - giambio Task and wrapping a coroutine - :type task: :class: giambio.objects.Task - :param delay: The delay, in seconds, after which - the task will start executing - :type delay: float - """ + :param task: The Task object representing a + giambio Task and wrapping a coroutine + :type task: :class: giambio.objects.Task + :param delay: The delay, in seconds, after which + the task will start executing + :type delay: float + """ - raise NotImplementedError + raise NotImplementedError - @abstractmethod - def on_task_spawn(self, task: Task): - """ - This method is called when a new task is - spawned + @abstractmethod + def on_task_spawn(self, task: Task): + """ + This method is called when a new task is + spawned - :param task: The Task object representing a - giambio Task and wrapping a coroutine - :type task: :class: giambio.objects.Task - """ + :param task: The Task object representing a + giambio Task and wrapping a coroutine + :type task: :class: giambio.objects.Task + """ - raise NotImplementedError + raise NotImplementedError - @abstractmethod - def on_task_exit(self, task: Task): - """ - This method is called when a task exits + @abstractmethod + def on_task_exit(self, task: Task): + """ + This method is called when a task exits - :param task: The Task object representing a - giambio Task and wrapping a coroutine - :type task: :class: giambio.objects.Task - """ + :param task: The Task object representing a + giambio Task and wrapping a coroutine + :type task: :class: giambio.objects.Task + """ - raise NotImplementedError + raise NotImplementedError - @abstractmethod - def before_task_step(self, task: Task): - """ - This method is called right before - calling a task's run() method + @abstractmethod + def before_task_step(self, task: Task): + """ + This method is called right before + calling a task's run() method - :param task: The Task object representing a - giambio Task and wrapping a coroutine - :type task: :class: giambio.objects.Task - """ + :param task: The Task object representing a + giambio Task and wrapping a coroutine + :type task: :class: giambio.objects.Task + """ - raise NotImplementedError + raise NotImplementedError - @abstractmethod - def after_task_step(self, task: Task): - """ - This method is called right after - calling a task's run() method + @abstractmethod + def after_task_step(self, task: Task): + """ + This method is called right after + calling a task's run() method - :param task: The Task object representing a - giambio Task and wrapping a coroutine - :type task: :class: giambio.objects.Task - """ + :param task: The Task object representing a + giambio Task and wrapping a coroutine + :type task: :class: giambio.objects.Task + """ - raise NotImplementedError + raise NotImplementedError - @abstractmethod - def before_sleep(self, task: Task, seconds: float): - """ - This method is called before a task goes - to sleep + @abstractmethod + def before_sleep(self, task: Task, seconds: float): + """ + This method is called before a task goes + to sleep - :param task: The Task object representing a - giambio Task and wrapping a coroutine - :type task: :class: giambio.objects.Task - :param seconds: The amount of seconds the - task wants to sleep - :type seconds: int - """ + :param task: The Task object representing a + giambio Task and wrapping a coroutine + :type task: :class: giambio.objects.Task + :param seconds: The amount of seconds the + task wants to sleep + :type seconds: int + """ - raise NotImplementedError + raise NotImplementedError - @abstractmethod - def after_sleep(self, task: Task, seconds: float): - """ - This method is called after a tasks - awakes from sleeping + @abstractmethod + def after_sleep(self, task: Task, seconds: float): + """ + This method is called after a tasks + awakes from sleeping - :param task: The Task object representing a - giambio Task and wrapping a coroutine - :type task: :class: giambio.objects.Task - :param seconds: The amount of seconds the - task actually slept - :type seconds: float - """ + :param task: The Task object representing a + giambio Task and wrapping a coroutine + :type task: :class: giambio.objects.Task + :param seconds: The amount of seconds the + task actually slept + :type seconds: float + """ - raise NotImplementedError + raise NotImplementedError - @abstractmethod - def before_io(self, timeout: float): - """ - This method is called right before - the event loop checks for I/O events + @abstractmethod + def before_io(self, timeout: float): + """ + This method is called right before + the event loop checks for I/O events - :param timeout: The max. amount of seconds - that the loop will hang when using the select() - system call - :type timeout: float - """ + :param timeout: The max. amount of seconds + that the loop will hang when using the select() + system call + :type timeout: float + """ - raise NotImplementedError + raise NotImplementedError - @abstractmethod - def after_io(self, timeout: float): - """ - This method is called right after - the event loop has checked for I/O events + @abstractmethod + def after_io(self, timeout: float): + """ + This method is called right after + the event loop has checked for I/O events - :param timeout: The actual amount of seconds - that the loop has hung when using the select() - system call - :type timeout: float - """ + :param timeout: The actual amount of seconds + that the loop has hung when using the select() + system call + :type timeout: float + """ - raise NotImplementedError + raise NotImplementedError - @abstractmethod - def before_cancel(self, task: Task): - """ - This method is called right before a task - gets cancelled + @abstractmethod + def before_cancel(self, task: Task): + """ + This method is called right before a task + gets cancelled - :param task: The Task object representing a - giambio Task and wrapping a coroutine - :type task: :class: giambio.objects.Task - """ + :param task: The Task object representing a + giambio Task and wrapping a coroutine + :type task: :class: giambio.objects.Task + """ - raise NotImplementedError + raise NotImplementedError - @abstractmethod - def after_cancel(self, task: Task): - """ - This method is called right after a task - gets cancelled + @abstractmethod + def after_cancel(self, task: Task) -> object: + """ + This method is called right after a task + gets cancelled - :param task: The Task object representing a - giambio Task and wrapping a coroutine - :type task: :class: giambio.objects.Task - """ + :param task: The Task object representing a + giambio Task and wrapping a coroutine + :type task: :class: giambio.objects.Task + """ - raise NotImplementedError + raise NotImplementedError - @abstractmethod - def on_exception_raised(self, task: Task, exc: BaseException): - """ - This method is called right after a task - has raised an exception + @abstractmethod + def on_exception_raised(self, task: Task, exc: BaseException): + """ + This method is called right after a task + has raised an exception - :param task: The Task object representing a - giambio Task and wrapping a coroutine - :type task: :class: giambio.objects.Task - :param exc: The exception that was raised - :type exc: BaseException - """ - - raise NotImplementedError + :param task: The Task object representing a + giambio Task and wrapping a coroutine + :type task: :class: giambio.objects.Task + :param exc: The exception that was raised + :type exc: BaseException + """ + raise NotImplementedError diff --git a/tests/cancel.py b/tests/cancel.py index c87cba9..e1a90c9 100644 --- a/tests/cancel.py +++ b/tests/cancel.py @@ -17,7 +17,7 @@ async def child1(): async def main(): start = giambio.clock() async with giambio.create_pool() as pool: - pool.spawn(child) + pool.spawn(child) # If you comment this line, the pool will exit immediately! task = pool.spawn(child1) await task.cancel() print("[main] Children spawned, awaiting completion") diff --git a/tests/cancel_pool.py b/tests/cancel_pool.py new file mode 100644 index 0000000..e5d5deb --- /dev/null +++ b/tests/cancel_pool.py @@ -0,0 +1,36 @@ +import giambio +from debugger import Debugger + + +async def child(): + print("[child] Child spawned!! Sleeping for 2 seconds") + await giambio.sleep(2) + print("[child] Had a nice nap!") + + +async def child1(): + print("[child 1] Child spawned!! Sleeping for 2 seconds") + await giambio.sleep(2) + print("[child 1] Had a nice nap!") + + +async def child2(): + print("[child 2] Child spawned!! Sleeping for 2 seconds") + await giambio.sleep(2) + print("[child 2] Had a nice nap!") + + +async def main(): + start = giambio.clock() + async with giambio.create_pool() as pool: + pool.spawn(child) + pool.spawn(child1) + # async with giambio.create_pool() as a_pool: + # a_pool.spawn(child2) + await pool.cancel() # This cancels the *whole* block + print("[main] Children spawned, awaiting completion") + print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") + + +if __name__ == "__main__": + giambio.run(main, debugger=Debugger()) diff --git a/tests/error_stack.py b/tests/error_stack.py deleted file mode 100644 index e97a15f..0000000 --- a/tests/error_stack.py +++ /dev/null @@ -1,34 +0,0 @@ -import giambio -from debugger import Debugger - - -# TODO: How to create a race condition of 2 exceptions at the same time? - -async def child(): - print("[child] Child spawned!! Sleeping for 2 seconds") - await giambio.sleep(2) - print("[child] Had a nice nap!") - - -async def child1(): - print("[child 1] Child spawned!! Sleeping for 2 seconds") - await giambio.sleep(2) - print("[child 1] Had a nice nap!") - raise Exception("bruh") - - -async def main(): - start = giambio.clock() - try: - async with giambio.create_pool() as pool: - pool.spawn(child) - pool.spawn(child1) - print("[main] Children spawned, awaiting completion") - except Exception as error: - # Because exceptions just *work*! - print(f"[main] Exception from child caught! {repr(error)}") - print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") - - -if __name__ == "__main__": - giambio.run(main, debugger=()) diff --git a/tests/exceptions.py b/tests/exceptions.py index ac4cb39..b0d480d 100644 --- a/tests/exceptions.py +++ b/tests/exceptions.py @@ -6,13 +6,14 @@ async def child(): print("[child] Child spawned!! Sleeping for 2 seconds") await giambio.sleep(2) print("[child] Had a nice nap!") - raise TypeError("rip") # Watch the exception magically propagate! + # raise TypeError("rip") async def child1(): - print("[child 1] Child spawned!! Sleeping for 2 seconds") - await giambio.sleep(2) + print("[child 1] Child spawned!! Sleeping for 4 seconds") + await giambio.sleep(4) print("[child 1] Had a nice nap!") + raise TypeError("rip") async def main(): @@ -29,4 +30,4 @@ async def main(): if __name__ == "__main__": - giambio.run(main, debugger=Debugger()) + giambio.run(main, debugger=()) diff --git a/tests/nested_exception.py b/tests/nested_exception.py index f2de202..2d21b09 100644 --- a/tests/nested_exception.py +++ b/tests/nested_exception.py @@ -21,6 +21,12 @@ async def child2(): print("[child 2] Had a nice nap!") +async def child3(): + print("[child 3] Child spawned!! Sleeping for 6 seconds") + await giambio.sleep(6) + print("[child 3] Had a nice nap!") + + async def main(): start = giambio.clock() try: @@ -29,7 +35,12 @@ async def main(): pool.spawn(child1) print("[main] Children spawned, awaiting completion") async with giambio.create_pool() as new_pool: + # This pool won't be affected from exceptions + # in outer pools. This is a guarantee that giambio + # ensures: an exception will only be propagated + # after all enclosed task pools have exited new_pool.spawn(child2) + new_pool.spawn(child3) print("[main] 3rd child spawned") except Exception as error: # Because exceptions just *work*! diff --git a/tests/server.py b/tests/server.py index 9b0e42a..3b17598 100644 --- a/tests/server.py +++ b/tests/server.py @@ -1,5 +1,6 @@ import giambio from giambio.socket import AsyncSocket +from debugger import Debugger import socket import logging import sys @@ -45,9 +46,8 @@ if __name__ == "__main__": port = int(sys.argv[1]) if len(sys.argv) > 1 else 1500 logging.basicConfig(level=20, format="[%(levelname)s] %(asctime)s %(message)s", datefmt="%d/%m/%Y %p") try: - giambio.run(serve, ("localhost", port)) + giambio.run(serve, ("localhost", port), debugger=None) except (Exception, KeyboardInterrupt) as error: # Exceptions propagate! - raise if isinstance(error, KeyboardInterrupt): logging.info("Ctrl+C detected, exiting") else: diff --git a/tests/sleep.py b/tests/sleep.py index 4184fbf..4153a16 100644 --- a/tests/sleep.py +++ b/tests/sleep.py @@ -2,8 +2,8 @@ import giambio async def child(): - print("[child] Child spawned!! Sleeping for 2 seconds") - await giambio.sleep(2) + print("[child] Child spawned!! Sleeping for 4 seconds") + await giambio.sleep(4) print("[child] Had a nice nap!")