diff --git a/experiment.py b/experiment.py index c2cade5..def0b8c 100644 --- a/experiment.py +++ b/experiment.py @@ -45,12 +45,17 @@ async def main(): print("Spawning countdown immediately, scheduling count for 4 secs from now") task = loop.spawn(countdown(8)) task1 = loop.schedule(count(6, 2), 4) # Schedules the task, it will be ran 4 seconds from now - await giambio.sleep(0) # TODO: Fix this to avoid the need to use a checkpoint before cancelling - await task.cancel() -# result = await task.join() # Would raise TaskError! + await giambio.sleep(2) # TODO: Fix this to avoid the need to use a checkpoint before cancelling +# await task.cancel() + result = await task.join() # Would raise TaskCancelled if the task was cancelled result = 'Task cancelled' if task.cancelled else task.result.val result1 = await task1.join() print(f"countdown returned: {result}\ncount returned: {result1}") print("All done") + print("PT. 2 Context Manager") + async with giambio.TaskManager(loop) as manager: + task2 = await manager.spawn(countdown(8)) + task3 = await manager.schedule(count(16, 2), 4) + print(manager.values) loop.start(main) diff --git a/giambio/abstractions.py b/giambio/abstractions.py index ee2f5c9..465dc2d 100644 --- a/giambio/abstractions.py +++ b/giambio/abstractions.py @@ -25,6 +25,7 @@ class Task: self.loop = loop # The EventLoop object that spawned the task self.cancelled = False self.execution = "INIT" + self.steps = 0 # How many steps did the task do before ending, incremented while executing def run(self): self.status = True @@ -40,8 +41,8 @@ class Task: async def cancel(self): return await cancel(self) - async def join(self): - return await join(self) + async def join(self, silent=False): + return await join(self, silent) def get_result(self): if self.result: diff --git a/giambio/core.py b/giambio/core.py index d5021f9..97088ed 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -50,6 +50,7 @@ class EventLoop: try: method, *args = self.running.run() getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;) + self.running.steps += 1 except StopIteration as e: self.running.execution = "FINISH" self.running.result = Result(e.args[0] if e.args else None, None) # Saves the return value diff --git a/giambio/traps.py b/giambio/traps.py index 883abae..4064f1e 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -34,17 +34,21 @@ def want_write(sock: socket.socket): @types.coroutine -def join(task): - """'Tells' the scheduler that the desired task MUST be awaited for completion""" +def join(task, silent=False): + """'Tells' the scheduler that the desired task MUST be awaited for completion + If silent is True, any exception in the child task will be discarded""" if task.cancelled: - raise TaskCancelled("Cannot join cancelled task!") - if task.execution == "FINISH": - raise TaskFinished("Cannot join already terminated task!") + raise TaskCancelled("cannot join cancelled task!") task.joined = True yield "want_join", task - return task.get_result() # This raises an exception if the child task errored - + if silent: + try: + return task.get_result() # Exception silenced + except: + return None + else: + return task.get_result() # Will raise @types.coroutine def cancel(task): @@ -52,3 +56,13 @@ def cancel(task): yield "want_cancel", task +@types.coroutine +def join_unfinished(task): + """Same as join(), but it will raise a TaskFinished exception if the task already ended""" + + if task.execution == "FINISH": + raise TaskFinished("task has already ended!") + yield "want_join", task + task.joined = True + return task.get_result() + diff --git a/giambio/util.py b/giambio/util.py index 20cba33..ec212fb 100644 --- a/giambio/util.py +++ b/giambio/util.py @@ -6,17 +6,18 @@ class TaskManager(Task): """Class to be used inside context managers to spawn multiple tasks and be sure that they will all joined before the code exits the with block""" - def __init__(self, loop): + def __init__(self, loop, silent=False): self.tasks = deque() # All tasks spawned self.values = {} # Results OR exceptions of each task self.loop = loop + self.silent = silent async def __aenter__(self): return self async def __aexit__(self, *args): for task in self.tasks: - self.values[task.coroutine.__name__] = await task.join() + self.values[task.coroutine.__name__] = await task.join(self.silent) async def spawn(self, coro): task = self.loop.spawn(coro)