diff --git a/giambio/__init__.py b/giambio/__init__.py index f3a4e94..90235a6 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -1,8 +1,8 @@ __author__ = "Nocturn9x aka Isgiambyy" __version__ = (0, 0, 1) from .core import EventLoop -from .exceptions import GiambioError, AlreadyJoinedError, CancelledError +from .exceptions import GiambioError, AlreadyJoinedError, CancelledError, TaskCancelled from .traps import sleep, join +from .util import TaskManager - -__all__ = ["EventLoop", "sleep", "join", "GiambioError", "AlreadyJoinedError", "CancelledError"] +__all__ = ["EventLoop", "sleep", "join", "GiambioError", "AlreadyJoinedError", "CancelledError", "TaskManager", "TaskCancelled"] diff --git a/giambio/abstractions.py b/giambio/abstractions.py index 16596f8..ee2f5c9 100644 --- a/giambio/abstractions.py +++ b/giambio/abstractions.py @@ -1,5 +1,6 @@ import types from .traps import join, sleep, want_read, want_write, cancel +from .exceptions import CancelledError class Result: """A wrapper for results of coroutines""" @@ -23,6 +24,7 @@ class Task: self.result = None # Updated when the coroutine execution ends self.loop = loop # The EventLoop object that spawned the task self.cancelled = False + self.execution = "INIT" def run(self): self.status = True diff --git a/giambio/core.py b/giambio/core.py index b84178b..d5021f9 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -11,6 +11,7 @@ from .abstractions import Task, Result from socket import SOL_SOCKET, SO_ERROR from .traps import join, sleep, want_read, want_write, cancel + class EventLoop: """Implementation of an event loop, alternates between execution of coroutines (asynchronous functions) @@ -50,6 +51,7 @@ class EventLoop: method, *args = self.running.run() getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;) except StopIteration as e: + self.running.execution = "FINISH" self.running.result = Result(e.args[0] if e.args else None, None) # Saves the return value self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task except RuntimeError: diff --git a/giambio/exceptions.py b/giambio/exceptions.py index 24d3ead..6bd2fcd 100644 --- a/giambio/exceptions.py +++ b/giambio/exceptions.py @@ -14,4 +14,7 @@ class CancelledError(GiambioError): return "giambio.exceptions.CancelledError" class TaskCancelled(GiambioError): - """This exception is raised when the user attempts to join a cancelled task""" \ No newline at end of file + """This exception is raised when the user attempts to join a cancelled task""" + +class TaskFinished(TaskCancelled): + """This exception is raised when the user attempts to join an already finished task""" diff --git a/giambio/traps.py b/giambio/traps.py index 9f45c94..883abae 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -39,6 +39,8 @@ def join(task): if task.cancelled: raise TaskCancelled("Cannot join cancelled task!") + if task.execution == "FINISH": + raise TaskFinished("Cannot join already terminated task!") task.joined = True yield "want_join", task return task.get_result() # This raises an exception if the child task errored diff --git a/giambio/util.py b/giambio/util.py new file mode 100644 index 0000000..20cba33 --- /dev/null +++ b/giambio/util.py @@ -0,0 +1,29 @@ +from .abstractions import Task +from collections import deque + + +class TaskManager(Task): + """Class to be used inside context managers to spawn multiple tasks and be sure that they will all joined before the code exits the with block""" + + + def __init__(self, loop): + self.tasks = deque() # All tasks spawned + self.values = {} # Results OR exceptions of each task + self.loop = loop + + async def __aenter__(self): + return self + + async def __aexit__(self, *args): + for task in self.tasks: + self.values[task.coroutine.__name__] = await task.join() + + async def spawn(self, coro): + task = self.loop.spawn(coro) + self.tasks.append(task) + return task + + async def schedule(self, coro, delay): + task = self.loop.schedule(coro, delay) + self.tasks.append(task) + return task