Fixed lots of minor bugs (and created some new bugs, of course). Pool-level cancellation is broken, but everything else seems to work given the current tests

This commit is contained in:
nocturn9x 2020-12-05 17:09:59 +01:00
parent 40bcebbf5a
commit 98c9440115
15 changed files with 443 additions and 312 deletions

View File

@ -18,8 +18,8 @@ limitations under the License.
import types import types
from .objects import Task from giambio.objects import Task
from .core import AsyncScheduler from giambio.core import AsyncScheduler
class TaskManager: class TaskManager:
@ -33,9 +33,13 @@ class TaskManager:
""" """
self.loop = loop self.loop = loop
self.tasks = [] # We store a reference to all tasks, even the asleep ones! self.tasks = [] # We store a reference to all tasks in this pool, even the paused ones!
self.cancelled = False
self.started = self.loop.clock() self.started = self.loop.clock()
self.timeout = self.started + timeout if timeout:
self.timeout = self.started + timeout
else:
self.timeout = None
def spawn(self, func: types.FunctionType, *args): def spawn(self, func: types.FunctionType, *args):
""" """
@ -44,6 +48,7 @@ class TaskManager:
task = Task(func(*args), func.__name__ or str(func), self) task = Task(func(*args), func.__name__ or str(func), self)
task.joiners = [self.loop.current_task] task.joiners = [self.loop.current_task]
task.next_deadline = self.timeout or 0.0
self.loop.tasks.append(task) self.loop.tasks.append(task)
self.loop.debugger.on_task_spawn(task) self.loop.debugger.on_task_spawn(task)
self.tasks.append(task) self.tasks.append(task)
@ -57,6 +62,7 @@ class TaskManager:
assert n >= 0, "The time delay can't be negative" assert n >= 0, "The time delay can't be negative"
task = Task(func(*args), func.__name__ or str(func), self) task = Task(func(*args), func.__name__ or str(func), self)
task.joiners = [self.loop.current_task] task.joiners = [self.loop.current_task]
task.next_deadline = self.timeout or 0.0
task.sleep_start = self.loop.clock() task.sleep_start = self.loop.clock()
self.loop.paused.put(task, n) self.loop.paused.put(task, n)
self.loop.debugger.on_task_schedule(task, n) self.loop.debugger.on_task_schedule(task, n)
@ -68,7 +74,17 @@ class TaskManager:
async def __aexit__(self, exc_type: Exception, exc: Exception, tb): async def __aexit__(self, exc_type: Exception, exc: Exception, tb):
for task in self.tasks: for task in self.tasks:
# This forces Python to block at the # This forces Python to stop at the
# end of the block and wait for all # end of the block and wait for all
# children to exit # children to exit
await task.join() await task.join()
self.tasks.remove(task)
async def cancel(self):
"""
Cancels the whole block
"""
# TODO: This breaks, somehow, investigation needed
for task in self.tasks:
await task.cancel()

View File

