diff --git a/.gitignore b/.gitignore index f8efa38..a3201ab 100644 --- a/.gitignore +++ b/.gitignore @@ -132,3 +132,9 @@ dmypy.json .pyre/ .*.swp +.idea/.gitignore +.idea/giambio.iml +.idea/inspectionProfiles/ +.idea/misc.xml +.idea/modules.xml +.idea/vcs.xml diff --git a/giambio/__init__.py b/giambio/__init__.py index c684fa0..d04344f 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -24,7 +24,7 @@ from . import exceptions, socket from .socket import wrap_socket from .traps import sleep, current_task from .objects import Event -from .run import run, clock, create_pool, get_event_loop, new_event_loop +from .run import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout from .util import debug __all__ = [ @@ -35,6 +35,7 @@ __all__ = [ "clock", "wrap_socket", "create_pool", + "with_timeout", "get_event_loop", "current_task", "new_event_loop", diff --git a/giambio/context.py b/giambio/context.py index d7b00a9..ba865d2 100644 --- a/giambio/context.py +++ b/giambio/context.py @@ -27,13 +27,15 @@ class TaskManager: An asynchronous context manager for giambio """ - def __init__(self, loop: AsyncScheduler) -> None: + def __init__(self, loop: AsyncScheduler, timeout: float = None) -> None: """ Object constructor """ self.loop = loop self.tasks = [] # We store a reference to all tasks, even the asleep ones! + self.started = self.loop.clock() + self.timeout = self.started + timeout def spawn(self, func: types.FunctionType, *args): """ @@ -66,4 +68,7 @@ class TaskManager: async def __aexit__(self, exc_type: Exception, exc: Exception, tb): for task in self.tasks: + # This forces Python to block at the + # end of the block and wait for all + # children to exit await task.join() diff --git a/giambio/core.py b/giambio/core.py index d0a292e..ec8497f 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -29,6 +29,7 @@ from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE from .exceptions import (InternalError, CancelledError, ResourceBusy, + TooSlowError ) @@ -151,6 +152,12 @@ class AsyncScheduler: except AttributeError: # If this happens, that's quite bad! raise InternalError("Uh oh! Something very bad just happened, did" " you try to mix primitives from other async libraries?") from None + except CancelledError: + # Task was cancelled (pending cancellation) + self.current_task.status = "cancelled" + self.current_task.cancelled = True + self.current_task.cancel_pending = False + self.debugger.after_cancel(self.current_task) except StopIteration as ret: # Task finished executing self.current_task.status = "end" @@ -185,6 +192,17 @@ class AsyncScheduler: self.tasks.append(self.current_task) self.to_send = self.current_task + def check_timeouts(self): + """ + Checks for expired timeouts and raises appropriate + errors + """ + + if self.clock() >= self.current_pool.timeout: + # A pool with a timeout has expired! + self.cancel_all_from_current_pool() + raise TooSlowError() + def check_events(self): """ Checks for ready or expired events and triggers them diff --git a/giambio/exceptions.py b/giambio/exceptions.py index 04fc5c9..57ed1b4 100644 --- a/giambio/exceptions.py +++ b/giambio/exceptions.py @@ -64,6 +64,13 @@ class ResourceClosed(GiambioError): ... +class TooSlowError(GiambioError): + """ + This is raised if the timeout of a pool created using + giambio.with_timeout expires + """ + + class ErrorStack(GiambioError): """ This exception wraps multiple exceptions diff --git a/giambio/run.py b/giambio/run.py index 60a328a..eda281b 100644 --- a/giambio/run.py +++ b/giambio/run.py @@ -91,3 +91,11 @@ def create_pool(): """ return TaskManager(get_event_loop()) + + +def with_timeout(timeout: int or float): + """ + Creates an async pool with an associated timeout + """ + + return TaskManager(get_event_loop(), timeout)