Started to work on giambio.with_timeout and fixed a little bug with deferred cancellation

This commit is contained in:
nocturn9x 2020-11-29 20:16:08 +01:00
parent 435ca2e47c
commit 40bcebbf5a
6 changed files with 47 additions and 2 deletions

6
.gitignore vendored
View File

@ -132,3 +132,9 @@ dmypy.json
.pyre/
.*.swp
.idea/.gitignore
.idea/giambio.iml
.idea/inspectionProfiles/
.idea/misc.xml
.idea/modules.xml
.idea/vcs.xml

View File

@ -24,7 +24,7 @@ from . import exceptions, socket
from .socket import wrap_socket
from .traps import sleep, current_task
from .objects import Event
from .run import run, clock, create_pool, get_event_loop, new_event_loop
from .run import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout
from .util import debug
__all__ = [
@ -35,6 +35,7 @@ __all__ = [
"clock",
"wrap_socket",
"create_pool",
"with_timeout",
"get_event_loop",
"current_task",
"new_event_loop",

View File

@ -27,13 +27,15 @@ class TaskManager:
An asynchronous context manager for giambio
"""
def __init__(self, loop: AsyncScheduler) -> None:
def __init__(self, loop: AsyncScheduler, timeout: float = None) -> None:
"""
Object constructor
"""
self.loop = loop
self.tasks = [] # We store a reference to all tasks, even the asleep ones!
self.started = self.loop.clock()
self.timeout = self.started + timeout
def spawn(self, func: types.FunctionType, *args):
"""
@ -66,4 +68,7 @@ class TaskManager:
async def __aexit__(self, exc_type: Exception, exc: Exception, tb):
for task in self.tasks:
# This forces Python to block at the
# end of the block and wait for all
# children to exit
await task.join()

View File

@ -29,6 +29,7 @@ from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from .exceptions import (InternalError,
CancelledError,
ResourceBusy,
TooSlowError
)
@ -151,6 +152,12 @@ class AsyncScheduler:
except AttributeError: # If this happens, that's quite bad!
raise InternalError("Uh oh! Something very bad just happened, did"
" you try to mix primitives from other async libraries?") from None
except CancelledError:
# Task was cancelled (pending cancellation)
self.current_task.status = "cancelled"
self.current_task.cancelled = True
self.current_task.cancel_pending = False
self.debugger.after_cancel(self.current_task)
except StopIteration as ret:
# Task finished executing
self.current_task.status = "end"
@ -185,6 +192,17 @@ class AsyncScheduler:
self.tasks.append(self.current_task)
self.to_send = self.current_task
def check_timeouts(self):
"""
Checks for expired timeouts and raises appropriate
errors
"""
if self.clock() >= self.current_pool.timeout:
# A pool with a timeout has expired!
self.cancel_all_from_current_pool()
raise TooSlowError()
def check_events(self):
"""
Checks for ready or expired events and triggers them

View File

@ -64,6 +64,13 @@ class ResourceClosed(GiambioError):
...
class TooSlowError(GiambioError):
"""
This is raised if the timeout of a pool created using
giambio.with_timeout expires
"""
class ErrorStack(GiambioError):
"""
This exception wraps multiple exceptions

View File

@ -91,3 +91,11 @@ def create_pool():
"""
return TaskManager(get_event_loop())
def with_timeout(timeout: int or float):
"""
Creates an async pool with an associated timeout
"""
return TaskManager(get_event_loop(), timeout)