@ -21,19 +21,21 @@ import types
import socket import socket
from time import sleep as wait from time import sleep as wait
from timeit import default_timer from timeit import default_timer
from .objects import Task, TimeQueue from giambio.objects import Task, TimeQueue, DeadlinesQueue
from .traps import want_read, want_write from giambio.traps import want_read, want_write
from .util.debug import BaseDebugger from giambio.util.debug import BaseDebugger
from itertools import chain from itertools import chain
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from .exceptions import (InternalError, from giambio.exceptions import (InternalError,
CancelledError, CancelledError,
ResourceBusy, ResourceBusy,
TooSlowError GiambioError,
) TooSlowError
)
IOInterrupt = (BlockingIOError, InterruptedError) IOInterrupt = (BlockingIOError, InterruptedError)
IO_SKIP_LIMIT = 5 # TODO: Inspect this
class AsyncScheduler: class AsyncScheduler:
@ -75,6 +77,12 @@ class AsyncScheduler:
self.has_ran = False self.has_ran = False
# The current pool # The current pool
self.current_pool = None self.current_pool = None
# How many times we skipped I/O checks to let a task run
# We limit the number of times we skip such checks to avoid
# I/O starvation in highly concurrent systems
self.io_skip = 0
# A heap queue of deadlines to be checked
self.deadlines = DeadlinesQueue()
def done(self): def done(self):
""" """
@ -107,17 +115,16 @@ class AsyncScheduler:
# sleeping tasks, no events to deliver, # sleeping tasks, no events to deliver,
# no I/O to do and no running tasks, we # no I/O to do and no running tasks, we
# simply tear us down and return to self.start # simply tear us down and return to self.start
self.shutdown() self.close()
break break
elif not self.tasks: elif not self.tasks:
# If there are no actively running tasks # We start by checking for I/O
# we try to schedule the asleep ones self.check_io()
if self.paused: if self.paused:
# Next, if there are no actively running tasks
# we try to schedule the asleep ones
self.awake_sleeping() self.awake_sleeping()
if self.selector.get_map(): # Then we try to awake event-waiting tasks
# The next step is checking for I/O
self.check_io()
# Try to awake event-waiting tasks
if self.events: if self.events:
self.check_events() self.check_events()
# Otherwise, while there are tasks ready to run, well, run them! # Otherwise, while there are tasks ready to run, well, run them!
@ -150,24 +157,47 @@ class AsyncScheduler:
# Sneaky method call, thanks to David Beazley for this ;) # Sneaky method call, thanks to David Beazley for this ;)
getattr(self, method)(*args) getattr(self, method)(*args)
except AttributeError: # If this happens, that's quite bad! except AttributeError: # If this happens, that's quite bad!
# This exception block is meant to be triggered by other async
# libraries, which most likely have different trap names and behaviors.
# If you get this exception and you're 100% sure you're not mixing
# async primitives from other libraries, then it's a bug!
raise InternalError("Uh oh! Something very bad just happened, did" raise InternalError("Uh oh! Something very bad just happened, did"
" you try to mix primitives from other async libraries?") from None " you try to mix primitives from other async libraries?") from None
except CancelledError: except CancelledError:
# Task was cancelled (pending cancellation) # When a task needs to be cancelled, giambio tries to do it gracefully
# first: if the task is paused in either I/O or sleeping, that's perfect.
# But we also need to cancel a task if it was not sleeping or waiting on
# any I/O because it could never do so (therefore blocking everything
# forever). So, when cancellation can't be done right away, we schedule
# if for the next execution step of the task. Giambio will also make sure
# to re-raise cancellations at every checkpoint until the task lets the
# exception propagate into us, because we *really* want the task to be
# cancelled, and since asking kindly didn't work we have to use some
# force :)
self.current_task.status = "cancelled" self.current_task.status = "cancelled"
self.current_task.cancelled = True self.current_task.cancelled = True
self.current_task.cancel_pending = False self.current_task.cancel_pending = False
self.debugger.after_cancel(self.current_task) self.debugger.after_cancel(self.current_task)
self.join(self.current_task)
except StopIteration as ret: except StopIteration as ret:
# Task finished executing # At the end of the day, coroutines are generator functions with
# some tricky behaviors, and this is one of them. When a coroutine
# hits a return statement (either explicit or implicit), it raises
# a StopIteration exception, which has an attribute named value that
# represents the return value of the coroutine, if any. Of course this
# exception is not an error and we should happily keep going after it,
# most of this code below is just useful for internal/debugging purposes
self.current_task.status = "end" self.current_task.status = "end"
self.current_task.result = ret.value self.current_task.result = ret.value
self.current_task.finished = True self.current_task.finished = True
self.debugger.on_task_exit(self.current_task) self.debugger.on_task_exit(self.current_task)
self.join(self.current_task) self.join(self.current_task)
except BaseException as err: except BaseException as err:
# Task raised an exception # TODO: We might want to do a bit more complex traceback hacking to remove any extra
# frames from the exception call stack, but for now removing at least the first one
# seems a sensible approach (it's us catching it so we don't care about that)
self.current_task.exc = err self.current_task.exc = err
self.current_task.exc.__traceback__ = self.current_task.exc.__traceback__.tb_next
self.current_task.status = "crashed" self.current_task.status = "crashed"
self.debugger.on_exception_raised(self.current_task, err) self.debugger.on_exception_raised(self.current_task, err)
self.join(self.current_task) # This propagates the exception self.join(self.current_task) # This propagates the exception
@ -192,17 +222,6 @@ class AsyncScheduler:
self.tasks.append(self.current_task) self.tasks.append(self.current_task)
self.to_send = self.current_task self.to_send = self.current_task
def check_timeouts(self):
"""
Checks for expired timeouts and raises appropriate
errors
"""
if self.clock() >= self.current_pool.timeout:
# A pool with a timeout has expired!
self.cancel_all_from_current_pool()
raise TooSlowError()
def check_events(self): def check_events(self):
""" """
Checks for ready or expired events and triggers them Checks for ready or expired events and triggers them
@ -216,20 +235,17 @@ class AsyncScheduler:
def awake_sleeping(self): def awake_sleeping(self):
""" """
Checks for and reschedules sleeping tasks Reschedules sleeping tasks if their deadline
has elapsed
""" """
wait(max(0.0, self.paused[0][0] - self.clock())) while self.paused and self.paused[0][0] < self.clock():
# Sleep until the closest deadline in order not to waste CPU cycles
while self.paused[0][0] < self.clock():
# Reschedules tasks when their deadline has elapsed # Reschedules tasks when their deadline has elapsed
task = self.paused.get() task = self.paused.get()
slept = self.clock() - task.sleep_start slept = self.clock() - task.sleep_start
task.sleep_start = None task.sleep_start = 0.0
self.tasks.append(task) self.tasks.append(task)
self.debugger.after_sleep(task, slept) self.debugger.after_sleep(task, slept)
if not self.paused:
break
def check_io(self): def check_io(self):
""" """
@ -238,20 +254,44 @@ class AsyncScheduler:
before_time = self.clock() # Used for the debugger before_time = self.clock() # Used for the debugger
if self.tasks or self.events: if self.tasks or self.events:
# If there are either tasks or events and no I/O, never wait self.io_skip += 1
timeout = 0.0 if self.io_skip == IO_SKIP_LIMIT:
# We can't skip every time there's some task ready
# or else we might starve I/O waiting tasks
# when a lot of things are running at the same time
self.io_skip = 0
timeout = 86400
else:
# If there are either tasks or events and no I/O, don't wait
# (unless we already skipped this check too many times)
timeout = 0.0
elif self.paused: elif self.paused:
# If there are asleep tasks, wait until the closest deadline # If there are asleep tasks, wait until the closest deadline
timeout = max(0.0, self.paused[0][0] - self.clock()) if not self.deadlines:
timeout = min([max(0.0, self.paused[0][0] - self.clock())])
else:
deadline = self.deadlines.get()
timeout = min([max(0.0, self.paused[0][0] - self.clock()), deadline])
if timeout != deadline:
# If a sleeping tasks has to run
# before another deadline, we schedule the former
# first and put back the latter on the queue
self.deadlines.put(deadline)
else: else:
# If there is *only* I/O, we wait a fixed amount of time # If there is *only* I/O, we wait a fixed amount of time
timeout = 1.0 timeout = 86400 # Thanks trio :D
self.debugger.before_io(timeout) if self.selector.get_map():
io_ready = self.selector.select(timeout) self.debugger.before_io(timeout)
# Get sockets that are ready and schedule their tasks io_ready = self.selector.select(timeout)
for key, _ in io_ready: # Get sockets that are ready and schedule their tasks
self.tasks.append(key.data) # Resource ready? Schedule its task for key, _ in io_ready:
self.debugger.after_io(self.clock() - before_time) self.tasks.append(key.data) # Resource ready? Schedule its task
self.debugger.after_io(self.clock() - before_time)
else:
# Since select() does not work with 0 fds registered
# we need to call time.sleep() if we need to pause
# and no I/O has been registered
wait(timeout)
def start(self, func: types.FunctionType, *args): def start(self, func: types.FunctionType, *args):
""" """
@ -289,35 +329,19 @@ class AsyncScheduler:
return set(waiter for waiter in (evt.waiters for evt in self.events)) return set(waiter for waiter in (evt.waiters for evt in self.events))
def cancel_all_from_current_pool(self): def cancel_all_from_current_pool(self, pool=None):
""" """
Cancels all tasks in the current pool, Cancels all tasks in the current pool (or the given one)
preparing for the exception throwing
from self.join
""" """
to_reschedule = [] pool = pool or self.current_pool
for to_cancel in chain(self.tasks, self.paused, self.get_event_tasks()): if pool:
if to_cancel.pool is self.current_pool: for to_cancel in pool.tasks:
try: self.cancel(to_cancel)
self.cancel(to_cancel) pool.cancelled = True
except CancelledError: return all([t.cancelled or t.finished or t.exc for t in pool.tasks])
# Task was cancelled else: # If we're at the main task, we're sure everything else exited
self.current_task.status = "cancelled" return True
self.current_task.cancelled = True
self.current_task.cancel_pending = False
self.debugger.after_cancel(self.current_task)
elif to_cancel.status == "sleep":
deadline = to_cancel.next_deadline - self.clock()
to_reschedule.append((to_cancel, deadline))
else:
to_reschedule.append((to_cancel, None))
for task, deadline in to_reschedule:
if deadline is not None:
self.paused.put(task, deadline)
# If there is other work to do (nested pools)
# we tell so to our caller
return bool(to_reschedule)
def cancel_all(self): def cancel_all(self):
""" """
@ -326,22 +350,24 @@ class AsyncScheduler:
""" """
for to_cancel in chain(self.tasks, self.paused, self.get_event_tasks()): for to_cancel in chain(self.tasks, self.paused, self.get_event_tasks()):
try: self.cancel(to_cancel)
self.cancel(to_cancel) return all([t.cancelled or t.exc or t.finished for t in chain(self.tasks, self.paused, self.get_event_tasks())])
except CancelledError:
# Task was cancelled
self.current_task.status = "cancelled"
self.current_task.cancelled = True
self.current_task.cancel_pending = False
self.debugger.after_cancel(self.current_task)
def close(self): def close(self, *, ensure_done: bool = True):
""" """
Closes the event loop, terminating all tasks Closes the event loop, terminating all tasks
inside it and tearing down any extra machinery inside it and tearing down any extra machinery.
If ensure_done equals False, the loop will cancel *ALL*
running and scheduled tasks and then tear itself down.
If ensure_done equals False, which is the default behavior,
this method will raise a GiambioError if the loop hasn't
finished running.
""" """
self.cancel_all() if ensure_done:
self.cancel_all()
elif not self.done():
raise GiambioError("event loop not terminated, call this method with ensure_done=False to forcefully exit")
self.shutdown() self.shutdown()
def join(self, task: Task): def join(self, task: Task):
@ -351,14 +377,23 @@ class AsyncScheduler:
task.join() on the task object) task.join() on the task object)
""" """
if self.current_pool is None:
if not self.done():
return
else:
self.reschedule_joiners(task)
return
task.joined = True task.joined = True
if task.finished or task.cancelled: if task.finished or task.cancelled:
self.reschedule_joiners(task) if all([t.finished or t.cancelled for t in self.current_pool.tasks]):
self.reschedule_joiners(task)
elif task.exc: elif task.exc:
if not self.cancel_all_from_current_pool(): if self.cancel_all_from_current_pool():
# This will reschedule the parent # This will reschedule the parent
# only if any enclosed pool has # only if all the tasks inside it
# already exited, which is what we want # have finished executing, either
# by cancellation, an exception
# or just returned
self.reschedule_joiners(task) self.reschedule_joiners(task)
def sleep(self, seconds: int or float): def sleep(self, seconds: int or float):
@ -375,19 +410,30 @@ class AsyncScheduler:
else: else:
self.tasks.append(self.current_task) self.tasks.append(self.current_task)
def cancel(self, task: Task = None): def cancel(self, task: Task):
""" """
Schedules the task to be cancelled later Schedules the task to be cancelled later
or does so straight away if it is safe to do so or does so straight away if it is safe to do so
""" """
task = task or self.current_task if task.status in ("io", "sleep", "init"):
if not task.finished and not task.exc: # We cancel immediately only in a context where it's safer to do
if task.status in ("io", "sleep"): # so. The concept of "safer" is quite tricky, because even though the
# We cancel right away # task is technically not running, it might leave some unfinished state
# or dangling resource open after being cancelled, so maybe we need
# a different approach altogether
try:
self.do_cancel(task) self.do_cancel(task)
else: except CancelledError:
task.cancel_pending = True # Cancellation is deferred # Task was cancelled
task.status = "cancelled"
task.cancelled = True
task.cancel_pending = False
self.debugger.after_cancel(task)
else:
# If we can't cancel in a somewhat "graceful" way, we just
# defer this operation for later (check run() for more info)
task.cancel_pending = True # Cancellation is deferred
def event_set(self, event): def event_set(self, event):
""" """
@ -488,3 +534,10 @@ class AsyncScheduler:
await want_write(sock) await want_write(sock)
return sock.connect(addr) return sock.connect(addr)
def __del__(self):
"""
Garbage collects itself
"""
self.close()

View File

@ -16,9 +16,8 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
from typing import List
import traceback import traceback
from typing import List
class GiambioError(Exception): class GiambioError(Exception):

View File

@ -16,11 +16,11 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
import types from giambio.traps import join, cancel, event_set, event_wait
from .traps import join, cancel, event_set, event_wait
from heapq import heappop, heappush from heapq import heappop, heappush
from .exceptions import GiambioError from giambio.exceptions import GiambioError
from dataclasses import dataclass, field from dataclasses import dataclass, field
import typing
@dataclass @dataclass
@ -30,7 +30,7 @@ class Task:
A simple wrapper around a coroutine object A simple wrapper around a coroutine object
""" """
coroutine: types.CoroutineType coroutine: typing.Coroutine
name: str name: str
pool: "giambio.context.TaskManager" pool: "giambio.context.TaskManager"
cancelled: bool = False cancelled: bool = False
@ -77,9 +77,6 @@ class Task:
await cancel(self) await cancel(self)
def __del__(self):
self.coroutine.close()
def __hash__(self): def __hash__(self):
return hash(self.coroutine) return hash(self.coroutine)
@ -129,6 +126,9 @@ class TimeQueue:
self.clock = clock self.clock = clock
self.sequence = 0 self.sequence = 0
# The sequence number handles the race condition
# of two tasks with identical deadlines acting
# as a tie breaker
self.container = [] self.container = []
def __contains__(self, item): def __contains__(self, item):
@ -152,7 +152,7 @@ class TimeQueue:
def __repr__(self): def __repr__(self):
return f"TimeQueue({self.container}, clock={self.clock})" return f"TimeQueue({self.container}, clock={self.clock})"
def put(self, item, amount): def put(self, item, amount: float):
""" """
Pushes an item onto the queue with its unique Pushes an item onto the queue with its unique
time amount and ID time amount and ID
@ -167,3 +167,49 @@ class TimeQueue:
""" """
return heappop(self.container)[2] return heappop(self.container)[2]
class DeadlinesQueue(TimeQueue):
"""
An ordered queue for storing tasks deadlines
"""
def __init__(self):
"""
Object constructor
"""
super().__init__(None)
def __contains__(self, item):
return super().__contains__(item)
def __iter__(self):
return super().__iter__()
def __next__(self):
return super().__next__()
def __getitem__(self, item):
return super().__getitem__(item)
def __bool__(self):
return super().__bool__()
def __repr__(self):
return f"DeadlinesQueue({self.container})"
def put(self, amount: float):
"""
Pushes a deadline (timeout) onto the queue
"""
heappush(self.container, (amount, self.sequence))
self.sequence += 1
def get(self):
"""
Gets the first task that is meant to run
"""
return super().get()

View File

@ -18,11 +18,11 @@ limitations under the License.
import inspect import inspect
import threading import threading
from .core import AsyncScheduler from giambio.core import AsyncScheduler
from .exceptions import GiambioError from giambio.exceptions import GiambioError
from .context import TaskManager from giambio.context import TaskManager
from timeit import default_timer from timeit import default_timer
from .util.debug import BaseDebugger from giambio.util.debug import BaseDebugger
from types import FunctionType from types import FunctionType
@ -70,7 +70,7 @@ def run(func: FunctionType, *args, **kwargs):
if inspect.iscoroutine(func): if inspect.iscoroutine(func):
raise GiambioError("Looks like you tried to call giambio.run(your_func(arg1, arg2, ...)), that is wrong!" raise GiambioError("Looks like you tried to call giambio.run(your_func(arg1, arg2, ...)), that is wrong!"
"\nWhat you wanna do, instead, is this: giambio.run(your_func, arg1, arg2, ...)") "\nWhat you wanna do, instead, is this: giambio.run(your_func, arg1, arg2, ...)")
elif not isinstance(func, FunctionType): elif not inspect.iscoroutinefunction(func):
raise GiambioError("giambio.run() requires an async function as parameter!") raise GiambioError("giambio.run() requires an async function as parameter!")
new_event_loop(kwargs.get("debugger", None), kwargs.get("clock", default_timer)) new_event_loop(kwargs.get("debugger", None), kwargs.get("clock", default_timer))
get_event_loop().start(func, *args) get_event_loop().start(func, *args)

View File

@ -16,9 +16,9 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
from .run import get_event_loop
import socket import socket
from .exceptions import ResourceClosed from giambio.run import get_event_loop
from giambio.exceptions import ResourceClosed
class AsyncSocket: class AsyncSocket:

View File

@ -39,7 +39,7 @@ def create_trap(method, *args):
async def sleep(seconds: int): async def sleep(seconds: int):
""" """
Pause the execution of an async function for a given amount of seconds, Pause the execution of an async function for a given amount of seconds,
without blocking the entire event loop, which keeps watching for other events without blocking the entire event loop, which keeps watching for other events.
This function is also useful as a sort of checkpoint, because it returns This function is also useful as a sort of checkpoint, because it returns
control to the scheduler, which can then switch to another task. If your code control to the scheduler, which can then switch to another task. If your code
@ -78,7 +78,7 @@ async def join(task):
async def cancel(task): async def cancel(task):
""" """
Cancels the given task Cancels the given task.
The concept of cancellation is tricky, because there is no real way to 'stop' The concept of cancellation is tricky, because there is no real way to 'stop'
a task if not by raising an exception inside it and ignoring whatever it a task if not by raising an exception inside it and ignoring whatever it
@ -128,7 +128,11 @@ async def event_set(event):
async def event_wait(event): async def event_wait(event):
""" """
Notifies the event loop that the current task has to wait Notifies the event loop that the current task has to wait
for the given event to trigger for the given event to trigger. This trap returns
immediately if the event has already been set
""" """
if event.set:
return
await create_trap("event_wait", event) await create_trap("event_wait", event)

View File

@ -20,193 +20,192 @@ from giambio.objects import Task
class BaseDebugger(ABC): class BaseDebugger(ABC):
""" """
The base for all debugger objects The base for all debugger objects
""" """
@abstractmethod @abstractmethod
def on_start(self): def on_start(self):
""" """
This method is called when the event This method is called when the event
loop starts executing loop starts executing
""" """
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def on_exit(self): def on_exit(self):
""" """
This method is called when the event This method is called when the event
loop exits entirely (all tasks completed) loop exits entirely (all tasks completed)
""" """
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def on_task_schedule(self, task: Task, delay: float): def on_task_schedule(self, task: Task, delay: float):
""" """
This method is called when a new task is This method is called when a new task is
scheduled (not spawned) scheduled (not spawned)
:param task: The Task object representing a :param task: The Task object representing a
giambio Task and wrapping a coroutine giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task :type task: :class: giambio.objects.Task
:param delay: The delay, in seconds, after which :param delay: The delay, in seconds, after which
the task will start executing the task will start executing
:type delay: float :type delay: float
""" """
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def on_task_spawn(self, task: Task): def on_task_spawn(self, task: Task):
""" """
This method is called when a new task is This method is called when a new task is
spawned spawned
:param task: The Task object representing a :param task: The Task object representing a
giambio Task and wrapping a coroutine giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task :type task: :class: giambio.objects.Task
""" """
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def on_task_exit(self, task: Task): def on_task_exit(self, task: Task):
""" """
This method is called when a task exits This method is called when a task exits
:param task: The Task object representing a :param task: The Task object representing a
giambio Task and wrapping a coroutine giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task :type task: :class: giambio.objects.Task
""" """
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def before_task_step(self, task: Task): def before_task_step(self, task: Task):
""" """
This method is called right before This method is called right before
calling a task's run() method calling a task's run() method
:param task: The Task object representing a :param task: The Task object representing a
giambio Task and wrapping a coroutine giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task :type task: :class: giambio.objects.Task
""" """
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def after_task_step(self, task: Task): def after_task_step(self, task: Task):
""" """
This method is called right after This method is called right after
calling a task's run() method calling a task's run() method
:param task: The Task object representing a :param task: The Task object representing a
giambio Task and wrapping a coroutine giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task :type task: :class: giambio.objects.Task
""" """
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def before_sleep(self, task: Task, seconds: float): def before_sleep(self, task: Task, seconds: float):
""" """
This method is called before a task goes This method is called before a task goes
to sleep to sleep
:param task: The Task object representing a :param task: The Task object representing a
giambio Task and wrapping a coroutine giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task :type task: :class: giambio.objects.Task
:param seconds: The amount of seconds the :param seconds: The amount of seconds the
task wants to sleep task wants to sleep
:type seconds: int :type seconds: int
""" """
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def after_sleep(self, task: Task, seconds: float): def after_sleep(self, task: Task, seconds: float):
""" """
This method is called after a tasks This method is called after a tasks
awakes from sleeping awakes from sleeping
:param task: The Task object representing a :param task: The Task object representing a
giambio Task and wrapping a coroutine giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task :type task: :class: giambio.objects.Task
:param seconds: The amount of seconds the :param seconds: The amount of seconds the
task actually slept task actually slept
:type seconds: float :type seconds: float
""" """
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def before_io(self, timeout: float): def before_io(self, timeout: float):
""" """
This method is called right before This method is called right before
the event loop checks for I/O events the event loop checks for I/O events
:param timeout: The max. amount of seconds :param timeout: The max. amount of seconds
that the loop will hang when using the select() that the loop will hang when using the select()
system call system call
:type timeout: float :type timeout: float
""" """
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def after_io(self, timeout: float): def after_io(self, timeout: float):
""" """
This method is called right after This method is called right after
the event loop has checked for I/O events the event loop has checked for I/O events
:param timeout: The actual amount of seconds :param timeout: The actual amount of seconds
that the loop has hung when using the select() that the loop has hung when using the select()
system call system call
:type timeout: float :type timeout: float
""" """
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def before_cancel(self, task: Task): def before_cancel(self, task: Task):
""" """
This method is called right before a task This method is called right before a task
gets cancelled gets cancelled
:param task: The Task object representing a :param task: The Task object representing a
giambio Task and wrapping a coroutine giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task :type task: :class: giambio.objects.Task
""" """
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def after_cancel(self, task: Task): def after_cancel(self, task: Task) -> object:
""" """
This method is called right after a task This method is called right after a task
gets cancelled gets cancelled
:param task: The Task object representing a :param task: The Task object representing a
giambio Task and wrapping a coroutine giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task :type task: :class: giambio.objects.Task
""" """
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def on_exception_raised(self, task: Task, exc: BaseException): def on_exception_raised(self, task: Task, exc: BaseException):
""" """
This method is called right after a task This method is called right after a task
has raised an exception has raised an exception
:param task: The Task object representing a :param task: The Task object representing a
giambio Task and wrapping a coroutine giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task :type task: :class: giambio.objects.Task
:param exc: The exception that was raised :param exc: The exception that was raised
:type exc: BaseException :type exc: BaseException
""" """
raise NotImplementedError
raise NotImplementedError

View File

@ -17,7 +17,7 @@ async def child1():
async def main(): async def main():
start = giambio.clock() start = giambio.clock()
async with giambio.create_pool() as pool: async with giambio.create_pool() as pool:
pool.spawn(child) pool.spawn(child) # If you comment this line, the pool will exit immediately!
task = pool.spawn(child1) task = pool.spawn(child1)
await task.cancel() await task.cancel()
print("[main] Children spawned, awaiting completion") print("[main] Children spawned, awaiting completion")

36
tests/cancel_pool.py Normal file
View File

@ -0,0 +1,36 @@
import giambio
from debugger import Debugger
async def child():
print("[child] Child spawned!! Sleeping for 2 seconds")
await giambio.sleep(2)
print("[child] Had a nice nap!")
async def child1():
print("[child 1] Child spawned!! Sleeping for 2 seconds")
await giambio.sleep(2)
print("[child 1] Had a nice nap!")
async def child2():
print("[child 2] Child spawned!! Sleeping for 2 seconds")
await giambio.sleep(2)
print("[child 2] Had a nice nap!")
async def main():
start = giambio.clock()
async with giambio.create_pool() as pool:
pool.spawn(child)
pool.spawn(child1)
# async with giambio.create_pool() as a_pool:
# a_pool.spawn(child2)
await pool.cancel() # This cancels the *whole* block
print("[main] Children spawned, awaiting completion")
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
if __name__ == "__main__":
giambio.run(main, debugger=Debugger())

View File

@ -1,34 +0,0 @@
import giambio
from debugger import Debugger
# TODO: How to create a race condition of 2 exceptions at the same time?
async def child():
print("[child] Child spawned!! Sleeping for 2 seconds")
await giambio.sleep(2)
print("[child] Had a nice nap!")
async def child1():
print("[child 1] Child spawned!! Sleeping for 2 seconds")
await giambio.sleep(2)
print("[child 1] Had a nice nap!")
raise Exception("bruh")
async def main():
start = giambio.clock()
try:
async with giambio.create_pool() as pool:
pool.spawn(child)
pool.spawn(child1)
print("[main] Children spawned, awaiting completion")
except Exception as error:
# Because exceptions just *work*!
print(f"[main] Exception from child caught! {repr(error)}")
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
if __name__ == "__main__":
giambio.run(main, debugger=())

View File

@ -6,13 +6,14 @@ async def child():
print("[child] Child spawned!! Sleeping for 2 seconds") print("[child] Child spawned!! Sleeping for 2 seconds")
await giambio.sleep(2) await giambio.sleep(2)
print("[child] Had a nice nap!") print("[child] Had a nice nap!")
raise TypeError("rip") # Watch the exception magically propagate! # raise TypeError("rip")
async def child1(): async def child1():
print("[child 1] Child spawned!! Sleeping for 2 seconds") print("[child 1] Child spawned!! Sleeping for 4 seconds")
await giambio.sleep(2) await giambio.sleep(4)
print("[child 1] Had a nice nap!") print("[child 1] Had a nice nap!")
raise TypeError("rip")
async def main(): async def main():
@ -29,4 +30,4 @@ async def main():
if __name__ == "__main__": if __name__ == "__main__":
giambio.run(main, debugger=Debugger()) giambio.run(main, debugger=())

View File

@ -21,6 +21,12 @@ async def child2():
print("[child 2] Had a nice nap!") print("[child 2] Had a nice nap!")
async def child3():
print("[child 3] Child spawned!! Sleeping for 6 seconds")
await giambio.sleep(6)
print("[child 3] Had a nice nap!")
async def main(): async def main():
start = giambio.clock() start = giambio.clock()
try: try:
@ -29,7 +35,12 @@ async def main():
pool.spawn(child1) pool.spawn(child1)
print("[main] Children spawned, awaiting completion") print("[main] Children spawned, awaiting completion")
async with giambio.create_pool() as new_pool: async with giambio.create_pool() as new_pool:
# This pool won't be affected from exceptions
# in outer pools. This is a guarantee that giambio
# ensures: an exception will only be propagated
# after all enclosed task pools have exited
new_pool.spawn(child2) new_pool.spawn(child2)
new_pool.spawn(child3)
print("[main] 3rd child spawned") print("[main] 3rd child spawned")
except Exception as error: except Exception as error:
# Because exceptions just *work*! # Because exceptions just *work*!

View File

@ -1,5 +1,6 @@
import giambio import giambio
from giambio.socket import AsyncSocket from giambio.socket import AsyncSocket
from debugger import Debugger
import socket import socket
import logging import logging
import sys import sys
@ -45,9 +46,8 @@ if __name__ == "__main__":
port = int(sys.argv[1]) if len(sys.argv) > 1 else 1500 port = int(sys.argv[1]) if len(sys.argv) > 1 else 1500
logging.basicConfig(level=20, format="[%(levelname)s] %(asctime)s %(message)s", datefmt="%d/%m/%Y %p") logging.basicConfig(level=20, format="[%(levelname)s] %(asctime)s %(message)s", datefmt="%d/%m/%Y %p")
try: try:
giambio.run(serve, ("localhost", port)) giambio.run(serve, ("localhost", port), debugger=None)
except (Exception, KeyboardInterrupt) as error: # Exceptions propagate! except (Exception, KeyboardInterrupt) as error: # Exceptions propagate!
raise
if isinstance(error, KeyboardInterrupt): if isinstance(error, KeyboardInterrupt):
logging.info("Ctrl+C detected, exiting") logging.info("Ctrl+C detected, exiting")
else: else:

View File

@ -2,8 +2,8 @@ import giambio
async def child(): async def child():
print("[child] Child spawned!! Sleeping for 2 seconds") print("[child] Child spawned!! Sleeping for 4 seconds")
await giambio.sleep(2) await giambio.sleep(4)
print("[child] Had a nice nap!") print("[child] Had a nice nap!")